旅游电商网站开发,六安网站建设,可以做同城活动的网站,ps制作网站RabbitMQ实战解决方案
RabbitMQ死信队列
死信队列产生的背景
RabbitMQ死信队列俗称#xff0c;备胎队列#xff1b;消息中间件因为某种原因拒收该消息后#xff0c;可以转移到死信队列中存放#xff0c;死信队列也可以有交换机和路由key等。
产生死信队列的原因
消息投…RabbitMQ实战解决方案
RabbitMQ死信队列
死信队列产生的背景
RabbitMQ死信队列俗称备胎队列消息中间件因为某种原因拒收该消息后可以转移到死信队列中存放死信队列也可以有交换机和路由key等。
产生死信队列的原因
消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息消息如果存放到mq服务器中过期之后会转移到备胎死信队列存放。队列达到最大的长度 队列容器已经满了消费者消费多次消息失败就会转移存放到死信队列中 代码整合 参考 Boyatop-springboot-rabbitmq|#中order-dead-letter-queue项目
死信队列的架构原理
死信队列和普通队列区别不是很大
普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别
1.生产者投递消息先投递到我们普通交换机中普通交换机在将该消息投到
普通队列中缓存起来普通队列对应有自己独立普通消费者。
2.如果生产者投递消息到普通队列中普通队列发现该消息一直没有被消费者消费
的情况下在这时候会将该消息转移到死信备胎交换机中死信备胎交换机
对应有自己独立的 死信备胎队列 对应独立死信备胎消费者。
死信队列应用场景
1.30分钟订单超时设计
Redis过期key 死信延迟队列实现
采用死信队列创建一个普通队列没有对应的消费者消费消息在30分钟过后
就会将该消息转移到死信备胎消费者实现消费。
备胎死信消费者会根据该订单号码查询是否已经支付过如果没有支付的情况下
则会开始回滚库存操作。
RabbitMQ消息幂等问题
RabbitMQ消息自动重试机制
当我们消费者处理执行我们业务代码的时候如果抛出异常的情况下
在这时候mq会自动触发重试机制默认的情况下rabbitmq是无限次数的重试。
需要人为指定重试次数限制问题
在什么情况下消费者需要实现重试策略
A.消费者获取消息后调用第三方接口但是调用第三方接口失败呢是否需要重试
该情况下需要实现重试策略网络延迟只是暂时调用不通重试多次有可能会调用通。
B.消费者获取消息后因为代码问题抛出数据异常是否需要重试
该情况下是不需要实现重试策略就算重试多次最终还是失败的。 可以将日志存放起来后期通过定时任务或者人工补偿形式。
如果是重试多次还是失败消息需要重新发布消费者版本实现消费
可以使用死信队列 Mq在重试的过程中有可能会引发消费者重复消费的问题。
Mq消费者需要解决 幂等性问题
幂等性 保证数据唯一
方式1
生产者在投递消息的时候生成一个全局唯一id放在我们消息中。
Msg id123456
Msg id123456
Msg id123456
消费者获取到我们该消息可以根据该全局唯一id实现去重复。
全局唯一id 根据业务来定的 订单号码作为全局的id
实际上还是需要再db层面解决数据防重复。
业务逻辑是在做insert操作 使用唯一主键约束
业务逻辑是在做update操作 使用乐观锁 当消费者业务逻辑代码中抛出异常自动实现重试 默认是无数次重试应该对RabbitMQ重试次数实现限制比如最多重试5次每次间隔3s重试多次还是失败的情况下存放到死信队列或者存放到数据库表中记录后期人工补偿
如何合理选择消息重试
消费者获取消息后调用第三方接口但是调用第三方接口失败呢是否需要重试 消费者获取消息后应该代码问题抛出数据异常是否需要重试
总结如果消费者处理消息时因为代码原因抛出异常是需要从新发布版本才能解决的那么就不需要重试重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。
Rabbitmq如何开启重试策略 spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /booya_rabbitmq listener: simple: retry: ####开启消费者程序出现异常的情况下会进行重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000
消费者重试过程中如何避免幂等性问题
重试的过程中为了避免业务逻辑重复执行建议提前全局id提前查询如果存在
的情况下就无需再继续做该流程。
重试的次数最好有一定间隔次数在数据库底层层面保证数据唯一性比如加上唯一id。
SpringBoot开启消息确认机制
配置文件新增 spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: /booyaVirtualHostslistener:simple:retry:####开启消费者程序出现异常的情况下会进行重试enabled: true####最大重试次数max-attempts: 5####重试间隔次数initial-interval: 3000acknowledge-mode: manualdatasource:url: jdbc:mysql://localhost:3306/test?useUnicodetruecharacterEncodingUTF-8username: rootpassword: rootdriver-class-name: com.mysql.jdbc.Driver
消费者ack代码 Slf4jComponentRabbitListener(queues fanout_order_queue)public class FanoutOrderConsumer {Autowiredprivate OrderManager orderManager;Autowiredprivate OrderMapper orderMapper;RabbitHandlerpublic void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {// try {log.info(orderEntity:{}, orderEntity.toString());String orderId orderEntity.getOrderId();if (StringUtils.isEmpty(orderId)) {log.error(orderId is null);return;}OrderEntity dbOrderEntity orderMapper.getOrder(orderId);if (dbOrderEntity ! null) {log.info(该订单已经被消费过无需重复消费!);// 无需继续重试channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}int result orderManager.addOrder(orderEntity);log.info(插入数据库中数据成功);if (result 0) {// 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}// int i 1 / 0;// } catch (Exception e) {// // 将失败的消息记录下来后期采用人工补偿的形式// }}}