班级网站建设活动方案,海东高端网站建设,做网站卖装备,版面布局网站的域名和所采用的版面布局形式RabbitMQ 如何使用延迟队列
目录
前置条件场景描述RabbitMQ 延迟队列机制实现步骤 1. 安装 RabbitMQ 延迟队列插件2. 创建延迟队列和交换机3. 发布延迟消息4. 消费延迟消息 示例代码 1. 延迟队列配置2. 发布消息的 Producer 代码3. 消费消息的 Consumer 代码 注意事项
前置条…RabbitMQ 如何使用延迟队列
目录
前置条件场景描述RabbitMQ 延迟队列机制实现步骤 1. 安装 RabbitMQ 延迟队列插件2. 创建延迟队列和交换机3. 发布延迟消息4. 消费延迟消息 示例代码 1. 延迟队列配置2. 发布消息的 Producer 代码3. 消费消息的 Consumer 代码 注意事项
前置条件
操作系统CentOS 7RabbitMQ版本 3.8.0Erlang版本 21.0RabbitMQ 延迟队列插件rabbitmq_delayed_message_exchange
场景描述
假设我们正在设计一个线上售卖电影票的系统用户购票后有 15 分钟时间进行付款如果用户在 15 分钟内未付款订单将自动取消并释放电影票库存。这里我们可以利用 RabbitMQ 的延迟队列机制在用户购票时发送一条延迟消息到 RabbitMQ并设定延迟时间为 15 分钟。如果用户未在 15 分钟内完成付款延迟消息将被消费者接收并处理订单取消的逻辑。
RabbitMQ 延迟队列机制
RabbitMQ 本身不直接支持延迟队列功能需要借助 rabbitmq_delayed_message_exchange 插件来实现。该插件为 RabbitMQ 提供了一种新的消息交换机类型——x-delayed-message可以基于消息属性设置延迟时间在设定的延迟时间后将消息发送到目标队列。
实现步骤
1. 安装 RabbitMQ 延迟队列插件
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez# 将插件移动到 RabbitMQ 插件目录
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins/# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange2. 创建延迟队列和交换机
我们将使用 x-delayed-message 类型的交换机并设定延迟队列用于处理延迟消息。
# 创建交换机
rabbitmqadmin declare exchange namedelayed_exchange typex-delayed-message \arguments{x-delayed-type:direct}# 创建队列
rabbitmqadmin declare queue namedelayed_queue# 绑定交换机与队列
rabbitmqadmin declare binding sourcedelayed_exchange destinationdelayed_queue routing_keyorder.payment3. 发布延迟消息
在发布消息时可以设置消息属性 x-delay 来指定延迟时间。
# 使用 rabbitmqadmin 发布延迟消息
rabbitmqadmin publish exchangedelayed_exchange routing_keyorder.payment \payload{order_id: 12345, status: PENDING_PAYMENT} \properties{headers:{x-delay:900000}}4. 消费延迟消息
消费者将从延迟队列中消费消息并执行订单取消逻辑。
示例代码
1. 延迟队列配置
在 Spring Boot 项目中可以通过以下配置来创建延迟交换机和队列。
Configuration
public class RabbitConfig {public static final String DELAYED_EXCHANGE_NAME delayed_exchange;public static final String DELAYED_QUEUE_NAME delayed_queue;public static final String ROUTING_KEY order.payment;// 创建延迟交换机Beanpublic CustomExchange delayedExchange() {MapString, Object args new HashMap();args.put(x-delayed-type, direct);return new CustomExchange(DELAYED_EXCHANGE_NAME, x-delayed-message, true, false, args);}// 创建延迟队列Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 绑定延迟队列与交换机Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();}
}2. 发布消息的 Producer 代码
Component
public class OrderProducer {Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(String orderId) {MapString, Object message new HashMap();message.put(order_id, orderId);message.put(status, PENDING_PAYMENT);MessageProperties messageProperties new MessageProperties();messageProperties.setHeader(x-delay, 15 * 60 * 1000); // 延迟 15 分钟Message msg new Message(new ObjectMapper().writeValueAsBytes(message), messageProperties);rabbitTemplate.convertAndSend(RabbitConfig.DELAYED_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);}
}3. 消费消息的 Consumer 代码
Component
public class OrderConsumer {RabbitListener(queues RabbitConfig.DELAYED_QUEUE_NAME)public void processOrderCancellation(Message message) {try {MapString, Object orderMessage new ObjectMapper().readValue(message.getBody(), Map.class);String orderId (String) orderMessage.get(order_id);// 取消订单逻辑System.out.println(Order orderId has been canceled due to non-payment.);} catch (Exception e) {e.printStackTrace();}}
}注意事项
插件兼容性请确保 rabbitmq_delayed_message_exchange 插件与您的 RabbitMQ 版本兼容否则可能导致插件无法加载。Erlang 版本RabbitMQ 依赖于 Erlang因此确保您的 Erlang 版本满足 RabbitMQ 版本的最低要求。延迟时间限制合理设置延迟时间避免消息被延迟过长时间导致系统不可预测的性能问题。