微信视频网站怎么做的好处,盘龙区网络推广,洛阳网站建设培训,备案之后怎样把 放到网站上目录
一、引言
二、RepublishMessageRecoverer 实现
2.1. 实现步骤
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
2.2.2. 常规交换机队列配置类
2.2.3. 消费者代码
2.2.4. 消费者yml配置
2.2.5. 生产者代码
2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言 …目录
一、引言
二、RepublishMessageRecoverer 实现
2.1. 实现步骤
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
2.2.2. 常规交换机队列配置类
2.2.3. 消费者代码
2.2.4. 消费者yml配置
2.2.5. 生产者代码
2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言
Spring AMQP提供了消费者失败重试机制在消费者出现异常时利用本地重试而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none关闭ackmanual手动ackauto自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecoverer接口来处理它包含三种不同的实现 RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息默认方式 ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队 RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机推荐 二、RepublishMessageRecoverer 实现
在实际项目的生产环境中通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。 2.1. 实现步骤
1. 将失败处理策略改为RepublishMessageRecoverer
2. 定义接收失败消息的交换机、队列及其绑定关系
3. 定义RepublishMessageRecoverer
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
Configuration
ConditionalOnProperty(prefix spring.rabbitmq.listener.simple.retry, name enabled, havingValue true)
public class ErrorConfig {Resourceprivate RabbitTemplate rabbitTemplate;BeanQueue errorQueue() {return new Queue(error.queue);}BeanDirectExchange errorExchange() {return new DirectExchange(error.direct);}BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with(error);}Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}2.2.2. 常规交换机队列配置类
package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
Configuration
public class RabbitMQConfig {BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable(simple.queue).build();}
}2.2.3. 消费者代码
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
Slf4j
Component
public class SimpleListener {RabbitListener(queues simple.queue)public void listener1(String msg) throws Exception {
// System.out.println(消费者1人生是个不断攀登的过程【 msg 】);throw new Exception();}
}2.2.4. 消费者yml配置
# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none关闭ackmanual手动ackauto自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
2.2.5. 生产者代码
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
Slf4j
SpringBootTest
class PublisherApplicationTests {Resourceprivate RabbitTemplate rabbitTemplate;Testvoid test() {rabbitTemplate.convertAndSend(simple.queue, 只要学不死就往死里学);}
}2.2.6. 生产者yml配置
# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou 2.2.7. 运行效果
最终效果是我们在消费者的代码逻辑中会抛出异常消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中