湖南宏泰美佳建设工程有限公司网站,商务网站建设毕业设计模板下载,网络内容建设包括什么,视觉创意网站Broker提交或回滚事务消息
当生产者本地事务处理完成并且Broker回查事务消息后#xff0c;不管执行Commit还是Rollback,都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给Broker#xff0c;Broker端处理该请求的类是EndTransactionProcessor
第一步…Broker提交或回滚事务消息
当生产者本地事务处理完成并且Broker回查事务消息后不管执行Commit还是Rollback,都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给BrokerBroker端处理该请求的类是EndTransactionProcessor
第一步End_Transaction请求校验主要检查项如下
1.Broker角色检查。Slave Broker不处理事务消息 2.事务消息类型检查。EndTransactionProcessor只处理 Commit或Rollback类型的事务消息其余消息不处理这里区分了事务回查
第二步进行Commit或Rollback。
根据生产者请求头中的参数判断是Commit请求还是Rollback请求然后分别进行处理
commitMessage()
提交Half消息/这是事务消息服务接口中的一个方法根据消息位点查询了Half消息并将Half消息返回
checkPrepareMessage()
Half消息数据校验。校验内容包括发送消息的生产者组与当前执行Commit/Rollback的生产者是否一致当前Half消息是否与请求Commit/Rollback的消息是否是同一条消息
endMessageTransaction()
消息对象类型转化将MessageExt对象转化为MessageExtBrokerInner对象并且还原消息之前的Topic和ConsumeQueue等信息
sendFinalMessage()
将还原后的事务消息最终发送到CommitLog中一旦发送成功消费者就可以正常拉取消息并消费
deletePrepareMessage()
在sendFinalMessage()执行成功后删除Half消息。其实RocketMQ是不能真正删除消息的其实质是顺序写磁盘相当于做了一个假删除。假删除通过putOpMessage()方法将消息保存到TransactionMessageUtil. buildOpTopic()的Topic中并且做上标记TransactionalMessageUtil.REMOVETAG表示消息已删除
如果消息被标记为已删除则调用addRemoveTagInTransactionOp()方法利用标记为已删除的OP消息构造Message消息对象并且调用存储方法保存消息 TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息时的逻辑类似 Half消息保存在名为MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC 对象中以便Broker判断是否需要回查生产者事务的执行状态。 调用存储层方法真正地将OP消息保存到了CommitLog中
Rollback实现逻辑
Rollback并没有真正删除消息而是标记Half消息为删除在Broker回查时机会跳过不检查
rollbackMessage()
该方法与CommitMessage()方法一样都是查询Half消息并返回消息对象。
checkPrepareMessage()
消息校验与Commit调用的是同一个方法
deletePrepareMessage()
删除Half消息与Commit调用的是同一个方法