模板网站建设流程图,大连做网站首选领超科技,产品型网站案例,网站开发经济可行性分析#x1f34a; Java学习#xff1a;Java从入门到精通总结 #x1f34a; 深入浅出RocketMQ设计思想#xff1a;深入浅出RocketMQ设计思想 #x1f34a; 绝对不一样的职场干货#xff1a;大厂最佳实践经验指南 #x1f4c6; 最近更新#xff1a;2023年3月24日 #x… Java学习Java从入门到精通总结 深入浅出RocketMQ设计思想深入浅出RocketMQ设计思想 绝对不一样的职场干货大厂最佳实践经验指南 最近更新2023年3月24日 个人简介通信工程本硕 for NJU、Java程序员。做过科研paper发过专利优秀的程序员不应该只是CRUD 点赞 收藏 ⭐留言 都是我最大的动力 文章目录RocketMQ事务消息适用场景举例使用示例发送事务消息事务回查事务执行RocketMQ事务消息
RocketMQ针对事务消息扩展了两个相关的概念
1. 半消息
半消息Half Message是一种特殊的消息类型处于这个状态的消息暂时不能被Consumer消费。
当一条事务消息被成功投递到Broker上但Broker没有收到Producer的二次确认时该事务消息就处于暂时不可消费的状态这种消息就是半消息。 2. 消息状态回查
由于网络抖动、系统宕机等等原因可能导致Producer向Broker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态则会主动向Producer端发起回查操作查询该事务消息在Producer端的事务状态。
这个机制主要是用来解决分布式事务中的超时问题。 上图是RocketMQ官网提供的事务消息流程图执行步骤如下
Producer向Broker端发送半消息Broker发送ACK确认表示半消息发送成功Producer执行本地事务本地事务完毕根据事务的状态Producer向Broker发送二次确认消息确认该半消息的Commit或Rollback状态。Broker收到二次确认消息之后如果是Commit状态则直接将消息发送到Consumer端执行消费逻辑如果是Rollback状态则会直接将其标记为失败不会发送给Consumer针对超时情况Broker主动向Producer发起消息回查Producer处理回查消息返回对应的本地事务执行结果Broker针对消息回查的结果执行【步骤4】的操作 适用场景举例
以转账系统为例假设A要向B转账100元执行本地事务和发送异步消息的过程应该同时保持成功或失败即A的账户扣款成功后就一定要发消息发送出去最直观的思路可能有两个
1. 先发消息
这种策略的流程如下 存在的问题是 如果消息发送成功但后续A扣款失败了消费端仍然会消费这条消息进而向B账户里打钱数据就出现不一致的情况了。 2. 后发消息 存在的问题是 如果扣款成功但是发送消息失败就会出现A已经扣钱了但B账户里没有入账的情况同样也是无法接受的。 出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的因此也就无法做到同时失败或同时成功所以数据一致性难以保障。 解决上述问题的方法就是上面提到的半消息 如上图所示执行本地事务之前先发送一个半消息此时还不能被消费者消费只有当本地事务执行完毕并发送二次确认消息之后半消息才能被Consumer消费。
如此以来就保证了多个系统数据的数据一致性前提是系统不需要保证数据的强一致性。 使用示例
发送事务消息
RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程因此这个过程可能会“稍显复杂”。
发送事务消息使用的是TransactionMQProducer一个简单的demo如下
public class TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionCheckListener transactionCheckListener new TransactionCheckListener() {Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {return null;}};TransactionMQProducer producer new TransactionMQProducer(GROUP A);producer.setCheckThreadPoolMinSize(2);producer.setCheckThreadPoolMaxSize(2);producer.setCheckRequestHoldMax(2000);producer.setTransactionCheckListener(transactionCheckListener);producer.start();String[] tags new String[]{TAG A, TAG B, TAG C};LocalTransactionExecuter transactionExecuter new LocalTransactionExecuter() {Overridepublic LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {return null;}};for (int i 0; i 10; i) {Message msg new Message(TEST, tags[i % tags.length], KEY i, (HELLO, ROCKETMQ i).getBytes());SendResult result producer.sendMessageInTransaction(msg, transactionExecuter, null);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}producer.shutdown();}
}事务回查
checkLocalTransaction是事务消息回查监听方法可以获取本地事务状态根据事务的状态来确定是否要发送二次确认消息或者进行事务回滚操作。
消息回查事务的状态由以下几种情况
LocalTransactionState.ROLLBACK_MESSAGE事务回滚LocalTransactionState.COMMIT_MESSAGE事务提交LocalTransactionState.UNKNOW未知状态此时Broker会定时重新查询Producer消息的状态直到出现前面两种情况。
public interface TransactionListener {LocalTransactionState checkLocalTransaction(final MessageExt msg);
}事务执行
executeLocalTransaction方法用于执行本地事务如果本地事务执行成功则进行事务提交否则进行事务回滚如果是UNKNOW状态的话Broker就会定时回查Producer的消息状态直到彻底成功或失败。
public interface TransactionListener {LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
}