04、RabbitMQ基础:六大工作模式

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
  • 简单模式
      1. 添加依赖
      1. 发送消息
      1. 接收消息
      1. 测试
  • 工作队列模式
    • 案例演示
  • 发布/订阅模式
  • 路由模式
  • 主题模式
  • RPC 模式

前言

工作模式指的是消息发送及接受的策略。

RabbitMQ 支持六种模式,官网图示:
*

简单模式

接下里我们使用RabbitMQ JAVA 客户端,实现一个简单的发送/接收消息案例。
*
RabbitMQ Java Client GitHub 地址

1. 添加依赖

客户端5.X 版本,需要JDK 1.8+、RabbitMQ 服务端3.X 支持。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

2. 发送消息

首先需要创建连接工厂,配置服务端地址信息:

        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

客户端和服务端进行通信时,会使用Connection建立 TCP 连接。然后每一次通信时,会在Connection内部建立的逻辑连接Channel,这样可以大量减少TCP 连接开销。
*
所以接下来需要创建ConnectionChannel

        // 2. 新建连接
        Connection connection = factory.newConnection();
        // 3. 新建通道
        Channel channel = connection.createChannel();

接着声明队列:

channel.queueDeclare("hello", false, false, false, null);

发送消息:

channel.basicPublish("", "hello", null, "hello world".getBytes());

完整代码如下:

public class Producer {
   
     

    public static void main(String[] args) throws IOException, TimeoutException {
   
     
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 新建连接
        Connection connection = factory.newConnection();
        // 3. 新建通道
        Channel channel = connection.createChannel();
        /*
         * 4. 声明队列,参数:
         *  1.队列名称
         *  2.队列里面的消息是否持久化 默认消息存储在内存中
         *  3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
         *  4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         *  5.其他参数
         */
        channel.queueDeclare("hello", false, false, false, null);
        /*
         * 5. 发送消息,参数:
         *  1.发送到那个交换机,(为空表示使用默认交换机)
         *  2.路由的 key ,默认交换隐式绑定到每个队列,路由KEY等于队列名称。无法显式绑定到默认交换或从默认交换解除绑定。它也不能被删除。
         *  3.其他的参数信息
         *  4.发送消息的消息体
         */
        channel.basicPublish("", "hello", null, "hello world".getBytes());
        System.out.println("消息发送完毕");
        // 6. 关闭连接
        channel.close();
        connection.close();
    }
}

3. 接收消息

代码如下:

public class Consumer {
   
     
    public static void main(String[] args) throws IOException, TimeoutException {
   
     
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 创建连接及通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();   //与rabbitmq通讯的通道
        // 3. 绑定队列
        channel.queueDeclare("hello", false, false, false, null);
        // 4. 处理消息的回调对象
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        {
   
     
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        // 4. 取消消息处理的回调对象
        CancelCallback cancelCallback = s -> {
   
     
            System.out.println("消息消费被中断");
        };
        // 5. 接收消息,把消息传递到回调对象处理,参数:
        /*  1.消费哪个队列
            2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
            3.消费者未成功消费的回调
         */
        channel.basicConsume("hello", true, deliverCallback, cancelCallback);
    }
}

4. 测试

运行Consumer,再运行Producer,成功接收到消息:
*

工作队列模式

Work Queues是官方中提出的第二种工作模式,一个生产者发送消息,有多个消费者来监听任务,但是只有一个消费者能收到消息:
*

工作队列/任务队列的主要思想是把任务封装为消息并将其发送到队列。在后台运行的工作进程将接收到任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。从而能实现异步多线程任务。

这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。

案例演示

复制一份Consumer类,分别启动:
*
Producer类中,分别发送aaa、bbb、ccc、ddd四个消息:
*
第一个消费者,收到了aaa、ccc:
*
第二个消费者,收到了bbb、ddd:
*
总结:在该模式下,一个消息只能被一个线程消费,采用的是轮询接收。

发布/订阅模式

发布/订阅模式中,消息的发送者通过消息通道广播出去,让订阅改消息主题的订阅者消费。

就是一个生产者发送的消息会被多个消费者获取,所以也叫广播模式、一对多模式。
*
流程示意图如下

1、 生产者将消息发送到交换机;
2、 交换机将信息发给所有绑定的队列;
3、 绑定队列的消费者收到消息;

该模式需要指定一个Exchange交换机,起本身只负责转发消息,不具备存储消息的能力。一方面,接收生产者发送的消息。另一方面,需要知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。

到底如何操作,取决于Exchange的类型。有常见以下几种类型:

  • Fanout:广播。将消息交给所有绑定到交换机的队列
  • Direct:定向。把消息交给符合指定routing key的队列
  • Topic:通配符。把消息交给符合routing pattern(路由模式)的队列
  • Header:通过消息内容中的headers属性来进行匹配

按照流程,编写生产者代码:

public class Producer {
   
     

    public static void main(String[] args) throws IOException, TimeoutException {
   
     
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 新建连接
        Connection connection = factory.newConnection();
        // 3. 新建通道
        Channel channel = connection.createChannel();
        /*
         * 4. 声明一个交换机
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean , Map<String, Object> arguments)
         *  参数:
         *  exchange=》交换机名称
         *  type=》指定交换机的类型为FANOUT广播模式
         *  durable=》是否持久化
         *  autoDelete=》自动删除
         *  internal=》内部使用,一般为false
         *  arguments=》参数
         */
        String exchangeName = "fanoutExchange";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        // 5. 创建队列
        String fanoutQueueOne = "fanoutQueueOne";
        String fanoutQueueTwo = "fanoutQueueTwo";
        channel.queueDeclare(fanoutQueueOne, true, false, false, null);
        channel.queueDeclare(fanoutQueueTwo, true, false, false, null);
        // 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键(绑定规则,如果交换机的类型为fanout,routingKey为“”)
        channel.queueBind(fanoutQueueOne, exchangeName, "");
        channel.queueBind(fanoutQueueTwo, exchangeName, "");
        // 7. 发送消息
        String body = "广播消息";
        channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
        // 8. 关闭连接
        channel.close();
        connection.close();
        System.out.println("消息发送完毕");
    }
}

运行程序,发送消息,在控制台中,可以看到创建的交换机、绑定关系如下图:
*
在创建的两个队列中,可以看到都收到了消息,并处于就绪状态:
*

创建消费者,并复制代码,运行两个消费者:

public class FanoutConsumerOne {
   
     
    public static void main(String[] args) throws IOException, TimeoutException {
   
     
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 创建连接及通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();   //与rabbitmq通讯的通道
        // 3. 绑定队列,队列和交换机绑定
        String fanoutQueueOne = "fanoutQueueOne";
        String exchangeName = "fanoutExchange";
        channel.queueBind(fanoutQueueOne, exchangeName, "");
        // 4. 处理消息的回调对象
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        {
   
     
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        // 4. 取消消息处理的回调对象
        CancelCallback cancelCallback = s -> {
   
     
            System.out.println("消息消费被中断");
        };
        // 5. 接收消息,把消息传递到回调对象处理
        channel.basicConsume(fanoutQueueOne, true, deliverCallback, cancelCallback);
    }
}

可以看到,每个消息都会被所有消费者接收:
*

*

路由模式

官网的路由(Routing)模式流程示意图如下:
*
该模式也需要加入交换机,指定其类型为direct。队列在绑定交换机时要指定routing key(路由键),消息会转发到符合routing key的队列。交换机根据routingKey进行完全匹配,如果匹配失败则丢弃消息。

流程说明

1、 生产者P,向Exchange发送消息,发送消息时,会指定一个routingkey
2、 交换机接收生产者的消息,然后把消息递交给与routingkey完全匹配的队列;
3、 消费者C1,其所在队列指定了需要routingkey为error的消息;
4、 消费者C2,其所在队列指定了需要routingkey为info、error、warning的消息;

生产者核心代码

       // 4. 声明一个交换机
        String exchangeName = "directExchange";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 5. 创建队列
        String directQueueInfo = "directQueueInfo";
        String directQueueError = "directQueueError";
        channel.queueDeclare(directQueueInfo, true, false, false, null);
        channel.queueDeclare(directQueueError, true, false, false, null);
        // 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键)
        channel.queueBind(directQueueInfo, exchangeName, "info");
        channel.queueBind(directQueueError, exchangeName, "error");
        // 7. 发送消息
        channel.basicPublish(exchangeName, "info", null, "INFO日志".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "error", null, "ERROR日志".getBytes(StandardCharsets.UTF_8));

消费者代码

        // 3. 绑定队列
        String directQueueError = "directQueueError";
        // 队列和交换机绑定
        String exchangeName = "directExchange";
        channel.queueBind(directQueueError, exchangeName, "error");
        // 4. 处理消息的回调对象
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        {
   
     
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        // 4. 取消消息处理的回调对象
        CancelCallback cancelCallback = s -> {
   
     
            System.out.println("消息消费被中断");
        };
        // 5. 接收消息,把消息传递到回调对象处理,参数:
        channel.basicConsume(directQueueError, true, deliverCallback, cancelCallback);

再复制代码创建另外一个消费者,绑定路由键为info,分别启动后运行发送消息。

结果如下所示,绑定了error路由的队列只收到ERROR日志,info路由的队列只收到INFO日志。
**

主题模式

*

主题Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的Routing key 可以使用通配符

Routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:order.insert

通配符规则

  • #:匹配一个或多个词
  • *:匹配一个词

举例

  • item.#:能够匹配item.spu.insert 或者item.spu
  • item.*:只能匹配item.spu

创建Topic类型的交换机,发送消息:

        // 4. 声明一个交换机类型为TOPIC
        String exchangeName = "topicExchange";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
        // 5. 发送消息时,携带通配符KEY
        channel.basicPublish(exchangeName, "aa.error", null, "日志=》aa.error".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "bbb.ccc.error", null, "日志=>bbb.ccc.error".getBytes(StandardCharsets.UTF_8));

创建两个消费者,使用通配符绑定:

        // 3. 绑定队列
        String topicQueueError = "topicQueueError";
        // 队列和交换机绑定,*.error 只能接到aa.error、bb.error等前缀为一个单词的消息
        String exchangeName = "topicExchange";
        channel.queueBind(topicQueueError, exchangeName, "*.error");

        // 3. 绑定队列
        String topicQueueError = "topicQueueError";
        // 队列和交换机绑定,#.error 只能接到aa.error、bb.cc.error等前缀为一个或多个单词的消息
        String exchangeName = "topicExchange";
        channel.queueBind(topicQueueError, exchangeName, "#.error");

运行代码,发送消息,运行结果如下:
*

*

RPC 模式

RPC 模式也就是使用队列实现远程过程调用。实际使用的很少,专业的RPC框架已经很多了~,这里就不介绍了。
*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: