11、RabbitMQ基础:延迟队列

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
    1. 过期消息实现延迟队列
    1. 过期队列实现延迟队列
    1. 插件实现延迟队列
    • 3.1 安装插件
    • 3.2 代码实现
    • 3.3 测试

前言

延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求场景: 用户下单后,30分钟未支付,取消订单,回滚库存。

本文介绍了三种方式实现,前两种存在一定的局限性

1. 过期消息实现延迟队列

发送带有TTL过期属性的消息,到达过期时间后,投递到死信队列,实现延迟队列功能。

添加一个死信交换机、死信队列:

@Configuration
public class RabbitMqDeadQueueConfig {
   
     

    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
   
     
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
   
     
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
   
     
        return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
    }
}

添加一个延迟交换机、延迟队列:

@Configuration
public class RabbitMqDelayQueueConfig {
   
     
    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("delayExchange")
    public Exchange bootExchange() {
   
     
        return ExchangeBuilder.directExchange("delayExchange").durable(true).build();
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue bootQueue001() {
   
     
        return QueueBuilder.durable("delayQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
   
     
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").and(null);
    }
}

创建一个消费者,接收死信消息:

@Component
public class RabbitConsumer {
   
     

    @RabbitListener(queues = {
   
     "deadQueue"})
    public void deadQueue(Message message) {
   
     
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }

}

发送TTL订单消息:

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000"); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message);

运行程序,可以看到,过了10秒,收到了订单信息~
*

但是该方式存在一个致命缺陷,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。 每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列。RabbitMQ是等消息到达队列顶部即将被消费时,才会判断其是否过期并删除。所以即使消息过期,也不会马上从队列中抹去。

首先发送一个过期时间为20秒的消息,再发送一个过期时间为10秒的消息:

        // 发送过期时间为20秒的消息,先到达队列
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送过期时间为10秒的消息,后达队列
        messageProperties.setExpiration("10000"); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);

测试发现,第二条过期时间为10秒的消息,虽然过期时间更短,但也需要等到第一条过期后,到达消息顶部,才会被扫描是否过期,由此可见, 过期消息实现延迟队列并不可取~~~
*

2. 过期队列实现延迟队列

实现思路:

1、 用户下单后,生成订单,发送订单消息到延迟队列中,并设置过期时间为30分钟,该队列没有消费者;
2、 订单队列消息过期后,发送订单至死信队列;
3、 死信队列消费者接收到消息后,判断订单状态,进行后续操作;
*

给整个队列添加过期时间实现延迟队列。由于过期时间作用于整个队列,所以不是很灵活,比如设置30分钟需要一个队列,设置10分钟时,又需要创建一个队列。

创建一个过期时间队列:

    @Bean("delayQueue")
    public Queue delayQueue() {
   
     
        return QueueBuilder.durable("delayQueue").withArgument("x-message-ttl", 10000).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

参照上面的案例发送消息即可。

3. 插件实现延迟队列

上面我们讨论了两种实现延迟队列的方式,但是都存在一些问题,官网也提供了基于插件的方式来实现。

消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

*

3.1 安装插件

官网中提供了很多插件,以满足更多的功能需求。

GitJHub中下载延迟插件:

*
将其放在RabbitMQ程序主目录的plugins下:

*
切换到sbin目录下,运行安装插件命令:

rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

成功安装提示:

*

重启RabbitMQ,进入到交换机页面,添加交换机,可以看到一个新的类型为x-delayed-message,说明插件安装成功:
*

3.2 代码实现

首先创建x-delayed-message类型的交换机,并绑定队列:

@Configuration
public class RabbitMqDelayQueueConfig {
   
     

    @Bean("delayExchange")
    public CustomExchange delayExchange() {
   
     
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
   
     
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
   
     
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}

创建消费者,监听延迟队列:

    @RabbitListener(queues = {
   
     "delayQueue"})
    public void deadQueue(Message message) {
   
     
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }
}

创建生产者,发送不同延迟时间的消息:

        // 发送延迟时间为20秒的消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(20000); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送延迟时间为10秒的消息
        messageProperties.setDelay(10000); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);

3.3 测试

可以看到第一条消息和第二条消息,都是在各自指定的延迟时间后被消费,并没有出现时序问题,而且每个消息都具有不同的延迟时间,灵活性很高。

所以插件是实际应用中常用的一种方式,要实现延迟队列功能,当前其他MQ,甚至Redis也可以~~~
*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: