当前位置: 首页 > news >正文

丹阳网站建设开发大庆免费网站建设公司

丹阳网站建设开发,大庆免费网站建设公司,湖南株洲静默,wordpress rest发文章在生产环境中由于一些不明原因#xff0c;导致 rabbitmq 重启#xff0c;在 RabbitMQ 重启期间生产者消息投递失败#xff0c; 导致消息丢失#xff0c;需要手动处理和恢复。于是#xff0c;我们开始思考#xff0c;如何才能进行 RabbitMQ 的消息可靠投递呢#xff1f; …在生产环境中由于一些不明原因导致 rabbitmq 重启在 RabbitMQ 重启期间生产者消息投递失败 导致消息丢失需要手动处理和恢复。于是我们开始思考如何才能进行 RabbitMQ 的消息可靠投递呢 特别是在这样比较极端的情况RabbitMQ 集群不可用的时候无法投递的消息该如何处理呢 发布确认SpringBoot版本 确认机制方案 代码架构图 配置文件 spring.rabbitmq.host118.31.6.132 spring.rabbitmq.port5672 spring.rabbitmq.usernameadmin spring.rabbitmq.password123 spring.rabbitmq.publisher-confirm-typecorrelated 在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-typecorrelated NONE 禁用发布确认模式是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 经测试有两种效果其一效果和 CORRELATED 值一样会触发回调方法其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker 引入依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.example/groupIdartifactIdspringboot-rabbitmq/artifactIdversion0.0.1-SNAPSHOT/versionnamedemo/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-autoconfigure/artifactIdversion3.2.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project添加配置类 package com.example.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;//声明确认队列Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认交换机Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);} }回调接口 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}} }消息生产者 package com.example.controller;import com.example.component.MyCallBack; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController RequestMapping(/confirm) Slf4j public class producer {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate MyCallBack myCallBack;PostConstruct //在类实例被创建后通过依赖注入完成并且在任何其他初始化方法之前容器会自动调用这个方法public void init(){rabbitTemplate.setConfirmCallback(myCallBack);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//指定消息id为1CorrelationData correlationData1 new CorrelationData(1);String routingKey key1;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData1);log.info(发送消息内容:{},messageroutingKey);CorrelationData correlationData2 new CorrelationData(2);routingKey key2;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData2);log.info(发送消息内容:{},messageroutingKey);} }消息消费者 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);} }结果分析 访问http://127.0.0.1:8080/confirm/sendMessage/%E4%BD%A0%E5%A5%BD 可以看到发送了两条消息第一条消息的 RoutingKey 为 key1第二条消息的 RoutingKey 为 key2两条消息都成功被交换机接收也收到了交换机的确认回调但消费者只收到了一条消息因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致也没有其它队列能接收这个消息所有第二条 消息被直接丢弃了  回退消息 Mandatory参数 在仅开启了生产者确认机制的情况下交换还击接收到消息之后会直接给消息生产者发送确认消息如果发现该消息不可路由那么消息会被直接丢弃此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下最起码通知我一声我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者 消息生产者代码 package com.example.controller;import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID;RestController Slf4j RequestMapping(/confirm) public class MessageProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructprivate void init(){rabbitTemplate.setConfirmCallback(this);/*** true交换机无法将消息进行路由时会将消息返回给生产者* false如果发现纤细无法进行路由时直接的丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息给谁处理rabbitTemplate.setReturnsCallback(this);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//让消息绑定一个id值CorrelationData correlationData1 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key1,messagekey1,correlationData1);log.info(发送消息 id 为:{}内容为{},correlationData1.getId(),messagekey1);CorrelationData correlationData2 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key2,messagekey2,correlationData2);log.info(发送消息 id 为:{}内容为{},correlationData2.getId(),messagekey2);}Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData ! null ? correlationData.getId() : ;if (ack) {log.info(交换机收到消息确认成功, id:{}, id);} else {log.error(消息 id:{}未成功投递到交换机,原因是:{}, id, cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());} }回调接口 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());} }消息消费者 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);} }结果分析 备份交换机 备份交换机是 RabbitMQ 中的一个机制用于处理无法被路由到队列的消息。它可以看作是交换机的“备胎”当交换机接收到无法路由的消息时会将这些消息转发到备份交换机中由备份交换机来进行处理 通常情况下备份交换机的类型是 Fanout这意味着它会将所有接收到的消息广播给与其绑定的所有队列。因此当备份交换机接收到无法路由的消息时这些消息会被发送到与备份交换机绑定的队列中 通过设置备份交换机我们可以将无法被路由的消息存储到一个特定的队列中然后可以通过监控这个队列来进行报警或者手动处理。这样做既可以避免丢失消息又不会增加生产者的复杂性同时也提高了系统的稳定性和可靠性 代码架构图 修改配置类 package com.example.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;public static final String BACKUP_EXCHANGE_NAME backup.exchange;public static final String BACKUP_QUEUE_NAME backup.queue;public static final String WARNING_QUEUE_NAME warning.queue;//声明确认队列Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);}//声明备份交换机Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认交换机并且绑定备份交换机Beanpublic DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备用交换机.withArgument(alternate-exchange, BACKUP_EXCHANGE_NAME);return (DirectExchange) exchangeBuilder.build();}//声明警告队列Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系Beanpublic Binding warningBinding(Qualifier(warningQueue) Queue queue,Qualifier(backupExchange) FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列Bean(backQueue)public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系Beanpublic Binding backupBinding(Qualifier(backQueue) Queue queue,Qualifier(backupExchange) FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);} }报警消费者 package com.example.controller;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.web.bind.annotation.RestController;RestController Slf4j public class WarningConsumer {public static final String WARNING_QUEUE_NAME warning.queue;RabbitListener(queues WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message) {String msg new String(message.getBody());log.error(报警发现不可路由消息{}, msg);} }测试注意事项 直接运行会报错因为我们之前创建过此队列如要修改只能删除或者新建队列 我们可以登录管理后台删除该队列  结果分析
http://www.hkea.cn/news/14558544/

相关文章:

  • 响应式网站开发周期什么网站可以做数据调查
  • 新乡做网站的公司有那些网站优化公司大家好
  • 网站建设视频教程最新网站开发的经验技巧
  • 网站建设技巧网站设计的风格有哪些
  • 51的网站是啥建立网站导航栏的方法
  • 单纯做网站的公司做盗链网站
  • 苏州优秀网站设计公司纯静态网站 后台
  • 土特产网站建设事业计划书网站自助授权系统
  • 专业的o2o网站建设网站怎么做自己站长
  • 自己做壁纸的网站微信小程序商城需要多少钱
  • 在国内做推广产品用什么网站好什么网站可以做论坛app
  • 科技类公司网站怎么设计建一个公司网站费用
  • 顺义专业建站公司wordpress 多demo
  • 淘宝app网站建设wordpress怎么使用插件
  • 手机怎么做弹幕小视频网站设计免费
  • 海口网站建设中心设计工作室宣传文案
  • 用focusky做h5微网站湖南微信网站建设
  • 可以做宣传的网站有哪些西地那非片吃了多久会硬起来
  • 网站服务器异常是什么意思wordpress网站后台
  • 专业旅游网站建设什么网上平台可以找客源
  • wordpress本地批量传文章广西网站建设产品优化
  • 有关网站建设的参考书网站移动字幕要怎么做
  • 官网网站怎么做夫唯seo系统培训
  • 网站默认样式1000禁用黄app软件排行
  • 北海建设厅网站360导航网址
  • 物联网网站开发做app网站公司名称
  • 网站开发团队 组建网站权限控制
  • 南宁制作网站公司商务网站开发心得
  • 大连网站建设怎么做校园网页设计模板
  • pageadmin自助建站系统wordpress建站网页无法运