这篇文章主要讲解了“RocketMQ如何实现消息过滤”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“RocketMQ如何实现消息过滤”吧!
消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤又分为TAG和SQL92模式,分别介绍各自的过滤机制,及代码示例内容,深入探消息过滤的原理。
发送消息时我们会为每一条消息设置TAG标签,同一大类中的消息放在一个主题TOPIC下,但是如果进行分类我们则可以根据TAG进行分类,每一类消费者可能不是关系某个主题下的所有消息,我们就可以通过TAG进行过滤,订阅关注的某一类数据。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = {"TagA","TagB","TagC","TagD"}; for (int i = 0; i < 40; i++) { try { String tag = tags[i % tags.length]; //构建消息 Message msg = new Message("GumxTest" /* Topic */, tag /* Tag */, ("RocketMQ消息测试,消息的TAG="+tag+" == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
主题GumxTest,标签分别是"TagA","TagB","TagC","TagD"每个分别发送10条消息
public class Consumer { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_tags"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("GumxTest", "TagA || TagC"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer1=== 存入时间 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.println("Consumer===启动成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
查看结果:
消费者组订阅相同的主题不同的Tag时,如果订阅是多个Tag则通过“||” 分割
同一个消费者组订阅的主题,Tag必须相同
SQL92表达式消息过滤,是通过消息的属性运行SQL过滤表达式进行条件匹配,消息发送时需要设置用户的属性putUserProperty方法设置属性。
支持的语法:
数值比较, 如>
, >=
, <
, <=
, BETWEEN
, =
;
字符比较, 如=
, <>
, IN
;
IS NULL
or IS NOT NULL
;
逻辑连接符AND
, OR
, NOT
;
支持的类型:
数值型, 如123, 3.1415;
字符型, 如 ‘abc’, 必须用单引号;
NULL
, 特殊常数;
布尔值, TRUE
or FALSE
;
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = {"TagA","TagB","TagC","TagD"}; for (int i = 0; i < 40; i++) { try { String tag = tags[i % tags.length]; //构建消息 Message msg = new Message("GumxTest" /* Topic */, tag /* Tag */, ("RocketMQ消息测试,消息的TAG="+tag+ ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.putUserProperty("age", i+""); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
public class Consumer { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_sql"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("GumxTest", MessageSelector.bySql("age between 0 and 8")); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer=== 存入时间 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.println("Consumer===启动成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
启动时:
启动消费者时发现启动失败,提示不支持SQL92过滤,网上也查了一些资料多说都是说版本太低,但是我使用的是RocketMQ4.2.0版本,已经是很高的版本了。
分析源码发现BrokerConfig的配置类中有一个属性 private boolean enablePropertyFilter = false;默认属性过滤没有开启,然而SQL92就是通过属性来过滤的。问题找到了,我们需要配置broker的属性在broker配置文件中添加enablePropertyFilter =true,需要依次关闭集群中的Broker、NameSrv服务,配置好后依次启动NameSrv、Broker服务
再次启动,启动成功,查看其结果:
RocketMQ通过定义消息过滤类的接口实现消息过滤
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); try { for (int i = 0; i < 6000000; i++) { Message msg = new Message("TopicFilter",// topic "TagA",// tag ("Hello MetaQ age = " + i ).getBytes());// body msg.putUserProperty("age", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupFilter"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); // 使用Java代码,在服务器做消息过滤 String filterCode = MixAll.file2String("D:\\WorkSoft\\workspace\\rocketmq-example\\src\\main\\java\\cn\\gumx\\rocketmq\\filter\\MessageFilterImpl.java"); consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext context) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgbody); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
自定义消息的过滤类
public class MessageFilterImpl implements MessageFilter { public boolean match(MessageExt msg, FilterContext arg1) { String property = msg.getUserProperty("age"); if (property != null) { int age = Integer.parseInt(property); if ((age % 3) == 0 && (age > 10)) { return true; } } return false; } }
consumer启动后并没有出现预期的结果,查了资料也没有相关介绍,只是和我们代码一样的处理逻辑,查看源码发现,需要启动filter组件mqfiltersrv服务。
./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &
我们部署的是双主,mqfiltersrv服务都需要开启
查看服务端结果:
使用类消息过滤模式,需要额外需要启动filter组件mqfiltersrv服务,否则消费不了,每个broker都需要启动一个,相当于加了一层过滤层。
filtersrv 出现了。减少了 Broker 的负担,又减少了 Consumer 接收无用的消息。当然缺点也是有的,多了一层 filtersrv 网络开销
MessageFilterImpl消息过滤实现类中的代码最好不要带有中文防止错误
注意:RocketMQ4.3.1开始删除与 mqfilter 服务器相关的脚本,4.3.2删除客户端关于mqfilter 客户端代码,后面版本不支持该功能。
感谢各位的阅读,以上就是“RocketMQ如何实现消息过滤”的内容了,经过本文的学习后,相信大家对RocketMQ如何实现消息过滤这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。