14、RocketMQ源码分析:事务消息

目录

简介事务消息

Producer端

sendMessageInTransaction

Broker端

SendMessageProcessor

EndTransactionProcessor

TransactionalMessageCheckService


简介事务消息

先盗个流程图来简介一下事务消息的大致流程:

*

1、 Producer先发送半消息,半消息暂时对消费者们不可见;

2、 半消息发送成功;

3、 发送半消息成功后,Producer开始执行本地事务,本地事务执行成功后返回事务状态(Commit\Rollback\Unknow);

4、 将本地事务状态返回给Broker,如果是Commit,则将半消息暴露给消费者消费,如果是Rollback,则删除该消息,如果是Unknow,则表示Broker需要调用事务回查接口来判断具体事务状态;

5、 因为返回Unknow或者网络原因等Commit/Rollback指令未发送到Broker,Broker定时调用回查接口判断本地事务状态;

6、 执行Producer端定义的回查接口逻辑;

7、 回查接口返回事务状态,Broker根据Commit/Rollback状态进行不同处理;

下边从Producer端和Broker端来分析源码是如何实现的

Producer端

来看Producer端的事务消息实现,与普通消息的发送有以下几点区别

1、 TransactionMQProducer事务消息的Producer实现类,继承DefaultMQProducer并扩展了以下两个属性;

*2、TransactionListener 事务监听器,实现两个接口来执行本地事务和回查

*

3、 LocalTransactionState本地事务执行状态;

*

example包中示例的生产者端代码

*

*

sendMessageInTransaction

1、 发送前校验(清除延时标记、消息大小校验);

2、 设置事务消息标记并发送;

3、 发送成功后调用监听器执行本地事务;

4、 向Broker发送END_TRANSACTION消息返回本地事务状态;

*

Broker端

Broker端有三部分,一是接收半消息的处理,二是Producer返回Broker本地事务状态,三是Broker进行事务状态回查,下边挨个看代码实现

SendMessageProcessor

SendMessageProcessor是Broker中接收RequestCode.SEND_MESSAGE消息的处理类

asyncSendMessage

事务消息使用TransactionalMessageService进行存储消息

*

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage

*

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage*

消息和延迟消息一样被替换了Topic,写入到了RMQ_SYS_TRANS_HALF_TOPIC 主题下,再由定时任务进行事务状态判断,然后写入原Topic由消费者消费

*

EndTransactionProcessor

生产者返回本地事务状态,EndTransactionProcessor是Broker端对RequestCode.END_TRANSACTION的处理类,根据返回的事务状态是Commit还是Rollback做不同处理

Commit: 将消息查询出来恢复原Topic等,重新写入CommitLog进行消息分发,再删除半消息,这里删除不是真的删除,是将消息写到了RMQ_SYS_TRANS_OP_HALF_TOPIC这个主题下,表示已经处理过了

Rollback: 除了不用恢复原Topic重新写入,其他一致

org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest

*

TransactionalMessageCheckService

Broker通过TransactionalMessageCheckService定时对RMQ_SYS_TRANS_HALF_TOPIC主题的半消息进行监测和回查状态,默认一分钟一执行

*

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check

查询已经处理的OP队列消息,判断该事务消息如果还未处理,向Producer组中的一个发送RequestCode.CHECK_TRANSACTION_STATE命令来检测本地事务状态,如果该事务消息已经处理,则更新消费进度然后忽略。(大致是这样,但是有些代码部分没理解深刻。。

*

ClientRemotingProcessor

Producer端接收检测事务状态请求(CHECK_TRANSACTION_STATE)的处理器

*

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: