当前位置: 首页 > news >正文

自己如何做家政网站兰州口碑营销

自己如何做家政网站,兰州口碑营销,jsp wordpress,线上设计师都在哪挣钱RabbitMQ 消息可靠保障 消息的可靠性保证生产者重连生产者确认解决思路A-确认机制解决思路B-备份交换机 MQ 服务器宕机导致消息丢失消费端消息的可靠性保障 消费端限流给消息生成唯一id 消息的可靠性保证 实际项目中 MQ 的流程一般是#xff1a;生产端把消息路由到交换机生产端把消息路由到交换机然后由交换机把消息发送到队列接着就是消费端拿到消息进行消费这三个过程都有可能造成消息的不稳定导致不可靠 生产者重连 Spring Boot 提供了一种重试机制只需要通过配置即可实现消息重试从而确保消息的可靠性这里简单介绍一下 spring:rabbitmq:listener:simple:acknowledge-mode: auto # 开启自动确认消费机制retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * 上次等待时间max-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态如果业务中包含事务这里改为false通过配置在消费者的方法上如果执行失败或执行异常只需要抛出异常一定要出现异常才会触发重试注意不要捕获异常 即可实现消息重试这样也可以保证消息的可靠性。 失败重试机制https://www.bilibili.com/video/BV1mN4y1Z7t9?p24spm_id_frompageDrivervd_sourcec3fc4867486b99c0b85501121f575279 生产者确认 生产者确认机制分为两部分生产者到交换机和交换机到队列的可靠性保障 解决思路A-确认机制 在生产端进行确认具体操作中会分别针对交换机和队列来确认如果没有成功发送的队列服务器上那就可以尝试重新发送 首先需要增加下列配置 spring.rabbitmq.host192.168.133.128 spring.rabbitmq.port5672 spring.rabbitmq.passwordadmin spring.rabbitmq.usernameadmin spring.rabbitmq.virtual-host/ spring.rabbitmq.listener.typesimple spring.rabbitmq.publisher-confirm-typeCORRELATED #交换机的确认 spring.rabbitmq.publisher-returnstrue #队列的确认这里 publisher-confirm-type 有三种模式可选 none表示禁用发送方确认机制correlated表示开启发送方确认机制simple表示开启发送方确认机制并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的调用。 接下来增加一个Rabbit 配置 Configuration Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {Resourceprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 消息发送到交换机成功或失败时调用此方法log.info(confirm函数打印correlationData correlationData);log.info(confirm函数打印ack ack);log.info(confirm函数打印cause cause);}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 发送队列失败时才调用此方法log.info(消息主体 new String(returnedMessage.getMessage().getBody()));log.info(应答码 returnedMessage.getReplyCode());log.info(描述 returnedMessage.getReplyText());log.info(交换机 returnedMessage.getExchange());log.info(路由key returnedMessage.getRoutingKey());} }为了方便测试定义一个交换机和队列吧并声明它们的绑定关系 public class RabbitMQConfig {//定义一个交换机已及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue;Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}Beanpublic Binding normalBinding(Qualifier(normalQueue) Queue queue,Qualifier(normalExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);} }在 controller 层测试 RestController RequestMapping(/rabbit) public class RabbitController {Resourceprivate RabbitTemplate rabbitTemplate;GetMapping(/test)public String test(){rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirm~~);return success.;} }消息成功路由到交换机然后交换机发送到队列正常情况下是下面的输出 2024-08-17T17:08:35.10508:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:08:35.14908:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection3542faa [delegateamqp://admin192.168.133.128:5672/, localPort63927] 2024-08-17T17:08:35.20408:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:08:35.20408:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T17:08:35.20508:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull现在改一下模拟路由交换机失败的场景注意没有 RabbitMQConfig.NORMAL_EXCHANGE~,这个交换机 rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE~, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirm~~);重新测试此时输出如下错误提示的还是很明显的 2024-08-17T17:11:39.98108:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:11:40.03708:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection568384f7 [delegateamqp://admin192.168.133.128:5672/, localPort64266] 2024-08-17T17:11:40.07408:00 ERROR 15864 --- [68.133.128:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #methodchannel.close(reply-code404, reply-textNOT_FOUND - no exchange normal.demo.exchange~ in vhost /, class-id60, method-id40) 2024-08-17T17:11:40.07608:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:11:40.07608:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印ackfalse 2024-08-17T17:11:40.07708:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印causechannel error; protocol method: #methodchannel.close(reply-code404, reply-textNOT_FOUND - no exchange normal.demo.exchange~ in vhost /, class-id60, method-id40)现在再模拟交换机发送消息到队列失败的场景改一下 routingKey rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY~, Message Test Confirm~~); 重新测试此时输出如下returnedMessage 方法执行了 2024-08-17T17:13:49.83508:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:13:49.88908:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection572e41be [delegateamqp://admin192.168.133.128:5672/, localPort64519] 2024-08-17T17:13:49.93308:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 消息主体Message Test Confirm~~ 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 应答码312 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 描述NO_ROUTE 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 交换机normal.demo.exchange 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 路由keynormal.demo.routingkey~ 2024-08-17T17:13:49.93508:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:13:49.93608:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T17:13:49.93608:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印causenull顺便了解一下spring.rabbitmq.template.mandatory 配置 当使用 RabbitMQ 作为消息队列时spring.rabbitmq.template.mandatory 配置项用于控制 MQ 消息模板RabbitTemplate的发送行为特别是在消息路由时找不到队列queue的情况下的处理方式。 2、spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true, 3、spring.rabbitmq.template.mandatory 结果为 false 时会忽略掉spring.rabbitmq.publisher-returns 属性的值也就是 returnedMessage 方法不会执行结果为 truereturnedMessage 可以得到执行 4、spring.rabbitmq.template.mandatory 结果为null即不配置时结果由spring.rabbitmq.publisher-returns 确定 解决思路B-备份交换机 上面思路A对于消息是否成功发送到队列是通过重写 RabbitTemplate.ReturnsCallback 接口来操作的备份交换机是对这一操作的替换二选一如果即配置了 ReturnsCallback 回调 又配置了备份交换机实际运行发现备份交换机优先级更高 为目标交换机指定备份交换机当目标交换机投递失败时把消息投递至备份交换机(实际项目中很少用一般用思路A) 在上面代码的基础上改一下RabbitMQConfig 就单纯的声明队列和交换机 public class RabbitMQConfig {//定义一个正常交换机以及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue;//定义一个备份交换机以及 队列用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE_BACKUP normal.demo.exchange.backup;public static final String NORMAL_QUEUE_BACKUP normal.demo.queue.backup; }定义一个消费者监听队列实际项目中都会有的有如下注意点 备份交换机必须定义成 FANOUT 类型声明正常交换机的时候指定一个备份交换机通过 alternate-exchange 参数代码有体现 Slf4j Component public class RabbitConsumer {RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE, type ExchangeTypes.DIRECT, arguments {Argument(name alternate-exchange, value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),key RabbitMQConfig.NORMAL_ROUTING_KEY))public void receiveA(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(正常队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE, msg);}RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE_BACKUP),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type ExchangeTypes.FANOUT)))public void receiveB(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(备份队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);} }注意RabbitListener 必须先创建队列不然报错 有三个 属性 queues()、bindings()、queuesToDeclare()它们之间是互斥的。设定了queues()就不能再设定 bindings() 和 queuesToDeclare()了具体用法可以看这篇文章 接下来模拟无法到达队列(如果无法到达交换机直接报错没有后面什么事了…)测试能不能正常进入备份交换机 rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY~, Message Test Confirm~~);运行结果如下 2024-08-17T18:25:32.25308:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T18:25:32.25408:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T18:25:32.25408:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull 2024-08-17T18:25:32.25708:00 INFO 22476 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 备份队列normal.demo.queue.backup收到消息Message Test Confirm~~MQ 服务器宕机导致消息丢失 这一点官方早就考虑到了现在无论是交换机还是队列消息都是默认持久化到硬盘上哪怕服务器重启也不会导致消息丢失 消息持久化 Override public void sendMessage() {// 构造消息将消息持久化Message message MessageBuilder.withBody(单程车票.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 向MQ发送消息消息内容都为消息表记录的idrabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message); }队列和交换机的持久化默认就是 true无需显示指定 Bean public Queue queue() {// 四个参数name队列名、durable持久化、 exclusive独占、autoDelete自动删除return new Queue(MESSAGE_QUEUE, true); } Bean public DirectExchange exchange() {// 四个参数name交换机名、durable持久化、autoDelete自动删除、arguments额外参数return new DirectExchange(Direct_Exchange, true, false); }消费端消息的可靠性保障 配置文件增加 spring.rabbitmq.listener.simple.acknowledge-modemanual 配置也就是下面这样 spring.rabbitmq.host192.168.133.128 spring.rabbitmq.port5672 spring.rabbitmq.passwordadmin spring.rabbitmq.usernameadmin spring.rabbitmq.virtual-host/ spring.rabbitmq.listener.typesimple spring.rabbitmq.publisher-confirm-typeCORRELATED spring.rabbitmq.publisher-returnstrue spring.rabbitmq.listener.simple.acknowledge-modemanual #把消息确认模式改成手动确认因为 RabbitMQ 客户端默认是 自动返回 ACK 确认的也就是不管是处理成功还是失败默认都按成功来处理这样就不太好所以这个配置要改成手动确认 需要提前知道个东西啥是 deliveryTag 队列的定义这里把备份交换机队列都删了哈 public class RabbitMQConfig {//定义一个正常交换机以及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue; }消费者逻辑如下 Slf4j Component public class RabbitConsumer {RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE, type ExchangeTypes.DIRECT, arguments {Argument(name alternate-exchange, value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),key RabbitMQConfig.NORMAL_ROUTING_KEY))public void receiveA(Message message, Channel channel) throws IOException {// 消息内容String msg new String(message.getBody());// 消息的 deliveryTaglong deliveryTag message.getMessageProperties().getDeliveryTag();try {// 操作成功返回 ack 信息int i 1/0; // 模拟消息处理异常channel.basicAck(deliveryTag, false);log.info(正常队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE, msg);} catch (Exception ex) {// 获取当前消息是否是重复投递的// true-说明消息已经重复投递过一次了false-说明当前消息是第一次投递Boolean redelivered message.getMessageProperties().getRedelivered();// 操作失败返回 Nack 信息if (redelivered) {// requeue 参数控制消息是否重新放入队列 true-重放队列broker会重新投递消息; false-不重放broker会丢弃消息channel.basicNack(deliveryTag, false, false); // basicNack(long deliveryTag, boolean multiple, boolean requeue)} else {channel.basicNack(deliveryTag, false, true);}log.info(正常队列{}收到消息,redelivered: {}但是处理异常{}, RabbitMQConfig.NORMAL_QUEUE, redelivered, msg);}// basicReject 和 basicNack 区别- 唯一的区别就是 basicNack 有批量操作控制就是 multiple 参数是否批量处理true 表示将一次性 ack 所有小于 deliveryTag 的消息// channel.basicReject(deliveryTag, false); //basicReject(long deliveryTag, boolean requeue)} }2024-08-17T19:48:38.13108:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T19:48:38.13208:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T19:48:38.13208:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull 2024-08-17T19:48:55.20508:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,redelivered: false但是处理异常Message Test Confirm~~ 2024-08-17T19:49:12.00708:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,redelivered: true但是处理异常Message Test Confirm~~具体效果可以本地 debug 下哈消费完并且都 ack 后这里都是0了 消费端限流 我们都知道 MQ 有削峰填谷的效果假设有下面的场景消息队列里面有10000条消息但是消费端的并发能力只有 1000为了避免一次性把这1万条消息全部取出导致消费端压力太大我们可以做个设置每次最多取1000条消息对消费端也是一种保护。 从操作层面来说也比较简单配置spring.rabbitmq.listener.simple.prefetch 属性即可 做个小实验 当不设置 prefetch 时生产者一次性向交换机中投递100条消息消费者确认消息的方式设置为手动确认在确认消息之前线程休眠 5 秒因为 rabbitmq 管理控制台数据更新是 5 秒更新一次这里设置为 5 秒比较方便观察可以观察到队列是一次性将100条数据全部发送给了消费者然后消费者再进行处理 生产者一次发送10条消息到交换机 GetMapping(/test)public String test(){for (int i 0; i 100; i) {rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirmi);}return success.;}消费者在确认之前休眠5秒 RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE_BACKUP),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type ExchangeTypes.FANOUT))) public void receiveB(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(备份队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg); }现在增加spring.rabbitmq.listener.simple.prefetch2配置就是一次从队列中取两条数据再观察结果 会发现消费端不再是一次性全部把消息取出而是每次只取 2 个这就是 prefetch 配置作用 给消息生成唯一id 可以在消费端打印消息 id此时消息Id是 null 2024-08-18T20:43:46.29508:00 INFO 24028 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 消息id:null消息内容:Message Test Confirm重写 MessageConverter Bean public MessageConverter messageConverter() {// 定义消息转换器Jackson2JsonMessageConverter jjmc new Jackson2JsonMessageConverter();// 配置自动创建消息 id, 用于识别不同消息也可以在业务中基于ID判断是否重复消息jjmc.setCreateMessageIds(true);return jjmc; }再次打印 2024-08-18T21:01:20.75208:00 INFO 24268 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 消息id:ae1a208d-8045-4cb1-a4d9-6d8aabc6b8c8消息内容:Message Test Confirm
http://www.hkea.cn/news/14482173/

相关文章:

  • 锦州宝地建设集团有限公司网站常用的seo工具推荐
  • c 在网站开发方面有优势吗建设会员网站需要多少钱
  • 网站开发定价学校网站群建设 ppt
  • wordpress 站群系统上海企业网上公示
  • 怎么建设商城网站做一个网站得做多少个页面
  • 中国建设银行快捷付授权网站网站开发工程师社交
  • 网站推广营销怎么做网站建设的公司好做吗
  • 启航做网站好吗医院网站源码下载
  • 天津的网站建设公司客户管理系统内容
  • 手机上怎么做自己卖菜的网站seo站内优化最主要的是什么
  • 视频网站上市公司有哪些一浪网站建设
  • 织梦网站模板如何安装深圳网站开发公司哪家好
  • 网站规划建设案例如何配置iis网站
  • 有声阅读网站如何建设网店代运营是什么意思
  • 网站都有什么语言陕西旭泽建设有限公司网站
  • 专题网站建设方案wordpress单页面博客
  • 网站开发如何进行管理开发一个平台
  • 一个域名多个网站外贸 wordpress模板
  • 上海做展会的网站都有哪些浙江省城乡住房建设网站
  • 我想建立一个网站不知道怎么做啊东莞网络优化推广
  • 在地税网站怎么做税种认定哪个网站教做ppt模板
  • 黄山网站开发wordpress 标签前缀
  • dedecms精仿学校网站模板固戍网站建设
  • 网站备案一般需要多久做视频网站 视频放在哪
  • 宿迁网站建设案例团购网站建设方案
  • 网站标题作弊网站开发平台及常用开发工具
  • wordpress主页设置分类网站优化公司推荐
  • 河北企业建站系统信息移动端网站开发标题设置
  • 化工集团网站建设 中企动力重庆富通科技有限公司网站
  • 江干建设局网站南宁网站制作最新招聘信息