分布式消息队列RocketMQ
3.5)订阅关系的一致性
订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。
3、 5.1)正确订阅关系;
多个消费者组订阅了多个Topic,并且每个消费者组里的多个消费者实例的订阅关系保持了一致。
3、 5.2)错误订阅关系;
一个消费者组订阅了多个Topic,但是该消费者组里的多个Consumer实例的订阅关系并没有保持一致
3、 5.2.1)订阅了不同Topic;
该例中的错误在于,同一个消费者组中的两个Consumer实例订阅了不同的Topic。
Consumer实例1-1:(订阅了topic为jodie_test_A,tag为所有的消息)
*Properties properties = new Properties();
* *properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_1");
* *Consumer consumer = ONSFactory.createConsumer(properties);
* *consumer.subscribe("jodie_test_A","*",newMessageListener() {
* * * *public Action consume (Message message, ConsumeContext context){
* * * * * *System.out.println(message.getMsgID());
* * * * * *return Action.CommitMessage;
* * * }
* });
Consumer实例1-2:(订阅了topic为jodie_test_B,tag为所有的消息)
* Properties properties = new Properties();
* properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_1");
* Consumer consumer = ONSFactory.createConsumer(properties);
* consumer.subscribe("jodie_test_B","*",newMessageListener() {
* * * public Action consume (Message message, ConsumeContext context){
* * * * * System.out.println(message.getMsgID());
* * * * * return Action.CommitMessage;
* * * }
* });
3、 5.2.2)订阅了不同Tag;
该例中的错误在于,同一个消费者组中的两个Consumer订阅了相同Topic的不同Tag。
Consumer实例2-1:(订阅了topic为jodie_test_A,tag为TagA的消息)
* Properties properties = new Properties();
* properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_2");
* Consumer consumer = ONSFactory.createConsumer(properties);
* consumer.subscribe("jodie_test_A","TagA",newMessageListener() {
* * * public Action consume (Message message, ConsumeContext context){
* * * * * System.out.println(message.getMsgID());
* * * * * return Action.CommitMessage;
* * * }
* });
Consumer实例2-2:(订阅了topic为jodie_test_A,tag为所有的消息)
* Properties properties = new Properties();
* properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_2");
* Consumer consumer = ONSFactory.createConsumer(properties);
* consumer.subscribe("jodie_test_A","*",newMessageListener() {
* public Action consume (Message message, ConsumeContext context){
* * * * System.out.println(message.getMsgID());
* * * * return Action.CommitMessage;
* });
3、 5.2.3)订阅了不同数量的Topic;
该例中的错误在于,同一个消费者组中的两个Consumer订阅了不同数量的Topic。
Consumer实例3-1:(该Consumer订阅了两个Topic)
* *Properties properties = new Properties();
* *properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_3");
* *Consumer consumer=ONSFactory.createConsumer(properties);
* *consumer.subscribe("jodie_test_A","TagA",new MessageListener(){
* * * *public Action consume(Message message,ConsumeContext context){
* * * * * *System.out.println(message.getMsgID());
* * * * * *return Action.CommitMessage;
* * * }
* });
* * consumer.subscribe("jodie_test_B","TagB",new MessageListener(){
* * * *public Action consume(Message message,ConsumeContext context){
* * * * * *System.out.println(message.getMsgID());
* * * * * *return Action.CommitMessage;
* * * }
* });
}
Consumer实例3-2:(该Consumer订阅了一个Topic)
* Properties properties = new Properties();
* properties.put(PropertyKeyConst.GROUP_ID,"GID_jodie_test_3");
* Consumer consumer = ONSFactory.createConsumer(properties);
* consumer.subscribe("jodie_test_A","TagB",newMessageListener() {
* * * public Action consume (Message message, ConsumeContext context){
* * * * * System.out.println(message.getMsgID());
* * * * * return Action.CommitMessage;
* * * }
* });
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: