19、RocketMQ进阶:RocketMQ应用-顺序消息

分布式消息队列RocketMQ

四、 RocketMQ应用

4.2)顺序消息

4、 2.1)什么是顺序消息;

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。

如果将消息仅发送到同一个 Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

4、 2.2)为什么需要顺序消息;

例如,现在有TOPIC ORDER_STATUS(订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:

订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单 T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

*

这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的;对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

*

基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序 性),能够保证消费的顺序性。

*

4、 2.3)有序性分类;

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。

4、 2.3.1)全局有序;

当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序

*

在创建Topic时指定Queue的数量,有三种指定方式:

1、 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量;
2、 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量;
3、 使用mqadmin命令手动创建Topic时指定Queue数量;

4、 2.3.2)分区有序;

如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

*

如何实现Queue的选择?在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。

在定义选择器的选择算法时,一般需要使用选择key;这个选择key可以是消息key也可以是其它数据,但无论谁做选择key,都不能重复,都是唯一的。

一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果 即为选择出的Queue的QueueId。

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?

一般性的作法是,从消息中获取到选择key,对其进行判断;若是当前Consumer需 要消费的消息则直接消费,否则什么也不做;这种做法要求选择key要能够随着消息一起被 Consumer获取到,此时使用消息key作为选择key是比较好的做法。


以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?

同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费同一个Queue的不同选择key的消息的Consumer一定属于不同的Group,而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。

4、 2.4)代码实现案例;

// 顺序消息实现
public class OrderedProducer {
 * *public static void main(String[] args) throws Exception {
 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * * *producer.setNamesrvAddr("124.220.161.72:9876");
​
 * * * *// 若为全局有序,则需要设置Queue数量为1
 * * * *// producer.setDefaultTopicQueueNums(1);
 * * * *// 开启生产者
 * * * *producer.start();
 * * * *// 生产并发送100条消息
 * * * *for (int i = 0; i < 100; i++) {
 * * * * * *// 为了演示简单,使用整型数作为orderId
 * * * * * *Integer orderId = i;
 * * * * * *byte[] body = ("Hi," + i).getBytes();
 * * * * * *Message msg = new Message("TopicA", "TagA", body);
 * * * * * *// 将orderId作为消息key
 * * * * * *msg.setKeys(orderId.toString());
 * * * * * *// send()的第三个参数值会传递给选择器的select()的第三个参数
 * * * * * *// 该send()为同步发送,MessageQueueSelector为发送策略选择器
 * * * * * *SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
​
 * * * * * * * *// 具体的选择算法在该方法中定义
 * * * * * * * *@Override
 * * * * * * * *public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
 * * * * * * * * * *// 以下是使用消息key作为选择的选择算法(二选一)
 * * * * * * * * * *String keys = msg.getKeys();
 * * * * * * * * * *Integer id = Integer.valueOf(keys);
​
 * * * * * * * * * *// 以下是使用arg作为选择key的选择算法(二选一)
 * * * * * * * * * *// Integer id = (Integer) arg;
​
 * * * * * * * * * *int index = id % mqs.size();
 * * * * * * * * * *return mqs.get(index);
 * * * * * * *  }
 * * * * *  }, orderId);
​
 * * * * * *System.out.println(sendResult);
 * * *  }
 * * * *producer.shutdown();
 *  }
}

输出:

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: