有道无术,术尚可求,有术无道,止于术。
文章目录
-
- 前言
- 简单模式
-
-
- 添加依赖
-
- 发送消息
-
- 接收消息
-
- 测试
-
- 工作队列模式
-
- 案例演示
- 发布/订阅模式
- 路由模式
- 主题模式
- RPC 模式
前言
工作模式
指的是消息发送及接受的策略。
RabbitMQ
支持六种模式,官网图示:
简单模式
接下里我们使用RabbitMQ
JAVA 客户端,实现一个简单的发送/接收消息案例。
RabbitMQ Java Client GitHub 地址
1. 添加依赖
客户端5.X 版本,需要JDK 1.8+、RabbitMQ 服务端3.X 支持。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
2. 发送消息
首先需要创建连接工厂,配置服务端地址信息:
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
客户端和服务端进行通信时,会使用Connection
建立 TCP 连接。然后每一次通信时,会在Connection
内部建立的逻辑连接Channel
,这样可以大量减少TCP 连接开销。
所以接下来需要创建Connection
和Channel
:
// 2. 新建连接
Connection connection = factory.newConnection();
// 3. 新建通道
Channel channel = connection.createChannel();
接着声明队列:
channel.queueDeclare("hello", false, false, false, null);
发送消息:
channel.basicPublish("", "hello", null, "hello world".getBytes());
完整代码如下:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 新建连接
Connection connection = factory.newConnection();
// 3. 新建通道
Channel channel = connection.createChannel();
/*
* 4. 声明队列,参数:
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare("hello", false, false, false, null);
/*
* 5. 发送消息,参数:
* 1.发送到那个交换机,(为空表示使用默认交换机)
* 2.路由的 key ,默认交换隐式绑定到每个队列,路由KEY等于队列名称。无法显式绑定到默认交换或从默认交换解除绑定。它也不能被删除。
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", "hello", null, "hello world".getBytes());
System.out.println("消息发送完毕");
// 6. 关闭连接
channel.close();
connection.close();
}
}
3. 接收消息
代码如下:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 创建连接及通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
// 3. 绑定队列
channel.queueDeclare("hello", false, false, false, null);
// 4. 处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
String message = new String(delivery.getBody());
System.out.println(message);
};
// 4. 取消消息处理的回调对象
CancelCallback cancelCallback = s -> {
System.out.println("消息消费被中断");
};
// 5. 接收消息,把消息传递到回调对象处理,参数:
/* 1.消费哪个队列
2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
3.消费者未成功消费的回调
*/
channel.basicConsume("hello", true, deliverCallback, cancelCallback);
}
}
4. 测试
运行Consumer
,再运行Producer
,成功接收到消息:
工作队列模式
Work Queues
是官方中提出的第二种工作模式,一个生产者发送消息,有多个消费者来监听任务,但是只有一个消费者能收到消息:
工作队列/任务队列
的主要思想是把任务封装为消息并将其发送到队列。在后台运行的工作进程将接收到任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。从而能实现异步多线程任务。
这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。
案例演示
复制一份Consumer
类,分别启动:
在Producer
类中,分别发送aaa、bbb、ccc、ddd四个消息:
第一个消费者,收到了aaa、ccc:
第二个消费者,收到了bbb、ddd:
总结:在该模式下,一个消息只能被一个线程消费,采用的是轮询接收。
发布/订阅模式
在发布/订阅模式
中,消息的发送者通过消息通道广播出去,让订阅改消息主题的订阅者消费。
就是一个生产者发送的消息会被多个消费者获取,所以也叫广播模式、一对多模式。
流程示意图如下:
1、 生产者将消息发送到交换机;
2、 交换机将信息发给所有绑定的队列;
3、 绑定队列的消费者收到消息;
该模式需要指定一个Exchange
交换机,起本身只负责转发消息,不具备存储消息的能力。一方面,接收生产者发送的消息。另一方面,需要知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。
到底如何操作,取决于Exchange
的类型。有常见以下几种类型:
- Fanout:广播。将消息交给所有绑定到交换机的队列
- Direct:定向。把消息交给符合指定routing key的队列
- Topic:通配符。把消息交给符合routing pattern(路由模式)的队列
- Header:通过消息内容中的headers属性来进行匹配
按照流程,编写生产者代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 新建连接
Connection connection = factory.newConnection();
// 3. 新建通道
Channel channel = connection.createChannel();
/*
* 4. 声明一个交换机
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean , Map<String, Object> arguments)
* 参数:
* exchange=》交换机名称
* type=》指定交换机的类型为FANOUT广播模式
* durable=》是否持久化
* autoDelete=》自动删除
* internal=》内部使用,一般为false
* arguments=》参数
*/
String exchangeName = "fanoutExchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
// 5. 创建队列
String fanoutQueueOne = "fanoutQueueOne";
String fanoutQueueTwo = "fanoutQueueTwo";
channel.queueDeclare(fanoutQueueOne, true, false, false, null);
channel.queueDeclare(fanoutQueueTwo, true, false, false, null);
// 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键(绑定规则,如果交换机的类型为fanout,routingKey为“”)
channel.queueBind(fanoutQueueOne, exchangeName, "");
channel.queueBind(fanoutQueueTwo, exchangeName, "");
// 7. 发送消息
String body = "广播消息";
channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
// 8. 关闭连接
channel.close();
connection.close();
System.out.println("消息发送完毕");
}
}
运行程序,发送消息,在控制台中,可以看到创建的交换机、绑定关系如下图:
在创建的两个队列中,可以看到都收到了消息,并处于就绪状态:
创建消费者,并复制代码,运行两个消费者:
public class FanoutConsumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 创建连接及通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
// 3. 绑定队列,队列和交换机绑定
String fanoutQueueOne = "fanoutQueueOne";
String exchangeName = "fanoutExchange";
channel.queueBind(fanoutQueueOne, exchangeName, "");
// 4. 处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
String message = new String(delivery.getBody());
System.out.println(message);
};
// 4. 取消消息处理的回调对象
CancelCallback cancelCallback = s -> {
System.out.println("消息消费被中断");
};
// 5. 接收消息,把消息传递到回调对象处理
channel.basicConsume(fanoutQueueOne, true, deliverCallback, cancelCallback);
}
}
可以看到,每个消息都会被所有消费者接收:
路由模式
官网的路由(Routing)模式流程示意图如下:
该模式也需要加入交换机,指定其类型为direct
。队列在绑定交换机时要指定routing key(路由键)
,消息会转发到符合routing key
的队列。交换机根据routingKey
进行完全匹配,如果匹配失败则丢弃消息。
流程说明:
1、 生产者P,向Exchange发送消息,发送消息时,会指定一个routingkey;
2、 交换机接收生产者的消息,然后把消息递交给与routingkey完全匹配的队列;
3、 消费者C1,其所在队列指定了需要routingkey为error的消息;
4、 消费者C2,其所在队列指定了需要routingkey为info、error、warning的消息;
生产者核心代码:
// 4. 声明一个交换机
String exchangeName = "directExchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 5. 创建队列
String directQueueInfo = "directQueueInfo";
String directQueueError = "directQueueError";
channel.queueDeclare(directQueueInfo, true, false, false, null);
channel.queueDeclare(directQueueError, true, false, false, null);
// 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键)
channel.queueBind(directQueueInfo, exchangeName, "info");
channel.queueBind(directQueueError, exchangeName, "error");
// 7. 发送消息
channel.basicPublish(exchangeName, "info", null, "INFO日志".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "error", null, "ERROR日志".getBytes(StandardCharsets.UTF_8));
消费者代码:
// 3. 绑定队列
String directQueueError = "directQueueError";
// 队列和交换机绑定
String exchangeName = "directExchange";
channel.queueBind(directQueueError, exchangeName, "error");
// 4. 处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
String message = new String(delivery.getBody());
System.out.println(message);
};
// 4. 取消消息处理的回调对象
CancelCallback cancelCallback = s -> {
System.out.println("消息消费被中断");
};
// 5. 接收消息,把消息传递到回调对象处理,参数:
channel.basicConsume(directQueueError, true, deliverCallback, cancelCallback);
再复制代码创建另外一个消费者,绑定路由键为info
,分别启动后运行发送消息。
结果如下所示,绑定了error
路由的队列只收到ERROR日志,info
路由的队列只收到INFO日志。
主题模式
主题Topic
类型与Direct
相比,都是可以根据RoutingKe
y把消息路由到不同的队列。只不过Topic
类型的Routing key
可以使用通配符。
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”
分割,例如:order.insert
通配符规则:
- #:匹配一个或多个词
- *:匹配一个词
举例:
- item.#:能够匹配item.spu.insert 或者item.spu
- item.*:只能匹配item.spu
创建Topic
类型的交换机,发送消息:
// 4. 声明一个交换机类型为TOPIC
String exchangeName = "topicExchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// 5. 发送消息时,携带通配符KEY
channel.basicPublish(exchangeName, "aa.error", null, "日志=》aa.error".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "bbb.ccc.error", null, "日志=>bbb.ccc.error".getBytes(StandardCharsets.UTF_8));
创建两个消费者,使用通配符绑定:
// 3. 绑定队列
String topicQueueError = "topicQueueError";
// 队列和交换机绑定,*.error 只能接到aa.error、bb.error等前缀为一个单词的消息
String exchangeName = "topicExchange";
channel.queueBind(topicQueueError, exchangeName, "*.error");
// 3. 绑定队列
String topicQueueError = "topicQueueError";
// 队列和交换机绑定,#.error 只能接到aa.error、bb.cc.error等前缀为一个或多个单词的消息
String exchangeName = "topicExchange";
channel.queueBind(topicQueueError, exchangeName, "#.error");
运行代码,发送消息,运行结果如下:
RPC 模式
RPC 模式
也就是使用队列实现远程过程调用。实际使用的很少,专业的RPC
框架已经很多了~,这里就不介绍了。
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: