惠州网站建设学校,建设部网站燃气管理部门,网站上传不了照片,中国十大it培训机构排名在生产环境中由于一些不明原因#xff0c;导致 rabbitmq 重启#xff0c;在 RabbitMQ 重启期间生产者消息投递失败#xff0c; 导致消息丢失#xff0c;需要手动处理和恢复。于是#xff0c;我们开始思考#xff0c;如何才能进行 RabbitMQ 的消息可靠投递呢#xff1f; …在生产环境中由于一些不明原因导致 rabbitmq 重启在 RabbitMQ 重启期间生产者消息投递失败 导致消息丢失需要手动处理和恢复。于是我们开始思考如何才能进行 RabbitMQ 的消息可靠投递呢 特别是在这样比较极端的情况RabbitMQ 集群不可用的时候无法投递的消息该如何处理呢
发布确认SpringBoot版本
确认机制方案 代码架构图 配置文件
spring.rabbitmq.host118.31.6.132
spring.rabbitmq.port5672
spring.rabbitmq.usernameadmin
spring.rabbitmq.password123
spring.rabbitmq.publisher-confirm-typecorrelated
在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-typecorrelated
NONE 禁用发布确认模式是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 经测试有两种效果其一效果和 CORRELATED 值一样会触发回调方法其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker
引入依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.example/groupIdartifactIdspringboot-rabbitmq/artifactIdversion0.0.1-SNAPSHOT/versionnamedemo/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-autoconfigure/artifactIdversion3.2.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project添加配置类
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;//声明确认队列Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认交换机Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);}
}回调接口
package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;Component
Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}}
}消息生产者
package com.example.controller;import com.example.component.MyCallBack;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;RestController
RequestMapping(/confirm)
Slf4j
public class producer {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate MyCallBack myCallBack;PostConstruct //在类实例被创建后通过依赖注入完成并且在任何其他初始化方法之前容器会自动调用这个方法public void init(){rabbitTemplate.setConfirmCallback(myCallBack);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//指定消息id为1CorrelationData correlationData1 new CorrelationData(1);String routingKey key1;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData1);log.info(发送消息内容:{},messageroutingKey);CorrelationData correlationData2 new CorrelationData(2);routingKey key2;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,messageroutingKey,correlationData2);log.info(发送消息内容:{},messageroutingKey);}
}消息消费者
package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);}
}结果分析
访问http://127.0.0.1:8080/confirm/sendMessage/%E4%BD%A0%E5%A5%BD 可以看到发送了两条消息第一条消息的 RoutingKey 为 key1第二条消息的 RoutingKey 为 key2两条消息都成功被交换机接收也收到了交换机的确认回调但消费者只收到了一条消息因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致也没有其它队列能接收这个消息所有第二条 消息被直接丢弃了
回退消息
Mandatory参数
在仅开启了生产者确认机制的情况下交换还击接收到消息之后会直接给消息生产者发送确认消息如果发现该消息不可路由那么消息会被直接丢弃此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下最起码通知我一声我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者
消息生产者代码
package com.example.controller;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;RestController
Slf4j
RequestMapping(/confirm)
public class MessageProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructprivate void init(){rabbitTemplate.setConfirmCallback(this);/*** true交换机无法将消息进行路由时会将消息返回给生产者* false如果发现纤细无法进行路由时直接的丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息给谁处理rabbitTemplate.setReturnsCallback(this);}GetMapping(/sendMessage/{message})public void sendMessage(PathVariable String message){//让消息绑定一个id值CorrelationData correlationData1 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key1,messagekey1,correlationData1);log.info(发送消息 id 为:{}内容为{},correlationData1.getId(),messagekey1);CorrelationData correlationData2 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange,key2,messagekey2,correlationData2);log.info(发送消息 id 为:{}内容为{},correlationData2.getId(),messagekey2);}Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData ! null ? correlationData.getId() : ;if (ack) {log.info(交换机收到消息确认成功, id:{}, id);} else {log.error(消息 id:{}未成功投递到交换机,原因是:{}, id, cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}回调接口
package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;Component
Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id correlationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息,由于原因:{},id,cause);}}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}消息消费者
package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msgnew String(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);}
}结果分析 备份交换机
备份交换机是 RabbitMQ 中的一个机制用于处理无法被路由到队列的消息。它可以看作是交换机的“备胎”当交换机接收到无法路由的消息时会将这些消息转发到备份交换机中由备份交换机来进行处理
通常情况下备份交换机的类型是 Fanout这意味着它会将所有接收到的消息广播给与其绑定的所有队列。因此当备份交换机接收到无法路由的消息时这些消息会被发送到与备份交换机绑定的队列中
通过设置备份交换机我们可以将无法被路由的消息存储到一个特定的队列中然后可以通过监控这个队列来进行报警或者手动处理。这样做既可以避免丢失消息又不会增加生产者的复杂性同时也提高了系统的稳定性和可靠性
代码架构图 修改配置类
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;public static final String BACKUP_EXCHANGE_NAME backup.exchange;public static final String BACKUP_QUEUE_NAME backup.queue;public static final String WARNING_QUEUE_NAME warning.queue;//声明确认队列Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue)Queue queue,Qualifier(confirmExchange)DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(key1);}//声明备份交换机Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认交换机并且绑定备份交换机Beanpublic DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备用交换机.withArgument(alternate-exchange, BACKUP_EXCHANGE_NAME);return (DirectExchange) exchangeBuilder.build();}//声明警告队列Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系Beanpublic Binding warningBinding(Qualifier(warningQueue) Queue queue,Qualifier(backupExchange) FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列Bean(backQueue)public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系Beanpublic Binding backupBinding(Qualifier(backQueue) Queue queue,Qualifier(backupExchange) FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}报警消费者
package com.example.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RestController;RestController
Slf4j
public class WarningConsumer {public static final String WARNING_QUEUE_NAME warning.queue;RabbitListener(queues WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message) {String msg new String(message.getBody());log.error(报警发现不可路由消息{}, msg);}
}测试注意事项
直接运行会报错因为我们之前创建过此队列如要修改只能删除或者新建队列 我们可以登录管理后台删除该队列 结果分析