13、RocketMQ进阶:RocketMQ工作原理-订阅关系的一致性

分布式消息队列RocketMQ

3.5)订阅关系的一致性

订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。

3、 5.1)正确订阅关系;

多个消费者组订阅了多个Topic,并且每个消费者组里的多个消费者实例的订阅关系保持了一致。

*

3、 5.2)错误订阅关系;

一个消费者组订阅了多个Topic,但是该消费者组里的多个Consumer实例的订阅关系并没有保持一致

*

3、 5.2.1)订阅了不同Topic;

该例中的错误在于,同一个消费者组中的两个Consumer实例订阅了不同的Topic。

Consumer实例1-1:(订阅了topic为jodie_test_A,tag为所有的消息)

*Properties properties = new Properties();
 * *properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_1");
 * *Consumer consumer = ONSFactory.createConsumer(properties);
 * *consumer.subscribe("jodie_test_A","*",newMessageListener() {
 * * * *public Action consume (Message message, ConsumeContext context){
 * * * * * *System.out.println(message.getMsgID());
 * * * * * *return Action.CommitMessage;
 * * *  }
 *  });

Consumer实例1-2:(订阅了topic为jodie_test_B,tag为所有的消息)

 *  Properties properties = new Properties();
 *  properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_1");
 *  Consumer consumer = ONSFactory.createConsumer(properties);
 *  consumer.subscribe("jodie_test_B","*",newMessageListener() {
 * * *  public Action consume (Message message, ConsumeContext context){
 * * * * *  System.out.println(message.getMsgID());
 * * * * *  return Action.CommitMessage;
 * * *  }
 *  });

3、 5.2.2)订阅了不同Tag;

该例中的错误在于,同一个消费者组中的两个Consumer订阅了相同Topic的不同Tag。

Consumer实例2-1:(订阅了topic为jodie_test_A,tag为TagA的消息)

 *  Properties properties = new Properties();
 *  properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_2");
 *  Consumer consumer = ONSFactory.createConsumer(properties);
 *  consumer.subscribe("jodie_test_A","TagA",newMessageListener() {
 * * *  public Action consume (Message message, ConsumeContext context){
 * * * * *  System.out.println(message.getMsgID());
 * * * * *  return Action.CommitMessage;
 * * *  }
 *  });

Consumer实例2-2:(订阅了topic为jodie_test_A,tag为所有的消息)

 *  Properties properties = new Properties();
 *  properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_2");
 *  Consumer consumer = ONSFactory.createConsumer(properties);
 *  consumer.subscribe("jodie_test_A","*",newMessageListener() {
 *  public Action consume (Message message, ConsumeContext context){
 * * * * System.out.println(message.getMsgID());
 * * * * return Action.CommitMessage;
 *  });

3、 5.2.3)订阅了不同数量的Topic;

该例中的错误在于,同一个消费者组中的两个Consumer订阅了不同数量的Topic。

Consumer实例3-1:(该Consumer订阅了两个Topic)

 * *Properties properties = new Properties();
 * *properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_3");
 * *Consumer consumer=ONSFactory.createConsumer(properties);
 * *consumer.subscribe("jodie_test_A","TagA",new MessageListener(){
 * * * *public Action consume(Message message,ConsumeContext context){
 * * * * * *System.out.println(message.getMsgID());
 * * * * * *return Action.CommitMessage;
 * * *  }
 *  });
 * * consumer.subscribe("jodie_test_B","TagB",new MessageListener(){
 * * * *public Action consume(Message message,ConsumeContext context){
 * * * * * *System.out.println(message.getMsgID());
 * * * * * *return Action.CommitMessage;
 * * *  }
 *  });
}

Consumer实例3-2:(该Consumer订阅了一个Topic)

 *  Properties properties = new Properties();
 *  properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_3");
 *  Consumer consumer = ONSFactory.createConsumer(properties);
 *  consumer.subscribe("jodie_test_A","TagB",newMessageListener() {
 * * *  public Action consume (Message message, ConsumeContext context){
 * * * * *  System.out.println(message.getMsgID());
 * * * * *  return Action.CommitMessage;
 * * *  }
 *  }); 

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: