网站产品设计规范 模板,公益网站建设那家好,网站群建设公司排行榜6,浏阳网页设计#xff08;一#xff09;延迟队列
1.概念 延迟队列是一种特殊的队列#xff0c;消息被发送后#xff0c;消费者并不会立刻拿到消息#xff0c;而是等待一段时间后#xff0c;消费者才可以从这个队列中拿到消息进行消费
2.应用场景 延迟队列的应用场景很多#xff0c;…一延迟队列
1.概念 延迟队列是一种特殊的队列消息被发送后消费者并不会立刻拿到消息而是等待一段时间后消费者才可以从这个队列中拿到消息进行消费
2.应用场景 延迟队列的应用场景很多就比如大部分定时的场景我们都可以利用延迟队列例如闹钟定时预约会议空调定时开关等 但是RabbitMQ是没有直接给我们提供延迟队列的但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果具体操作如下 首先我们有一个交换机和一个队列然后此队列又指定一个死信交换机死信交换机绑定一个死信队列然后我们消费者并不是从正常队列中获取消息而是从死信队列中获取消息通过给消息/队列设置过期时间来影响消息到达死信队列的时间消费者拿到消息就会延迟这样就可以模拟出延迟的效果。 那接下来就是我们的代码实现
首先我们通过设置队列ttl来实现 Bean(ttlExchange)public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}Bean(ttlQueue)public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey(dead).build();}Bean(ttlBind)public Binding ttlBind(Qualifier(ttlExchange) Exchange ackExchange,Qualifier(ttlQueue) Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with(ttl).noargs();}Bean(deadExchange)public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}Bean(deadQueue)public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}Bean(deadBind)public Binding deadBind(Qualifier(deadExchange) Exchange ackExchange,Qualifier(deadQueue) Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with(dead).noargs();}
然后生产者代码没什么变化 RequestMapping(ttl)public String TTLPro(){String s1ttl test;Message messagenew Message(s1.getBytes(StandardCharsets.UTF_8));
// message.getMessageProperties().setExpiration(10000);RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,ttl,message);return 发送成功;}
只不过消费者订阅的是死信队列
RabbitListener(queues Constants.DEAD_QUEUE)public void ListenerQueue2(Message message,Channel channel) throws IOException {long Tagmessage.getMessageProperties().getDeliveryTag();try {System.out.println(接收到消息: new String(message.getBody()) TagID: Tag);int num3/0; //模拟失败channel.basicAck(Tag,false);System.out.println(处理完成);}catch (Exception e){channel.basicReject(Tag,false);}}
这样过了5s后我们就可以从死信队列中获取到延迟消息了 那我们再来通过设置消息的ttl来看一下
首先我们要把队列的ttl给取消掉记得要删队列 Bean(ttlExchange)public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}Bean(ttlQueue)public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey(dead).build();}Bean(ttlBind)public Binding ttlBind(Qualifier(ttlExchange) Exchange ackExchange,Qualifier(ttlQueue) Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with(ttl).noargs();}Bean(deadExchange)public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}Bean(deadQueue)public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}Bean(deadBind)public Binding deadBind(Qualifier(deadExchange) Exchange ackExchange,Qualifier(deadQueue) Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with(dead).noargs();} 然后我们发送一个ttl时间为10s的再发送一个5s的我们知道这样两条数据是会发生错误的因为我们设置消息过期时间我们RabbitMQ性能问题并不会遍历整个消息队列看看谁过没过期如果过期的消息不在队头那么只有当使用的时候才会真正的进行一些过期处理比如传给死信交换机
RequestMapping(ttl)public String TTLPro(){String s1ttl test;Message messagenew Message(s1.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration(10000);RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,ttl,message);message.getMessageProperties().setExpiration(5000);RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,ttl,message);return 发送成功;}
如果正常会在5s后接收到第一个消息在10s后接收到第二个消息但是此时我们会同一时间10s接收到两条消息 那这个问题在上一篇ttl的时候就说过了这里依然是个问题虽然设置队列的ttl不会有这个问题但是设置队列ttl我们针对不同延迟时间就需要创建多个队列这是不太合理的所以针对这个问题我们有一个延迟队列的插件可以使用 3.延迟队列插件
延迟队列插件会给我们提供一个特殊的交换机来完成我们的延迟功能
这是我们插件的下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 我们需要找到ez文件并下载但是注意这里的版本要与你的RabbitMQ版本可以匹配否则之后会出现问题 那插件下完后我们要找到对应目录下载插件 上面两个目录我们可以任选一个下载即可没有这个目录我们手动创建
然后把下载的ez文件copy到这个目录中即可然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表看看我们有没有成功放进去但是注意即使我们成功放进去并成功显示了也可能会出错这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配重新下载其他版本即可
然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之后重启服务service rabbitmq-server restart
在没有发生错误的情况下我们就发现我们会多了一个默认的交换机 此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可
我们生产者代码是需要改一下的我们需要调用一个方法来设置延迟时间 RequestMapping(/delay2)
public String delay2() {//发送带ttl的消息 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, delayed,
delayed test 20s...new Date(), messagePostProcessor - {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, delayed,
delayed test 10s...new Date(), messagePostProcessor - {messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 return messagePostProcessor;});return 发送成功!;
}此时我们就可以在10s正确接收一个消息在20s正确接收另一个消息 注意我们使用TTL死信时消息传递给交换机后映射之后一直在正常队列中的等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息我们使用插件的时候消息是在RabbitMQ给我们提供的那个特殊的交换机中的等待时间到了再映射给队列然后从队列中拿消息
4.常见面试题
介绍下延迟队列
我们可以这样回答 延迟队列是一个特殊的队列消息发送后消费者并不会立刻拿到而是等待一定延迟时间后才发送给消费者进行消费 并且延迟队列的应用场景很多比如订单支付智能家电以及定时邮箱 但是延迟队列在RabbitMQ中并没有直接给我们提供我们可以通过TTL死信的方式或者使用延迟插件的方式来实现延迟功能 两者的区别
1.通过TTL死信 优点比较灵活不需要我们额外引入插件 缺点我们设置消息TTL的时候可能会出现顺序的问题而且我们需要多创建死信队列和死信交换机完成一些绑定增加了系统的复杂性
2.基于插件实现的延迟队列 优点通过插件能够简化延迟消息的实现并且避免了时序问题 缺点需要依赖插件不同版本RabbitMQ需要不同版本插件有运维工作