目录
简介事务消息
Producer端
sendMessageInTransaction
Broker端
SendMessageProcessor
EndTransactionProcessor
TransactionalMessageCheckService
简介事务消息
先盗个流程图来简介一下事务消息的大致流程:
1、 Producer先发送半消息,半消息暂时对消费者们不可见;
2、 半消息发送成功;
3、 发送半消息成功后,Producer开始执行本地事务,本地事务执行成功后返回事务状态(Commit\Rollback\Unknow);
4、 将本地事务状态返回给Broker,如果是Commit,则将半消息暴露给消费者消费,如果是Rollback,则删除该消息,如果是Unknow,则表示Broker需要调用事务回查接口来判断具体事务状态;
5、 因为返回Unknow或者网络原因等Commit/Rollback指令未发送到Broker,Broker定时调用回查接口判断本地事务状态;
6、 执行Producer端定义的回查接口逻辑;
7、 回查接口返回事务状态,Broker根据Commit/Rollback状态进行不同处理;
下边从Producer端和Broker端来分析源码是如何实现的
Producer端
来看Producer端的事务消息实现,与普通消息的发送有以下几点区别
1、 TransactionMQProducer事务消息的Producer实现类,继承DefaultMQProducer并扩展了以下两个属性;
2、TransactionListener 事务监听器,实现两个接口来执行本地事务和回查
3、 LocalTransactionState本地事务执行状态;
example包中示例的生产者端代码
sendMessageInTransaction
1、 发送前校验(清除延时标记、消息大小校验);
2、 设置事务消息标记并发送;
3、 发送成功后调用监听器执行本地事务;
4、 向Broker发送END_TRANSACTION消息返回本地事务状态;
Broker端
Broker端有三部分,一是接收半消息的处理,二是Producer返回Broker本地事务状态,三是Broker进行事务状态回查,下边挨个看代码实现
SendMessageProcessor
SendMessageProcessor是Broker中接收RequestCode.SEND_MESSAGE消息的处理类
asyncSendMessage
事务消息使用TransactionalMessageService进行存储消息
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage
消息和延迟消息一样被替换了Topic,写入到了RMQ_SYS_TRANS_HALF_TOPIC 主题下,再由定时任务进行事务状态判断,然后写入原Topic由消费者消费
EndTransactionProcessor
生产者返回本地事务状态,EndTransactionProcessor是Broker端对RequestCode.END_TRANSACTION的处理类,根据返回的事务状态是Commit还是Rollback做不同处理
Commit: 将消息查询出来恢复原Topic等,重新写入CommitLog进行消息分发,再删除半消息,这里删除不是真的删除,是将消息写到了RMQ_SYS_TRANS_OP_HALF_TOPIC这个主题下,表示已经处理过了
Rollback: 除了不用恢复原Topic重新写入,其他一致
org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
TransactionalMessageCheckService
Broker通过TransactionalMessageCheckService定时对RMQ_SYS_TRANS_HALF_TOPIC主题的半消息进行监测和回查状态,默认一分钟一执行
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
查询已经处理的OP队列消息,判断该事务消息如果还未处理,向Producer组中的一个发送RequestCode.CHECK_TRANSACTION_STATE命令来检测本地事务状态,如果该事务消息已经处理,则更新消费进度然后忽略。(大致是这样,但是有些代码部分没理解深刻。。)
ClientRemotingProcessor
Producer端接收检测事务状态请求(CHECK_TRANSACTION_STATE)的处理器
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: