23、RocketMQ进阶:RocketMQ应用-消息过滤

分布式消息队列RocketMQ

四、 RocketMQ应用

4.6)消息过滤

消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。

对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

4、 6.1)Tag过滤;

通过consumer的subscribe()方法指定要订阅消息的Tag。

如果订阅多个Tag的消息,Tag间使用或运算 符(双竖线||)连接,如下:

DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

4、 6.2)SQL过滤;

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。

通过SQL过滤, 可以实现对消息的复杂过滤;不过只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符。

4、 6.2.1)支持的常量类型;

数值:比如:123,3.1415

字符:必须用单引号包裹起来,比如:'abc'

布尔:TRUE 或 FALSE

NULL:特殊的常量,表示空

4、 6.2.2)支持的运算符;

数值比较:>,>=,<,<=、BETWEEN,=

字符比较:=,<>、IN

逻辑运算 : AND、OR、NOT

NULL判断:IS NULL 或者 IS NOT NULL

4、 6.2.3)开启消息的SQL过滤功能;

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功

能:

enablePropertyFilter = true

在启动Broker时需要指定这个修改过的配置文件;例如对于单机Broker的启动,其修改的配置文件是 conf/broker.conf,启动时使用如下命令:

sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

4、 6.3)代码实现案例;

4、 6.3.1)定义Tag过滤生产者;

// 定义Tag过滤生产者
public class FilterByTagProducer {
 * *public static void main(String[] args) throws Exception {
 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * * *producer.setNamesrvAddr("rocketmqOS:9876");
 * * * *// 开启生产者
 * * * *producer.start();
 * * * *// 发送的消息均包含Tag,为以下三种Tag之一
 * * * *String[] tags = {"myTagA", "myTagB", "myTagC"};
 * * * *// 生产并发送10条消息
 * * * *for (int i = 0; i < 10; i++) {
 * * * * * *byte[] body = ("Hi," + i).getBytes();
 * * * * * *String tag = tags[i % tags.length];
 * * * * * *Message msg = new Message("TopicC", tag, body);
 * * * * * *SendResult sendResult = producer.send(msg);
 * * * * * *System.out.println(sendResult);
 * * *  }
 * * * *producer.shutdown();
 *  }
}

输出:

*

4、 6.3.2)定义Tag过滤消费者;

 // 定义Tag过滤消费者
public class FilterByTagConsumer {
 * *public static void main(String[] args) throws Exception {
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
 * * * *consumer.setNamesrvAddr("rocketmqOS:9876");
 * * * *consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
​
 * * * *// 仅订阅Tag为myTagA与myTagB的消息,不包含myTagC
 * * * *consumer.subscribe("myTagC", "myTagA || myTagB");
 * * * *consumer.registerMessageListener(new MessageListenerConcurrently() {
 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *for (MessageExt me:msgs){
 * * * * * * * * * *System.out.println(me);
 * * * * * * *  }
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * *  }
 * * *  });
 * * * *consumer.start();
 * * * *System.out.println("Consumer Started");
 *  }
}

输出:【过滤了myTagC的消息】

*

4、 6.3.3)定义SQL过滤生产者;

public class FilterBySQLProducer {
 * *public static void main(String[] args) throws Exception {
 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * * *producer.setNamesrvAddr("rocketmqOS:9876");
 * * * *// 开启生产者
 * * * *producer.start();
 * * * *// 生产并发送10条消息
 * * * *for (int i = 0; i < 10; i++) {
 * * * * * *try {
 * * * * * * * *byte[] body = ("Hi," + i).getBytes();
 * * * * * * * *Message msg = new Message("TopicE", "myTag", body);
 * * * * * * * *// 事先埋入用户属性age
 * * * * * * * *msg.putUserProperty("age", i + "");
 * * * * * * * *SendResult sendResult = producer.send(msg);
 * * * * * * * *System.out.println(sendResult);
 * * * * *  } catch (Exception e) {
 * * * * * * * *e.printStackTrace();
 * * * * *  }
 * * *  }
 * * * *producer.shutdown();
 *  }
}

输出:

*

4、 6.3.4)定义SQL过滤消费者;

public class FilterBySQLConsumer {
 * *public static void main(String[] args) throws Exception {
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
 * * * *consumer.setNamesrvAddr("rocketmqOS:9876");
 * * * *consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 * * * *// 要从TopicE的消息中过滤出age在[0, 6]间的消息
 * * * *consumer.subscribe("TopicE", MessageSelector.bySql("age between 0 and 6"));
 * * * *consumer.registerMessageListener(new MessageListenerConcurrently() {
 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *for (MessageExt me:msgs){
 * * * * * * * * * *System.out.println(me);
 * * * * * * *  }
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * *  }
 * * *  });
 * * * *consumer.start();
 * * * *System.out.println("Consumer Started");
 *  }
}

输出:【过滤出age在[0, 6]之间的消息】

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: