22、RocketMQ进阶:RocketMQ应用-批量消费消息

分布式消息队列RocketMQ

四、 RocketMQ应用

4.5)批量消息

4、 5.1)批量发送消息;

4、 5.1.1)发送限制;

生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率,不过需要注意以下几点:

1、 批量发送的消息必须具有相同的Topic;
2、 批量发送的消息必须具有相同的刷盘策略;
3、 批量发送的消息不能是延时消息与事务消息;

4、 5.1.2)批量发送大小;

默认情况下,一批发送的消息总大小不能超过4MB字节;如果想超出该值,有两种解决方案:

1、 将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送;
2、 在Producer端与Broker端修改属性;

Producer端需要在发送之前设置Producer的maxMessageSize属性

Broker端需要修改其加载的配置文件中的maxMessageSize属性

4、 5.1.3)生产者发送的消息大小;

Producer发送消息Message的结构如下图:

*

生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的;

这个字符串由四部分构成:Topic、消息Body、消息日志 (占20字节)及用于描述消息的一堆属性key-value;这些属性中包含例如生产者地址、生产时间、 要发送的QueueId等;

最终写入到Broker中消息单元中的数据都是来自于这些属性。

4、 5.2)批量消费消息;

4、 5.2.1)修改批量属性;

*

Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息,若要使其一次可以消费多条消息,则可以通过修改 Consumer的consumeMessageBatchMaxSize属性来指定,不过,该值不能超过32。

因为默认情况下消费者每次可以拉取的消息多是32条,若要修改一次拉取的大值,则可通过修改Consumer的 pullBatchSize属性来指定。

4、 5.2.2)存在的问题;

Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?

——》当然不是

pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高;若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。

consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果;因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。

4、 5.3)代码实现案例;

该批量发送的需求:不修改大发送4M的默认值,但要防止发送的批量消息超出4M的限制

4、 5.3.1)定义消息列表分割器;

// 消息列表分割器:其只会处理每条消息的大小不超4M的情况【将一个消息集合中的消息分割为多个消息列表(不超4M),但对某一条超过4M的消息无法处理】
// 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements Iterator<List<Message>> {
 * *// 指定极限值为4M
 * *private final int SIZE_LIMIT = 4 * 1024 * 1024;
 * *// 存放所有要发送的消息
 * *private final List<Message> messages;
 * *// 要进行批量发送消息的小集合起始索引
 * *private int currIndex;
 * *public MessageListSplitter(List<Message> messages) {
 * * * *this.messages = messages;
 *  }
 * *@Override
 * *public boolean hasNext() {
 * * * *// 判断当前开始遍历的消息索引要小于消息总数
 * * * *return currIndex < messages.size();
 *  }
 * *@Override
 * *public List<Message> next() {
 * * * *int nextIndex = currIndex;
 * * * *// 记录当前要发送的这一小批次消息列表的大小
 * * * *int totalSize = 0;
 * * * *for (; nextIndex < messages.size(); nextIndex++) {
 * * * * * *// 获取当前遍历的消息
 * * * * * *Message message = messages.get(nextIndex);
 * * * * * *// 统计当前遍历的message的大小
 * * * * * *int tmpSize = message.getTopic().length() + message.getBody().length;
 * * * * * *Map<String, String> properties = message.getProperties();
 * * * * * *for (Map.Entry<String, String> entry : properties.entrySet()) {
 * * * * * * * *tmpSize += entry.getKey().length() + entry.getValue().length();
 * * * * *  }
 * * * * * *// Producer发送消息Message的结构:Topic + Body + Log(固定20字节) + Properties
 * * * * * *tmpSize = tmpSize + 20;
​
 * * * * * *// 判断当前消息本身是否大于4M
 * * * * * *if (tmpSize > SIZE_LIMIT) {
 * * * * * * * *if (nextIndex - currIndex == 0) {
 * * * * * * * * * *nextIndex++;
 * * * * * * *  }
 * * * * * * * *break;
 * * * * *  }
 * * * * * *// 当前消息的大小 + 之前统计要发送这一小批次消息列表的大小 》极限值4M
 * * * * * *if (tmpSize + totalSize > SIZE_LIMIT) {
 * * * * * * * *break;
 * * * * *  } else {
 * * * * * * * *// 统计要发送这一小批次消息列表的大小
 * * * * * * * *totalSize += tmpSize;
 * * * * *  }
​
 * * *  } // end-for
​
 * * * *// 获取当前messages列表的子集合[currIndex, nextIndex)
 * * * *List<Message> subList = messages.subList(currIndex, nextIndex);
 * * * *// 下次遍历的开始索引
 * * * *currIndex = nextIndex;
 * * * *return subList;
 *  }
}

4、 5.3.2)定义批量消息生产者;

// 定义批量消息生产者
public class BatchProducer {
 *  public static void main(String[] args) throws Exception {
 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * *  producer.setNamesrvAddr("rocketmqOS:9876");
 * * * *// 指定要发送的消息的最大大小,默认是4M
 * * * *// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
 * * * *// maxMessageSize属性
 * * * *// producer.setMaxMessageSize(8 * 1024 * 1024);
 * * *  producer.start();
​
 * * * *// 定义要发送的消息集合
 * * *  List<Message> messages = new ArrayList<>();
 * * *  for (int i = 0; i < 100; i++) {
 * * * * * *byte[] body = ("Hi," + i).getBytes();
 * * * * *  Message msg = new Message("TopicD", "someTag", body);
 * * * * *  messages.add(msg);
 * * * *}
​
 * * * *// 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
 * * *  MessageListSplitter splitter = new MessageListSplitter(messages);
 * * *  while (splitter.hasNext()) {
 * * * * * *try {
 * * * * * * * *List<Message> listItem = splitter.next();
 * * * * * * *  producer.send(listItem);
 * * * * * *} catch (Exception e) {
 * * * * * * * *e.printStackTrace();
 * * * * * *}
 * * * *}
 * * *  producer.shutdown();
 * *}
}

输出:

*

4、 5.3.3)定义批量消息消费者;

// 定义批量消息消费者
public class BatchConsumer {
 * *public static void main(String[] args) throws MQClientException {
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
 * * * *consumer.setNamesrvAddr("rocketmqOS:9876");
 * * * *consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 * * * *consumer.subscribe("someTopicA", "*");
 * * * *// 指定每次可以消费10条消息,默认为1
 * * * *consumer.setConsumeMessageBatchMaxSize(10);
 * * * *// 指定每次可以从Broker拉取40条消息,默认为32
 * * * *consumer.setPullBatchSize(40);
 * * * *consumer.registerMessageListener(new MessageListenerConcurrently() {
 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *for (MessageExt msg : msgs) {
 * * * * * * * * * *System.out.println(msg);
 * * * * * * *  }
 * * * * * * * *// 消费成功的返回结果
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * * * * *// 消费异常时的返回结果
 * * * * * * * *// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 * * * * *  }
 * * *  });
 * * * *consumer.start();
 * * * *System.out.println("Consumer Started");
 *  }
}

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: