当前位置: 首页 > news >正文

惠州网站建设学校建设部网站燃气管理部门

惠州网站建设学校,建设部网站燃气管理部门,网站上传不了照片,中国十大it培训机构排名在生产环境中由于一些不明原因#xff0c;导致 rabbitmq 重启#xff0c;在 RabbitMQ 重启期间生产者消息投递失败#xff0c; 导致消息丢失#xff0c;需要手动处理和恢复。于是#xff0c;我们开始思考#xff0c;如何才能进行 RabbitMQ 的消息可靠投递呢#xff1f; …在生产环境中由于一些不明原因导致 rabbitmq 重启在 RabbitMQ 重启期间生产者消息投递失败 导致消息丢失需要手动处理和恢复。于是我们开始思考如何才能进行 RabbitMQ 的消息可靠投递呢 特别是在这样比较极端的情况RabbitMQ 集群不可用的时候无法投递的消息该如何处理呢 发布确认SpringBoot版本 确认机制方案 代码架构图 配置文件 spring.rabbitmq.host118.31.6.132 spring.rabbitmq.port5672 spring.rabbitmq.usernameadmin spring.rabbitmq.password123 spring.rabbitmq.publisher-confirm-typecorrelated 在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-typecorrelated NONE 禁用发布确认模式是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 经测试有两种效果其一效果和 CORRELATED 值一样会触发回调方法其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker 引入依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.example/groupIdartifactIdspringboot-rabbitmq/artifactIdversion0.0.1-SNAPSHOT/versionnamedemo/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-autoconfigure/artifactIdversion3.2.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project添加配置类 package com.example.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;//声明确认队列Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认交换机Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);} }回调接口 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}} }消息生产者 package com.example.controller;import com.example.component.MyCallBack; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController RequestMapping(/confirm) Slf4j public class producer {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate MyCallBack myCallBack;PostConstruct //在类实例被创建后通过依赖注入完成并且在任何其他初始化方法之前容器会自动调用这个方法public void init(){rabbitTemplate.setConfirmCallback(myCallBack);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//指定消息id为1CorrelationData correlationData1 new CorrelationData(1);String routingKey key1;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData1);log.info(发送消息内容:{},messageroutingKey);CorrelationData correlationData2 new CorrelationData(2);routingKey key2;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData2);log.info(发送消息内容:{},messageroutingKey);} }消息消费者 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);} }结果分析 访问http://127.0.0.1:8080/confirm/sendMessage/%E4%BD%A0%E5%A5%BD 可以看到发送了两条消息第一条消息的 RoutingKey 为 key1第二条消息的 RoutingKey 为 key2两条消息都成功被交换机接收也收到了交换机的确认回调但消费者只收到了一条消息因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致也没有其它队列能接收这个消息所有第二条 消息被直接丢弃了  回退消息 Mandatory参数 在仅开启了生产者确认机制的情况下交换还击接收到消息之后会直接给消息生产者发送确认消息如果发现该消息不可路由那么消息会被直接丢弃此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下最起码通知我一声我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者 消息生产者代码 package com.example.controller;import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID;RestController Slf4j RequestMapping(/confirm) public class MessageProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructprivate void init(){rabbitTemplate.setConfirmCallback(this);/*** true交换机无法将消息进行路由时会将消息返回给生产者* false如果发现纤细无法进行路由时直接的丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息给谁处理rabbitTemplate.setReturnsCallback(this);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//让消息绑定一个id值CorrelationData correlationData1 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key1,messagekey1,correlationData1);log.info(发送消息 id 为:{}内容为{},correlationData1.getId(),messagekey1);CorrelationData correlationData2 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key2,messagekey2,correlationData2);log.info(发送消息 id 为:{}内容为{},correlationData2.getId(),messagekey2);}Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData ! null ? correlationData.getId() : ;if (ack) {log.info(交换机收到消息确认成功, id:{}, id);} else {log.error(消息 id:{}未成功投递到交换机,原因是:{}, id, cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());} }回调接口 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());} }消息消费者 package com.example.component;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);} }结果分析 备份交换机 备份交换机是 RabbitMQ 中的一个机制用于处理无法被路由到队列的消息。它可以看作是交换机的“备胎”当交换机接收到无法路由的消息时会将这些消息转发到备份交换机中由备份交换机来进行处理 通常情况下备份交换机的类型是 Fanout这意味着它会将所有接收到的消息广播给与其绑定的所有队列。因此当备份交换机接收到无法路由的消息时这些消息会被发送到与备份交换机绑定的队列中 通过设置备份交换机我们可以将无法被路由的消息存储到一个特定的队列中然后可以通过监控这个队列来进行报警或者手动处理。这样做既可以避免丢失消息又不会增加生产者的复杂性同时也提高了系统的稳定性和可靠性 代码架构图 修改配置类 package com.example.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;public static final String BACKUP_EXCHANGE_NAME backup.exchange;public static final String BACKUP_QUEUE_NAME backup.queue;public static final String WARNING_QUEUE_NAME warning.queue;//声明确认队列Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);}//声明备份交换机Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认交换机并且绑定备份交换机Beanpublic DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备用交换机.withArgument(alternate-exchange, BACKUP_EXCHANGE_NAME);return (DirectExchange) exchangeBuilder.build();}//声明警告队列Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系Beanpublic Binding warningBinding(Qualifier(warningQueue) Queue queue,Qualifier(backupExchange) FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列Bean(backQueue)public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系Beanpublic Binding backupBinding(Qualifier(backQueue) Queue queue,Qualifier(backupExchange) FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);} }报警消费者 package com.example.controller;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.web.bind.annotation.RestController;RestController Slf4j public class WarningConsumer {public static final String WARNING_QUEUE_NAME warning.queue;RabbitListener(queues WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message) {String msg new String(message.getBody());log.error(报警发现不可路由消息{}, msg);} }测试注意事项 直接运行会报错因为我们之前创建过此队列如要修改只能删除或者新建队列 我们可以登录管理后台删除该队列  结果分析
http://www.hkea.cn/news/14571475/

相关文章:

  • 安微省建设厅田网站wordpress首页文章带图
  • 建设行业的门户网站常州建设局下属网站
  • 媒体135网站湖北省建设银行网站6
  • 南山网站建设哪家好做贷款网站犯法
  • 一个公司备案两个网站网址大全2345色综合导航
  • 做网站来钱快微信营销平台
  • 东莞关键词优化代理德州网站优化
  • 如何做棋牌网站有没有做英语题的网站
  • 安徽网站建设外贸丽水专业网站建设价格
  • 国内男女直接做的视频网站wordpress和apache
  • 免费企业信息查询网站wordpress阅读主题
  • 要怎么做网站安顺做网站的公司
  • 房建设计图网站如何为wordpress加评论
  • 做母婴的网站有哪些成都品牌设计
  • 综述题建设网站需要几个步骤做app 的模板下载网站有哪些内容
  • 怎么在企业站建立网站吗建设网站 注册与登陆
  • php网站开发遇到的问题商贸有限公司怎么样注册
  • 做网站设计哪里有信誉好的做pc端网站
  • 怎样做自己的摄影网站成都设计院招聘
  • 佛山市网站建设 乾图信息科技网上如何建网站卖量具
  • html的制作网站的优点代做财务报表分析网站
  • .net网站费用wordpress采集发布
  • windows2008 iis 网站配置合肥网站到首页排名
  • 友情链接网站大全提供网站制作公司电话
  • 搜索引擎大全网站企业网站备案快吗
  • 静态网站提交表单怎么做竞价托管运营哪家好
  • 建筑必看六个网站建筑网官方网站查询
  • 网站建设脚本语言有哪些图片素材网站模板
  • 建手机网站价格网站排名是怎么做
  • 电商网站 建设步骤无限流量网站建设