10、RabbitMQ基础:死信队列

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 概念
  • 创建死信交换机、队列
  • 过期导致死信
  • 拒接消费
  • 长度限制

概念

无法被消费的消息被称为死信,存放死信的队列也就是死信队列

由于某些特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信

例如消费消息时,发生异常,经过一定次数的重试后,该消息依然无法被正常消费,此时可以将该消息放入死信队列中,后续进行人工干预。

消息成为死信的三种情况:

1、 队列消息长度到达限制;
2、 消费者异常拒接消费消息;
3、 原队列存在消息过期设置,消息到达超时时间未被消费;

一个简单的死信处理流程图如下:
*
流程图说明:

1、 生产者投递消息,消费者监听队列消息;
2、 产生死信消息时,投递到死信队列;
3、 死信消费者消费死信消息,存入到数据库,并进行人工干预处理;

创建死信交换机、队列

死信交换机、死信队列需要我们自己创建,只是业务中用来存放死信的“特殊”交换机队列。其他队列可以指定死信交换机、死信队列,当发生死信时,自动将其投递到死信队列中。

创建死信交换机、死信队列

@Configuration
public class RabbitMqDeadQueueConfig {
   
     

    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
   
     
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
   
     
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
   
     
        return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
    }
}

创建正常的业务消费队列,并指定指定死信交换机、路由KEY

@Configuration
public class RabbitMqConfig {
   
     

    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
   
     
        return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("bootQueue")
    public Queue bootQueue001() {
   
     
        return QueueBuilder.durable("bootQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }
}

创建一个死信消费者,消费死信,存入到数据库,等待人工干预处理:

    @RabbitListener(queues = {
   
     "deadQueue"})
    public void receiveMessage001(Message message) {
   
     
        System.out.println("收到死信消息" + new String(message.getBody()));
        System.out.println("存入数据库,等待人工干预");
    }

过期导致死信

首先模拟过期导致死信,注释正常业务消费者代码,发送一条TTL 消息:

        // 1. 消息过期
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000");
        Message message = new Message("HELLO TTL".getBytes(), messageProperties);
        rabbitTemplate.send("bootExchange", "boot.key", message);

到了过期时间后,由于没有消费者消费该消息,成为死信,最终被死信消费者接收到:

*

拒接消费

在之前,我们介绍过消息确认ACK机制,当拒收消息时,也可以放入死信队列中。

参考上面的文档,开启手动确认模式,可以看到拒接消息后,也会存放到死信队列中:
*

长度限制

创建队列,指定长度为1,当队列中的消息已经达到了这个最大长度限制时,再次投递,消息将被挤掉,被挤掉的会进入死信队列。

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY、数量限制
     */
    @Bean("bootQueue")
    public Queue bootQueue() {
   
     
        return QueueBuilder.durable("bootQueue").maxLength(1).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

批量发送一百条消息:

        Message message = new Message("HELLO WORLD".getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < 100; i++) {
   
     
            rabbitTemplate.send("bootExchange", "boot.key", message);
        }

关闭业务监听消费者,运行发送消息,可以看到大量消息进入死信队列:
*
查看控制台,只有第一条消息被保存在队列中:
*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: