固镇县住房和城乡建设局网站,wordpress不能注册,西安关键词排名首页,在线制作名片
一、前言
本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成，比如动态新增 RabbitMQ 交换机、队列等操作。
二、默认RabbitMQ中的exchange、queue动态新增及监听
1、新增RabbitMQ配置
RabbitMQConfig.java
import org.springframework.amqp.rabbit.a…
一、前言
本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成，比如动态新增 RabbitMQ 交换机、队列等操作。
二、默认RabbitMQ中的exchange、queue动态新增及监听
1、新增RabbitMQ配置
RabbitMQConfig.java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** className: RabbitConfig* program: chain* description: RabbitMQ 配置类* author: kenny* create: 2024-10-03 21:59* version: 1.0.0*/
/**
 * RabbitMQ configuration: exposes the {@link RabbitTemplate} used to send messages and the
 * {@link RabbitAdmin} used to declare exchanges, queues and bindings at runtime.
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    /**
     * Creates the RabbitTemplate used for sending messages.
     *
     * <p>The template is built on the application's ConnectionFactory. A template created with
     * the no-arg constructor has no connection factory, and {@code RabbitAdmin(RabbitTemplate)}
     * rejects such a template.
     *
     * @param connectionFactory the connection factory bean available in the context
     * @return RabbitTemplate bound to {@code connectionFactory}
     */
    @Bean
    public RabbitTemplate rabbitTemplate(
            org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Creates the RabbitAdmin used for declaring exchanges and queues.
     *
     * @param rabbitTemplate RabbitTemplate
     * @return RabbitAdmin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
        return new RabbitAdmin(rabbitTemplate);
    }
}
2、新增RabbitMQ动态操作组件
RabbitDynamicConfigService.java
RabbitDynamicConfigService.java 中包含了不同类型 Exchange 的创建和删除、Queue 的创建和删除，以及 Queue 与 Exchange 的绑定。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Map;/*** className: RabbitDynamicConfigService* program: chain* description: 动态创建队列和交换机* author: kenny* create: 2024-10-03 23:49* version: 1.0.0*/
Slf4j
Service
public class RabbitDynamicConfigService {/*** 为了解决循环依赖问题*/private final RabbitAdmin rabbitAdmin;private final RabbitListenerService rabbitListenerService;Autowiredpublic RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,RabbitListenerService rabbitListenerService) {this.rabbitAdmin rabbitAdmin;this.rabbitListenerService rabbitListenerService;}/*** 动态创建队列并持久化** param queueName 队列名称*/public void createQueue(String queueName) {// 队列持久化Queue queue new Queue(queueName, true);// 创建队列rabbitAdmin.declareQueue(queue);System.out.println(队列创建成功: queueName);}/*** 动态创建队列并持久化** param queueName 队列名称*/public void createQueue(String queueName, Boolean isListener) {// 队列持久化Queue queue new Queue(queueName, true);// 创建队列rabbitAdmin.declareQueue(queue);System.out.println(队列创建成功: queueName);if (!isListener) {return;}rabbitListenerService.createListener(queueName);}/*** 动态创建交换机并持久化** param exchangeName 交换机名称*/public void createExchange(String exchangeName) {// 交换机持久化DirectExchange exchange new DirectExchange(exchangeName, true, false);rabbitAdmin.declareExchange(exchange);log.info(交换机创建成功: {}, exchangeName);}// 动态创建 Fanout 交换机public void createDirectExchange(String exchangeName) {DirectExchange fanoutExchange new DirectExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(fanoutExchange);log.info(Direct 交换机创建成功: {}, exchangeName);}// 动态创建 Fanout 交换机public void createFanoutExchange(String exchangeName) {FanoutExchange fanoutExchange new FanoutExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(fanoutExchange);log.info(Fanout 交换机创建成功: {}, exchangeName);}// 动态创建 Topic 交换机public void createTopicExchange(String exchangeName) {TopicExchange topicExchange new TopicExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(topicExchange);log.info(Topic 交换机创建成功: {}, exchangeName);}// 动态创建 Headers 交换机public void createHeadersExchange(String exchangeName) {HeadersExchange headersExchange new HeadersExchange(exchangeName, true, false); // 
持久化rabbitAdmin.declareExchange(headersExchange);log.info(Headers 交换机创建成功: {}, exchangeName);}/*** 动态绑定队列到交换机并指定路由键** param queueName 队列名称* param exchangeName 交换机名称* param routingKey 路由键*/public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {Queue queue new Queue(queueName);DirectExchange exchange new DirectExchange(exchangeName);Binding binding BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info(绑定创建成功: {}, queueName - {}, exchangeName 使用路由键: {}, routingKey);}/*** 动态绑定队列到交换机并指定路由键** param queueName 队列名称* param exchangeName 交换机名称* param routingKey 路由键*/public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType, String exchangeName, String routingKey, MapString, Object headers) {switch (exchangeType) {case fanout - bindQueueToExchange(queueName, exchangeName, routingKey);case direct - bindQueueToDirectExchange(queueName, exchangeName, routingKey);case topic - bindQueueToTopicExchange(queueName, exchangeName, routingKey);case headers - bindQueueToHeadersExchange(queueName, exchangeName, headers);default - throw new IllegalArgumentException(不支持的交换机类型: exchangeType);}}/*** 动态绑定队列到交换机并指定路由键exchange: direct** param queueName 队列名称* param exchangeName 交换机名称*/public void bindQueueToFanoutExchange(String queueName, String exchangeName) {Queue queue new Queue(queueName);FanoutExchange exchange new FanoutExchange(exchangeName);Binding binding BindingBuilder.bind(queue).to(exchange);rabbitAdmin.declareBinding(binding);log.info(绑定创建成功: {}, queueName - {}, exchangeName);}/*** 动态绑定队列到交换机并指定路由键exchange: direct** param queueName 队列名称* param exchangeName 交换机名称* param routingKey 路由键*/public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {Queue queue new Queue(queueName);DirectExchange exchange new DirectExchange(exchangeName);Binding binding 
BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info(绑定创建成功: {}, queueName - {}, exchangeName 使用路由键: {}, routingKey);}/*** 动态绑定队列到交换机并指定路由键exchange: topic** param queueName 队列名称* param exchangeName 交换机名称* param routingKey 路由键*/public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {Queue queue new Queue(queueName);TopicExchange exchange new TopicExchange(exchangeName);Binding binding BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info(绑定创建成功: {}, queueName - {}, exchangeName 使用路由键: {}, routingKey);}/*** 动态绑定队列到交换机并指定路由键exchange: headers** param queueName 队列名称* param exchangeName 交换机名称* param headers 路由键*/public void bindQueueToHeadersExchange(String queueName, String exchangeName, MapString, Object headers) {Queue queue new Queue(queueName);HeadersExchange exchange new HeadersExchange(exchangeName);Binding binding BindingBuilder.bind(queue).to(exchange).whereAll(headers).match();rabbitAdmin.declareBinding(binding);log.info(队列 {}, queueName 已绑定到 Headers 交换机 {}, exchangeName 使用头部匹配规则: {}, headers);}/*** 动态删除队列** param queueName 队列名称*/public void deleteQueue(String queueName) {rabbitAdmin.deleteQueue(queueName);log.info(队列删除成功: {}, queueName);}/*** 动态删除交换机** param exchangeName 交换机名称*/public void deleteExchange(String exchangeName) {rabbitAdmin.deleteExchange(exchangeName);log.info(交换机删除成功: {}, exchangeName);}
}3、RabbitMQ中队列的动态监听
RabbitListenerService.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** className: RabbitListenerService* program: chain* description: RabbitMQ监听器Service组件* author: kenny* create: 2024-10-04 01:40* version: 1.0.0*/
/**
 * Starts message listener containers for queues at runtime.
 */
@Slf4j
@Service
public class RabbitListenerService {

    // Dependencies are constructor-injected (original note: done to work around a
    // circular-dependency problem).
    private final SimpleRabbitListenerContainerFactory listenerContainerFactory;
    private final ConnectionFactory connectionFactory;

    // Containers started by this service, keyed by queue name, so that a repeated request
    // for the same queue does not start a second, untracked container.
    private final java.util.Map<String, SimpleMessageListenerContainer> containers =
            new java.util.concurrent.ConcurrentHashMap<>();

    @Autowired
    public RabbitListenerService(SimpleRabbitListenerContainerFactory listenerContainerFactory,
                                 ConnectionFactory connectionFactory) {
        this.listenerContainerFactory = listenerContainerFactory;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Creates a listener container for the queue and starts it. If this service has already
     * started a listener for {@code queueName}, the call does nothing.
     *
     * @param queueName queue name
     */
    public void createListener(String queueName) {
        if (containers.containsKey(queueName)) {
            return;
        }

        SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        // The adapter dispatches each message to the delegate's handleMessage method.
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            public void handleMessage(String message) {
                log.info("收到来自RabbitMQ中队列 {} 队列的消息 {}", queueName, message);
            }
        }));

        // putIfAbsent guards against two concurrent requests for the same queue:
        // only the caller that registers the container starts it.
        if (containers.putIfAbsent(queueName, container) != null) {
            return;
        }
        container.start();
        log.info("RabbitMQ队列监听器已启动 {}", queueName);
    }
}
4、RabbitMQ中的Exchange、Queue动态操作接口
RabbitDynamicChannelController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;/*** className: RabbitDynamicController* program: chain* description: RabbitMQ 动态创建队列、交换机绑定等操作* author: kenny* create: 2024-10-04 00:22* version: 1.0.0*/
RestController
RequestMapping(/rabbit/dynamic/channel)
public class RabbitDynamicChannelController {/*** 动态创建队列和交换机*/Resourceprivate RabbitDynamicConfigService rabbitDynamicConfigService;/*** 动态创建队列** param queueName 队列名称* return 处理结果*/GetMapping(/createQueue)public String createQueue(RequestParam(queueName) String queueName) {rabbitDynamicConfigService.createQueue(queueName);return 队列已创建: queueName;}/*** 动态创建交换机** param exchangeName 交换机名称* return 处理结果*/GetMapping(/createExchange)public String createExchange(RequestParam(exchangeName) String exchangeName) {rabbitDynamicConfigService.createExchange(exchangeName);return 交换机已创建: exchangeName;}/*** 动态绑定队列和交换机** param queueName 队列名称* param exchangeName 交换机名称* param routingKey 路由键* return 处理结果*/GetMapping(/bindQueue)public String bindQueueToExchange(RequestParam(queueName) String queueName,RequestParam(exchangeName) String exchangeName,RequestParam(routingKey) String routingKey) {rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);return 队列和交换机已绑定: queueName - exchangeName;}/*** 动态删除队列** param queueName 队列名称* return 处理结果*/GetMapping(/deleteQueue)public String deleteQueue(RequestParam(queueName) String queueName) {rabbitDynamicConfigService.deleteQueue(queueName);return 队列已删除: queueName;}/*** 动态删除交换机** param exchangeName 交换机名称* return 处理结果*/GetMapping(/deleteExchange)public String deleteExchange(RequestParam(exchangeName) String exchangeName) {rabbitDynamicConfigService.deleteExchange(exchangeName);return 交换机已删除: exchangeName;}// 创建并绑定 Fanout 交换机GetMapping(/createDirectExchange)public String createDirectExchange(RequestParam String exchangeName, RequestParam String queueName, RequestParam String routingKey) {rabbitDynamicConfigService.createDirectExchange(exchangeName);rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);return Fanout Exchange and Queue Binding created: exchangeName - queueName with routing key: routingKey;}// 创建并绑定 Fanout 交换机GetMapping(/createFanoutExchange)public String 
createFanoutExchange(RequestParam String exchangeName, RequestParam String queueName) {rabbitDynamicConfigService.createFanoutExchange(exchangeName);rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);return Fanout Exchange and Queue Binding created: exchangeName - queueName;}// 创建并绑定 Topic 交换机GetMapping(/createTopicExchange)public String createTopicExchange(RequestParam String exchangeName, RequestParam String queueName, RequestParam String routingKey) {rabbitDynamicConfigService.createTopicExchange(exchangeName);rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);return Topic Exchange and Queue Binding created: exchangeName - queueName with routing key: routingKey;}// 创建并绑定 Headers 交换机GetMapping(/createHeadersExchange)public String createHeadersExchange(RequestParam String exchangeName, RequestParam String queueName, RequestParam MapString, String headersMap) {MapString, Object headers new HashMap(headersMap);rabbitDynamicConfigService.createHeadersExchange(exchangeName);rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);return Headers Exchange and Queue Binding created: exchangeName - queueName with headers: headers;}
}5、RabbitMQ中的Queue消息监听动态操作接口
RabbitChannelListenerController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** className: RabbitListenerController* program: chain* description: RabbitMQ 监听器 Controller 组件* author: kenny* create: 2024-10-04 01:30* version: 1.0.0*/
/**
 * HTTP endpoint for starting a listener on a RabbitMQ queue at runtime.
 */
@RestController
@RequestMapping("/rabbit/channel/listener")
public class RabbitChannelListenerController {

    /** Service that declares the queue and starts the listener. */
    @Resource
    private RabbitDynamicConfigService rabbitDynamicConfigService;

    /**
     * Declares the queue and starts a listener on it.
     *
     * @param queueName queue name
     * @return result message
     */
    @GetMapping("/queue")
    public String listenQueue(@RequestParam("queueName") String queueName) {
        // The second argument asks the service to attach a listener after declaring the queue.
        rabbitDynamicConfigService.createQueue(queueName, true);
        return "开始监听队列 " + queueName;
    }
}
三、动态exchange、queue的测试
1、测试Exchange、Queue的动态创建和删除
2、测试Exchange和Queue的动态绑定
3、发送、接收消息测试动态创建Exchange、Queue
4、测试Queue的动态监听接口
下一篇7、Spring Boot 3.x集成RabbitMQ动态实例等操作