企业网站开发信息,安徽省住房建设厅网站,windows怎么做网站,互联网保险的发展现状我们知道#xff0c;RabbitMQ的消息最终是存储在Queue上的#xff0c;而在Queue之前还要经过Exchange#xff0c;那么这个过程中就有两个地方可能导致消息丢失。第一个是Producer到Exchange的过程#xff0c;第二个是Exchange到Queue的过程。 为了解决这个问题#xff0c…我们知道RabbitMQ的消息最终是存储在Queue上的而在Queue之前还要经过Exchange那么这个过程中就有两个地方可能导致消息丢失。第一个是Producer到Exchange的过程第二个是Exchange到Queue的过程。 为了解决这个问题有两种方案一种是通过confirm机制另外一种是事务机制因为事务机制并不推荐这里先介绍Confirm机制。
Publisher Confirm是一种机制用于确保消息已经被Exchange成功接收和处理。一旦消息成功到达Exchange并被处理RabbitMQ会向消息生产者发送确认信号ACK。如果由于某种原因例如Exchange不存在或路由键不匹配消息无法被处理RabbitMQ会向消息生产者发送否定信号NACK。
//启用Publisher Confirmschannel.confirmSelect();//设置Publisher Confirms回调channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(Message confirmed with deliveryTag:deliveryTag);//在这里处理消息确认}Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(Message not confirmed with deliveryTag:deliveryTag);//在这里处理消息未确认}});Publisher Returns机制与Publisher Confirms类似但用于处理在消息无法路由到任何队列时的情况。当RabbitMQ在无法路由消息时将消息返回给消息生产者但是如果能正常路由则不会返回消息。
//启用Publisher Returnschannel.addReturnListener(new ReturnListener() {Overridepublic void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(Message returned with replayCode: replyCode);//在这里处理消息发送到Queue失败的返回}});通过以上方式我们注册了两个回调监听用于在消息发送到Exchange或者Queue失败时进行异常处理。通常我们可以在失败时精心报警或者重试来保障一定能发送成功。
完整代码
package com.example.demo.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class PublisherCallbacksExample {public static void main(String[] args) throws Exception{ConnectionFactory factorynew ConnectionFactory();factory.setHost(localhost);try(Connection connectionfactory.newConnection();Channel channelconnection.createChannel()){//启用Publisher Confirmschannel.confirmSelect();//设置Publisher Confirms回调channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(Message confirmed with deliveryTag:deliveryTag);//在这里处理消息确认}Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(Message not confirmed with deliveryTag:deliveryTag);//在这里处理消息未确认}});//启用Publisher Returnschannel.addReturnListener(new ReturnListener() {Overridepublic void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(Message returned with replayCode: replyCode);//在这里处理消息发送到Queue失败的返回}});String exchangeName my_exchange;String routingKey my_routing_key;String message Hello,RabbitMQ!;//发布消息到Exchangechannel.basicPublish(exchangeName,routingKey,true,null,message.getBytes());//等待Publisher Confirmsif (!channel.waitForConfirms()) {System.out.println(Message was not confirmed!);}//关闭通道和连接channel.close();}}
}
另外这里如果发送到Queue之后是否一定能持久化下来是否一定不丢这就是另外一个话题了。