做网站 服务器价格,如何设计微商城网站建设,做曖免费网站,快捷的赣州网站建设目录 一、消息确认
1.消息确认机制
2.手动确认方法
二、代码示例
1. AcknowledgeMode.NONE
1.1 配置文件
1.2 生产者 1.3 消费者
1.4 运行程序 2.AcknowledgeMode.AUTO
3.AcknowledgeMode.MANUAL 一、消息确认
1.消息确认机制
生产者发送消息之后#xff0c;到达消…目录 一、消息确认
1.消息确认机制
2.手动确认方法
二、代码示例
1. AcknowledgeMode.NONE
1.1 配置文件
1.2 生产者 1.3 消费者
1.4 运行程序 2.AcknowledgeMode.AUTO
3.AcknowledgeMode.MANUAL 一、消息确认
1.消息确认机制
生产者发送消息之后到达消费端之后可能会有以下情况
1. 消息处理成功
2. 消息处理异常。 RabbitMQ向消费者发送消息后就会把这条消息删除掉那么第二种情况就会造成消息丢失。
那么如何确保消息端已经被成功接收了并且被正确处理了呢
为了确保消息从队列可靠的到达消费者RabbitMQ提供了消息确认机制Messageacknowledment。
消费者在订阅队列时可以指定autoAck参数根据这个参数消息确认机制分为以下两种
自动确认当autoAck等于true时RabbitMQ会自动把发送出去的消息置为确认然后从内存或者磁盘中删除而不管消费者是否真正的接收到消息自动确认模式适用于对于消息可靠性要求不高的场景。
手动确认当autoAck等于false时RabbitMQ会等待消费者显式的调用BasicAck命令回复确认信号后才从内存或者磁盘中删除这种方式适用于对消息可靠性要求较高的场景。
自动确认代码示例
DefaultConsumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到消息: new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
当autoAck参数置为false对于RabbitMQ服务器来说队列中的消息分为了两个部分
一是等待发送给消费者的消息二是已经发送给消费者但是还没收到消费者确认信号的消息。
如果RabbitMQ一直没有收到消费者的确认信号并且消费此消息的消费者已经断开连接则RabbitMQ会重新安排这条消息进入队列等待投递给下一个消费者当然也有可能是原来的那个消费者。 从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。 Ready等待投递给消费者的消息数。
Unacked已经投递给消费者但是未收到消费者确认信号的消息数。
2.手动确认方法
消费者在收到消息后可以选择确认也可以选择跳过或者直接拒绝确认RabbitMQ也提供了不同的确认方法消费者客户端可以调用与其对应的channel的相关方法共有以下三种
肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);
RabbitMQ 已经知道该消息并且成功的处理消息可以将其丢弃。
参数说明
deliveryTag消息的唯一标识它是一个单调递增的64位的长整型值deliveryTag是每个信道Channel独立维护的所以在每个信道上都是唯一的当消费者确认ack一条消息时必须使用对应的信道进行确认。
multiple是否批量确认在某些情况下为了减少网络流量可以对一系列连续的deliveryTag进行批量确认值为true则会一次性ack所以小于等于指定deliveryTag的消息值为false则只确认当前deliveryTag的消息。 deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺 序性。 否定确认: Channel.basicReject(long deliveryTag, boolean requeue); 参数说明 deliveryTag参考上文。 requeue表示拒绝后这条消息该如何处理如果值为true那么则RabbitMQ会将这条消息重新入队重新发送给下一个订阅的消费者值为false则RabbitMQ会把这条消息从队列中移除不会再发送给消费者。 否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
boolean requeue); 参数说明 参考上文 multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。 二、代码示例 我们基于SpringBoot来演示消息的确认机制使用方式和方法与RabbitMQ Java Client有一定差异 Spring AMQP对消息确认提供了三种策略 public enum AcknowledgeMode {NONE,MANUAL,AUTO;
} . AcknowledgeMode.NONE 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失. AcknowledgeMode.AUTO(默认) 这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确 认消息. AcknowledgeMode.MANUAL ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消 息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被 重新处理. 1. AcknowledgeMode.NONE
1.1 配置文件
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: none1.2 生产者
public class Constant {public static final String ACK_EXCHANGE_NAME ack_exchange;public static final String ACK_QUEUE ack_queue;
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
Bean(ackExchange)
public Exchange ackExchange(){return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
Bean(ackQueue)
public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
Bean(ackBinding)
public Binding ackBinding(Qualifier(ackExchange) Exchange exchange,
Qualifier(ackQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ack).noargs();
}
import com.xiaowu.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
RestController
RequestMapping(/producer)
public class ProductController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/ack)public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, ack,
consumer ack test...);return 发送成功!;}
} 1.3 消费者 import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
Component
public class AckQueueListener {//指定监听队列的名称RabbitListener(queues Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {System.out.printf(接收到消息: %s, deliveryTag: %d%n, new
String(message.getBody(),UTF-8),
message.getMessageProperties().getDeliveryTag());//模拟处理失败int num 3/0;System.out.println(处理完成);}
}
1.4 运行程序
启动生产者可以从RabbitMQ Web管理界面看到如下 再启动消费者控制台输出
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:03:57.79708:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception
threw exception
//....
管理界面 可以看到消息处理失败但是消息已经从管理界面移除。 2.AcknowledgeMode.AUTO
将配置文件修改为
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: auto
再次启动程序控制台不断输出错误信息
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.11408:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.16108:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.20808:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception
threw exception接收到消息: consumer ack test..., deliveryTag: 4
2024-04-29T17:07:09.25408:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception
threw exception 从⽇志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发. 由于异常多次重试还是失败消 息没被确认也无法nack就⼀直是unacked状态导致消息积压。 3.AcknowledgeMode.MANUAL
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: manual
消费者手动确认逻辑
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
Component
public class AckQueueListener {//指定监听队列的名称RabbitListener(queues Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf(接收到消息: %s, deliveryTag: %d%n, new
String(message.getBody(),UTF-8),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println(处理业务逻辑);//⼿动设置⼀个异常, 来测试异常拒绝机制
// int num 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。
异常时拒绝
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
Component
public class AckQueueListener {//指定监听队列的名称RabbitListener(queues Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf(接收到消息: %s, deliveryTag: %d%n, new
String(message.getBody(),UTF-8),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println(处理业务逻辑);//⼿动设置⼀个异常, 来测试异常拒绝机制int num 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}运⾏结果: 消费异常时不断重试, deliveryTag 从1递增 控制台日志: 接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑管理页面上unacked也是1