织梦视频资讯网站源码,优秀企业网站首页,做家教去哪个网站,qian p.wordpress/1. 消费者确认机制
没有ack#xff0c;mq就会一直保留消息。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack2. 失败重试机制
当消费者出现异常后#xff0c;消息会不断requeue#xff08;重入队#xff09;到队列#xff0c;再重新发送给消费者。…
1. 消费者确认机制
没有ackmq就会一直保留消息。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack2. 失败重试机制
当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者。如果消费者再次执行依然出错消息会再次requeue到队列再次投递直到消息处理成功为止。 极端情况就是消费者一直无法执行成功那么消息requeue就会无限循环导致mq的消息处理飙升带来不必要的压力。
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false重启consumer服务重复之前的测试。可以发现
消费者在失败后消息没有重新回到MQ无限重新投递而是在本地重试了3次本地重试3次以后抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台发现消息被删除了说明最后SpringAMQP返回的是reject
结论
开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试重试达到最大次数后Spring会返回reject消息会被丢弃
3. 失败处理策略
在之前的测试中本地测试达到最大重试次数后消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下显然不太合适了。 因此Spring允许我们自定义重试次数耗尽后的消息处理策略这个策略是由MessageRecovery接口来定义的它有3个不同实现
RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。 代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
ConditionalOnProperty(name spring.rabbitmq.listener.simple.retry.enabled, havingValue true)
public class ErrorMessageConfiguration {
// RabbitListener(bindings QueueBinding(
// value Queue(name error.queue),
// exchange Exchange(name error.direct, type ExchangeTypes.DIRECT),
// key {error}
// ))
// public void bings(Object msg){
// System.out.println(异常msg.toString());
// }Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}