当前位置: 首页 > news >正文

网站开发设计有限公司郑州软件公司排行榜

网站开发设计有限公司,郑州软件公司排行榜,淘客网站做百度推广,网站建设优化两千字分布式事务解决方案之可靠消息最终一致性 什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息#xff0c;事务参与方(消息消费者)一定能 够接收消息并处理事务成功#xff0c;此方案强调的是只要消息发给事务参与方最终…分布式事务解决方案之可靠消息最终一致性 什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息事务参与方(消息消费者)一定能 够接收消息并处理事务成功此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用消息中间件完成如下图\ 事务发起方消息生产方将消息发给消息中间件事务参与方从消息中间件接收消息事务发起方和消息中间件 之间事务参与方消息消费方和消息中间件之间都是通过网络通信由于网络通信的不确定性会导致分布式事 务问题。 因此可靠消息最终一致性方案要解决以下几个问题 1.本地事务与消息发送的原子性问题  本地事务与消息发送的原子性问题即事务发起方在本地事务执行成功后消息必须发出去否则就丢弃消息。即实 现本地事务和消息发送的原子性要么都成功要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最 终一致性方案的关键问题。 先来尝试下这种操作先发送消息再操作数据库 begin transaction //1.发送MQ //2.数据库操作 commit transation; 这种情况下无法保证数据库操作与发送消息的一致性因为可能发送消息成功数据库操作失败。 你立马想到第二种方案先进行数据库操作再发送消息 begin transaction //1.数据库操作 //2.发送MQ commit transation;这种情况下貌似没有问题如果发送MQ消息失败就会抛出异常导致数据库事务回滚。但如果是超时异常数 据库回滚但MQ其实已经正常发送了同样会导致不一致。 2、事务参与方接收消息的可靠性 事务参与方必须能够从消息队列接收到消息如果接收消息失败可以重复接收消息。 3、消息重复消费的问题 由于网络2的存在若某一个消费节点超时但是消费成功此时消息中间件会重复投递此消息就导致了消息的重 复消费。 要解决消息重复消费的问题就要实现事务参与方的方法幂等性。 解决方案 上节讨论了可靠消息最终一致性事务方案需要解决的问题本节讨论具体的解决方案。 本地消息表方案 本地消息表这个方案最初是eBay提出的此方案的核心是通过本地事务保证数据业务操作和消息的一致性然后 通过定时任务将消息发送至消息中间件待确认消息发送给消费方成功再将消息删除。 下面以注册送积分为例来说明 下例共有两个微服务交互用户服务和积分服务用户服务负责添加用户积分服务负责增加积分。 交互流程如下 1、用户注册  用户服务在本地事务新增用户和增加 ”积分消息日志“。用户表和消息表通过本地事务保证一致 下边是伪代码 begin transaction //1.新增用户 //2.存储积分消息日志 commit transation;这种情况下本地数据库操作与存储积分消息日志处于同一个事务中本地数据库操作与记录消息日志操作具备原 子性。 2、定时任务扫描日志 如何保证将消息发送给消息队列呢 经过第一步消息已经写到消息日志表中可以启动独立的线程定时对消息日志表中的消息进行扫描并发送至消息 中间件在消息中间件反馈发送成功后删除该消息日志否则等待定时任务下一周期重试。 3、消费消息 如何保证消费者一定能消费到消息呢 这里可以使用MQ的ack即消息确认机制消费者监听MQ如果消费者接收到消息并且业务处理完成后向MQ 发送ack即消息确认此时说明消费者正常消费消息完成MQ将不再向消费者推送消息否则消费者会不断重 试向消费者来发送消息。 积分服务接收到”增加积分“消息开始增加积分积分增加成功后向消息中间件回应ack否则消息中间件将重复 投递此消息。 由于消息会重复投递积分服务的”增加积分“功能需要实现幂等性。 RocketMQ事务消息方案 RocketMQ 是一个来自阿里巴巴的分布式消息中间件于 2012 年开源并在 2017 年正式成为 Apache 顶级项 目。据了解包括阿里云上的消息产品以及收购的子公司在内阿里集团的消息产品全线都运行在 RocketMQ 之 上并且最近几年的双十一大促中RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消 息为分布式事务实现提供了便利性支持。 RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题RocketMQ 的 设计中 broker 与 producer 端的双向通信能力使得 broker 天生可以作为一个事务协调者存在而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系 统发生异常时依然能够保证达成事务的最终一致性。 在RocketMQ 4.3后实现了完整的事务消息实际上其实是对本地消息表的一个封装将本地消息表移动到了MQ 内部解决 Producer 端的消息发送与本地事务执行的原子性问题。 执行流程如下 为方便理解我们还以注册送积分的例子来描述 整个流程。 Producer 即MQ发送方本例中是用户服务负责新增用户。MQ订阅方即消息消费方本例中是积分服务负责 新增积分。 1、Producer 发送事务消息 Producer MQ发送方发送事务消息至MQ ServerMQ Server将消息状态标记为Prepared预备状态注 意此时这条消息消费者MQ订阅方是无法消费到的。 本例中Producer 发送 ”增加积分消息“ 到MQ Server。 2、MQ Server回应消息发送成功 MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。 3、Producer 执行本地事务 Producer 端执行业务代码逻辑通过本地数据库事务控制。 本例中Producer 执行添加用户操作。 4、消息投递 若Producer 本地事务执行成功则自动向MQServer发送commit消息MQ Server接收到commit消息后将”增加积 分消息“ 状态标记为可消费此时MQ订阅方积分服务即正常消费消息 . 若Producer 本地事务执行失败则自动向MQServer发送rollback消息MQ Server接收到rollback消息后 将删 除”增加积分消息“ 。 MQ订阅方积分服务消费消息消费成功则向MQ回应ack否则将重复接收消息。这里ack默认自动回应即 程序执行正常则自动回应ack。 5、事务回查 如果执行Producer端本地事务过程中执行端挂掉或者超时MQ Server将会不停的询问同组的其他 Producer 来获取事务执行状态这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。 以上主干流程已由RocketMQ实现对用户侧来说用户需要分别实现本地事务执行以及本地事务回查方法因此 只需关注本地事务的执行状态即可。 RoacketMQ提供RocketMQLocalTransactionListener接口 public interface RocketMQLocalTransactionListener { /** ‐ 发送prepare消息成功此方法被回调该方法用于执行本地事务 ‐ param msg 回传的消息利用transactionId即可获取到该消息的唯一Id ‐ param arg 调用send方法时传递的参数当send时候若有额外的参数可以传递到send方法中这里能获取到 ‐ return 返回事务状态COMMIT提交 ROLLBACK回滚 UNKNOW回调 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** ‐ param msg 通过获取transactionId来判断这条消息的本地事务执行状态 ‐ return 返回事务状态COMMIT提交 ROLLBACK回滚 UNKNOW回调 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); } 发送事务消息 以下是RocketMQ提供用于发送事务消息的API TransactionMQProducer producer new TransactionMQProducer(ProducerGroup); producer.setNamesrvAddr(127.0.0.1:9876); producer.start(); //设置TransactionListener实现 producer.setTransactionListener(transactionListener //发送事务消息 SendResult sendResult producer.sendMessageInTransaction(msg, null);RocketMQ实现可靠消息最终一致性事务 业务说明 本实例通过RocketMQ中间件实现可靠消息最终一致性分布式事务模拟两个账户的转账交易过程。 两个账户在分别在不同的银行(张三在bank1、李四在bank2)bank1、bank2是两个微服务。交易过程是张三 给李四转账指定金额。 上述交易步骤张三扣减金额与给bank2发转账消息两个操作必须是一个整体性的事务。 程序组成部分  本示例程序组成部分如下 数据库MySQL-5.7.25 包括bank1和bank2两个数据库。 JDK64位 jdk1.8.0_201 rocketmq 服务端RocketMQ-4.5.0 rocketmq 客户端RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服务框架spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服务及数据库的关系 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 银行1操作张三账户 连接数据库bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 银行2操作李四账户连接数据库bank2 本示例程序技术架构如下 交互流程如下 1、Bank1向MQ Server发送转账消息 2、Bank1执行本地事务扣减金额 3、Bank2接收消息执行本地事务添加金额  创建数据库 导入数据库脚本资料\sql\bank1.sql、资料\sql\bank2.sql已经导过不用重复导入。 创建bank1库并导入以下表结构和数据(包含张三账户) CREATE DATABASE bank1 CHARACTER SET utf8 COLLATE utf8_general_ci; DROP TABLE IF EXISTS account_info; CREATE TABLE account_info ( id bigint(20) NOT NULL AUTO_INCREMENT, account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 户 主姓名, account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 银行 卡号, account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 帐户密码, account_balance double NULL DEFAULT NULL COMMENT 帐户余额, PRIMARY KEY (id) USING BTREE ) ENGINE InnoDB AUTO_INCREMENT 5 CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Dynamic; INSERT INTO account_info VALUES (2, 张三的账户, 1, , 10000); 创建bank2库并导入以下表结构和数据(包含李四账户)\ CREATE DATABASE bank2 CHARACTER SET utf8 COLLATE utf8_general_ci; CREATE TABLE account_info ( id bigint(20) NOT NULL AUTO_INCREMENT, account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 户 主姓名, account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 银行 卡号, account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 帐户密码, account_balance double NULL DEFAULT NULL COMMENT 帐户余额, PRIMARY KEY (id) USING BTREE ) ENGINE InnoDB AUTO_INCREMENT 5 CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Dynamic; INSERT INTO account_info VALUES (3, 李四的账户, 2, NULL, 0); 在bank1、bank2数据库中新增de_duplication交易记录表(去重表)用于交易幂等控制 DROP TABLE IF EXISTS de_duplication; CREATE TABLE de_duplication ( tx_no varchar(64) COLLATE utf8_bin NOT NULL, create_time datetime(0) NULL DEFAULT NULL, PRIMARY KEY (tx_no) USING BTREE ) ENGINE InnoDB CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Dynamic;启动RocketMQ 1下载RocketMQ服务器 下载地址http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-binrelease.zip 2解压并启动 启动nameserver set ROCKETMQ_HOME[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqnamesrv.cmd 启动broker set ROCKETMQ_HOME[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnabletrue导入dtx-txmsg-demo  dtx-txmsg-demo是本方案的测试工程根据业务需求需要创建两个dtx-txmsg-demo工程。 1导入dtx-txmsg-demo 导入资料\基础代码\dtx-txmsg-demo到父工程dtx下。 两个测试工程如下 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 操作张三账户连接数据库bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 操作李四账户连接数据库bank2 2父工程maven依赖说明 在dtx父工程中指定了SpringBoot和SpringCloud版本 dependency groupIdorg.springframework.boot/groupId artifactIdspring‐boot‐dependencies/artifactId version2.1.3.RELEASE/version typepom/type scopeimport/scope /dependency dependency groupIdorg.springframework.cloud/groupId artifactIdspring‐cloud‐dependencies/artifactId versionGreenwich.RELEASE/version typepom/type scopeimport/scope /dependency 在dtx-txmsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。 dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq‐spring‐boot‐starter/artifactId version2.0.2/version /dependency3配置rocketMQ 在application-local.propertis中配置rocketMQ nameServer地址及生产组 rocketmq.producer.group producer_bank2 rocketmq.name‐server 127.0.0.1:9876其它详细配置见导入的基础工程。 dtx-txmsg-demo-bank1 dtx-txmsg-demo-bank1实现如下功能 1、张三扣减金额提交本地事务。 2、向MQ发送转账消息。 2Dao Mapper Component public interface AccountInfoDao { Update(update account_info set account_balanceaccount_balance#{amount} where account_no# {accountNo}) int updateAccountBalance(Param(accountNo) String accountNo, Param(amount) Double amount); Select(select count(1) from de_duplication where tx_no #{txNo}) int isExistTx(String txNo); Insert(insert into de_duplication values(#{txNo},now());) int addTx(String txNo); }3AccountInfoService Service Slf4j public class AccountInfoServiceImpl implements AccountInfoService { Resource private RocketMQTemplate rocketMQTemplate; Autowired private AccountInfoDao accountInfoDao; /** * 更新帐号余额‐发送消息 * producer向MQ Server发送消息 * * param accountChangeEvent */ Override public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { //构建消息体 JSONObject jsonObject new JSONObject(); jsonObject.put(accountChange,accountChangeEvent); MessageString message MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult sendResult rocketMQTemplate.sendMessageInTransaction(producer_group_txmsg_bank1, topic_txmsg, message, null); log.info(send transcation message body{},result {},message.getPayload(),sendResult.getSendStatus()); } /** * 更新帐号余额‐本地事务 * producer发送消息完成后接收到MQ Server的回应即开始执行本地事务 * * param accountChangeEvent */ Transactional Override public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { log.info(开始更新本地事务事务号{},accountChangeEvent.getTxNo()); accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmoun t() * ‐1); //为幂等作准备 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() 2){ throw new RuntimeException(bank1更新本地事务时抛出异常); } log.info(结束更新本地事务事务号{},accountChangeEvent.getTxNo()); } }4RocketMQLocalTransactionListener 编写RocketMQLocalTransactionListener接口实现类实现执行本地事务和事务回查两个方法。 Component Slf4j RocketMQTransactionListener(txProducerGroup producer_group_txmsg_bank1) public class ProducerTxmsgListener implements RocketMQLocalTransactionListener { Autowired AccountInfoService accountInfoService; Autowired AccountInfoDao accountInfoDao; //消息发送成功回调此方法此方法执行本地事务 Override Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { //解析消息内容 try { String jsonString new String((byte[]) message.getPayload()); JSONObject jsonObject JSONObject.parseObject(jsonString); AccountChangeEvent accountChangeEvent JSONObject.parseObject(jsonObject.getString(accountChange), AccountChangeEvent.class); //扣除金额 accountInfoService.doUpdateAccountBalance(accountChangeEvent); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error(executeLocalTransaction 事务执行失败,e); e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } } //此方法检查事务执行状态 Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { RocketMQLocalTransactionState state; final JSONObject jsonObject JSON.parseObject(new String((byte[]) message.getPayload())); AccountChangeEvent accountChangeEvent JSONObject.parseObject(jsonObject.getString(accountChange),AccountChangeEvent.class); //事务id String txNo accountChangeEvent.getTxNo(); int isexistTx accountInfoDao.isExistTx(txNo); log.info(回查事务事务号: {} 结果: {}, accountChangeEvent.getTxNo(),isexistTx); if(isexistTx0){ state RocketMQLocalTransactionState.COMMIT; }else{ state RocketMQLocalTransactionState.UNKNOWN; } return state; } } 5Controlle RestController Slf4j public class AccountInfoController { Autowired private AccountInfoService accountInfoService; GetMapping(value /transfer) public String transfer(RequestParam(accountNo)String accountNo,RequestParam(amount) Double amount){ String tx_no UUID.randomUUID().toString(); AccountChangeEvent accountChangeEvent new AccountChangeEvent(accountNo,amount,tx_no); accountInfoService.sendUpdateAccountBalance(accountChangeEvent); return 转账成功; } } dtx-txmsg-demo-bank2 dtx-txmsg-demo-bank2需要实现如下功能 1、监听MQ接收消息。 2、接收到消息增加账户金额。 1 Service 注意为避免消息重复发送这里需要实现幂等。 Service Slf4j public class AccountInfoServiceImpl implements AccountInfoService { Autowired AccountInfoDao accountInfoDao; /** * 消费消息更新本地事务添加金额 * param accountChangeEvent */ Override Transactional public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) { log.info(bank2更新本地账号账号{},金额 {},accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); //幂等校验 int existTx accountInfoDao.isExistTx(accountChangeEvent.getTxNo()); if(existTx0){ //执行更新 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmoun t()); //添加事务记录 accountInfoDao.addTx(accountChangeEvent.getTxNo()); log.info(更新本地事务执行成功本次事务号: {}, accountChangeEvent.getTxNo()); }else{ log.info(更新本地事务执行失败本次事务号: {}, accountChangeEvent.getTxNo()); } } } 2MQ监听类 Component RocketMQMessageListener(topic topic_txmsg,consumerGroup consumer_txmsg_group_bank2) Slf4j public class TxmsgConsumer implements RocketMQListenerString { Autowired AccountInfoService accountInfoService; Override public void onMessage(String s) { log.info(开始消费消息:{},s); //解析消息为对象 final JSONObject jsonObject JSON.parseObject(s); AccountChangeEvent accountChangeEvent JSONObject.parseObject(jsonObject.getString(accountChange),AccountChangeEvent.class); //调用service增加账号金额 accountChangeEvent.setAccountNo(2); accountInfoService.addAccountInfoBalance(accountChangeEvent); } } 测试场景 bank1本地事务失败则bank1不发送转账消息。 bank2接收转账消息失败会进行重试发送消息。 bank2多次消费同一个消息实现幂等。 小结 可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性本案例使用了RocketMQ作为 消息中间件RocketMQ主要解决了两个功能 1、本地事务与消息发送的原子性问题。 2、事务参与方接收消息的可靠性。 可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响并实现了两个服务的解耦。
http://www.hkea.cn/news/14531660/

相关文章:

  • 轴承网站建设济南网站建设行知科技
  • 网站关键词搜索排名减少WordPress跳转
  • 市通建设工程质量监督局网站免费公司企业建站代理
  • 模板网站是啥意思做网站推广话术
  • 网站建设推广刘贺稳1公司网站网络营销是什么
  • 浙江省建设工程招投标网站设计网站建设书南昌大学论文
  • 厦门网站建设u建设公司的网站首页
  • 宁波行业网站建设上传发布的步骤分为哪六个部分
  • iis默认网站属性网站对公司的作用是什么
  • 上海建筑公司黄页东莞网站seo推广
  • 网站网络排名优化方法黄冈做网站技术支持的
  • 电子商务网站开发主要有哪些一台网站服务器多少钱
  • 九龙坡网站建设oa办公系统开发
  • 深圳做网站行业网站建设的一般流程是怎样的
  • 无锡网站制作哪家便宜南昌建设局
  • 做网站前期需求分析收费么互联网媒体广告公司
  • 库存网站建设定制关键词优化难度查询
  • 查找网站备案黑色asp企业网站源码
  • 优酷网站建设视频教程集湖州建设局网站 项目验收流程
  • 做网站或者app咨询聊城网站建设
  • 重生做网站小说做网站编辑有人带吗
  • 网站可以做音频线吗seo快速排名的方法
  • 中国建设监理协会网站继续教育系统视频网站 备案
  • 微信网页制作网站游戏建模培训
  • 天津公司网站开发苏州智信建设职业培训网站
  • 海南网站建设学做静态网站
  • 19手机网站浙江人事考试网
  • 网站建设需要做哪些工作wordpress美化滑动
  • 不用服务器怎么做网站seo关键词快速提升软件官网
  • 惠州网站建设公司哪家好汕头个人建站模板