分布式消息队列RocketMQ
四、 RocketMQ应用
4.1)普通消息
4、 1.1)消息发送分类;
Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果
4、 1.1.1)同步发送消息;
同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息;该方式 的消息可靠性高,但消息发送效率太低。
4、 1.1.2)异步发送消息;
异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
4、 1.1.3)单向发送消息;
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK,该方式的消息发送效率高,但消息可靠性较差。
4、 1.2)代码实现案例;
4、 1.2.1)创建工程;
创建一个Maven的Java工程rocketMqTest,在其POM文件导入相关依赖:
* *<properties>
* * * *<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
* * * *<maven.compiler.source>8</maven.compiler.source>
* * * *<maven.compiler.target>8</maven.compiler.target>
* *</properties>
* *<dependencies>
* * * *<!--需要与RocketMQ版本相同-->
* * * *<dependency>
* * * * * *<groupId>org.apache.rocketmq</groupId>
* * * * * *<artifactId>rocketmq-client</artifactId>
* * * * * *<version>4.9.3</version>
* * * *</dependency>
* *</dependencies>
4、 1.2.2)定义同步消息发送生产者;
// 定义同步消息发送生产者
public class SyncProducer {
* *public static void main(String[] args) throws Exception {
* * * *// 创建一个producer,参数为Producer Group名称
* * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
* * * *// 指定nameServer地址
* * * *producer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 设置当发送失败时重试发送的次数,默认为2次
* * * *producer.setRetryTimesWhenSendFailed(3);
* * * *// 设置发送超时时限为5s,默认3s
* * * *producer.setSendMsgTimeout(5000);
* * * *// 开启生产者
* * * *producer.start();
* * * *// 生产并发送100条消息
* * * *for (int i = 0; i < 100; i++) {
* * * * * *byte[] body = ("Hi," + i).getBytes();
* * * * * *Message msg = new Message("someTopic", "someTag", body);
* * * * * *// 为消息指定key
* * * * * *msg.setKeys("key-" + i);
* * * * * *// 同步发送消息
* * * * * *SendResult sendResult = producer.send(msg);
* * * * * *System.out.println(sendResult);
* * * }
* * * *// 关闭producer
* * * *producer.shutdown();
* }
}
输出:
消息发送的状态 SendStatus 说明:
public enum SendStatus {
// 发送成功
* *SEND_OK,
* *// 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出 现这种异常状态。异步刷盘不会出现
* *FLUSH_DISK_TIMEOUT,
* * // Slave同步超时。当Broker集群设置的Master-Slave的复 制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
* *FLUSH_SLAVE_TIMEOUT,
* * // 没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
* *SLAVE_NOT_AVAILABLE;
* *private SendStatus() {
* }
}
4、 1.2.3)定义异步消息发送生产者;
//定义异步消息发送生产者
public class AsyncProducer {
* *public static void main(String[] args) throws Exception {
* * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
* * * *producer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 指定异步发送失败后不进行重试发送
* * * *producer.setRetryTimesWhenSendAsyncFailed(0);
* * * *// 指定新创建的Topic的Queue数量为2,默认为4
* * * *producer.setDefaultTopicQueueNums(2);
// 开启生产者
* * * *producer.start();
// 生产并发送100条消息
* * * *for (int i = 0; i < 100; i++) {
* * * * * *byte[] body = ("Hi," + i).getBytes();
* * * * * *try {
* * * * * * * *Message msg = new Message("myTopicA", "myTag", body);
* * * * * * * *// 异步发送。指定回调
* * * * * * * *producer.send(msg, new SendCallback() {
* * * * * * * * * *// 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
* * * * * * * * * *@Override
* * * * * * * * * *public void onSuccess(SendResult sendResult) {
* * * * * * * * * * * *System.out.println(sendResult);
* * * * * * * * * }
* * * * * * * * * *@Override
* * * * * * * * * *public void onException(Throwable e) {
* * * * * * * * * * * *e.printStackTrace();
* * * * * * * * * }
* * * * * * * });
* * * * * } catch (Exception e) {
* * * * * * * *e.printStackTrace();
* * * * * }
* * * } // end-for
* * * *// sleep一会儿
* * * *// 由于采用的是异步发送,所以若这里不sleep,则消息还未发送就会将producer给关闭,报错
* * * *TimeUnit.SECONDS.sleep(3);
* * * *producer.shutdown();
* }
}
输出:
4、 1.2.4)定义单向消息发送生产者;
// 定义单向消息发送生产者
public class OnewayProducer {
* *public static void main(String[] args) throws Exception{
* * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
* * * *producer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 开启生产者
* * * *producer.start();
// 生产并发送100条消息
* * * *for (int i = 0; i < 10; i++) {
* * * * * *byte[] body = ("Hi," + i).getBytes();
* * * * * *Message msg = new Message("single", "someTag", body);
* * * * * *// 单向发送,无返回值
* * * * * *producer.sendOneway(msg);
* * * }
* * * *producer.shutdown();
* * * *System.out.println("producer shutdown");
* }
}
输出:
4、 1.2.5)定义消息消费者;
//定义消息消费者
public class SomeConsumer {
* *public static void main(String[] args) throws MQClientException {
* * * *// 定义一个pull消费者
* * * *// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
* * * *// 定义一个push消费者
* * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
* * * *// 指定nameServer
* * * *consumer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 指定从第一条消息开始消费
* * * *consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
* * * *// 指定消费topic与tag
* * * *consumer.subscribe("someTopic", "*");
* * * *// 指定采用“广播模式”进行消费,默认为“集群模式”
* * * *// consumer.setMessageModel(MessageModel.BROADCASTING);
* * * *// 注册消息监听器
* * * *consumer.registerMessageListener(new MessageListenerConcurrently() {
* * * * * *// 一旦broker中有了其订阅的消息就会触发该方法的执行,
* * * * * *// 其返回值为当前consumer消费的状态
* * * * * *@Override
* * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * *ConsumeConcurrentlyContext context) {
* * * * * * * *// 逐条消费消息
* * * * * * * *for (MessageExt msg : msgs) {
* * * * * * * * * *System.out.println(msg);
* * * * * * * }
* * * * * * * *// 返回消费状态:消费成功
* * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
* * * * * }
* * * });
* * * *// 开启消费者消费
* * * *consumer.start();
* * * *System.out.println("Consumer Started");
* }
}
输出:
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: