网站改版 降权,所有北京网站建设公司,做网站要以单位,南昌市住房和城乡建设网站文章目录 1. 场景模拟2. 消息发送3. 消息接收4. 测试5. 能者多劳6. 总结 当你在处理消息时#xff0c;可能会遇到这样的问题#xff1a;消息的生产速度远远大于消费速度#xff0c;导致消息堆积。这时候#xff0c;Work Queues#xff08;工作队列#xff09;模型就能派上… 文章目录 1. 场景模拟2. 消息发送3. 消息接收4. 测试5. 能者多劳6. 总结 当你在处理消息时可能会遇到这样的问题消息的生产速度远远大于消费速度导致消息堆积。这时候Work Queues工作队列模型就能派上用场。简单来说Work Queues 让多个消费者绑定到一个队列共同消费队列中的消息从而加快消息处理速度。 1. 场景模拟
我们来模拟一个这样的场景。首先在控制台创建一个名为 work.queue 的队列。
2. 消息发送
我们通过循环发送大量消息来模拟消息堆积的现象。在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}3. 消息接收
为了模拟多个消费者绑定同一个队列我们在 consumer 服务的 SpringRabbitListener 中添加两个新的方法
RabbitListener(queues work.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues work.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}注意到这两个消费者都设置了 Thread.sleep 来模拟任务耗时
消费者1Thread.sleep(20)相当于每秒处理50个消息。消费者2Thread.sleep(200)相当于每秒处理5个消息。
4. 测试
启动 ConsumerApplication 后执行 publisher 服务中编写的发送测试方法 testWorkQueue。结果如下
消费者1接收到消息【hello, message_0】21:06:00.869555300
消费者2........接收到消息【hello, message_1】21:06:00.884518
...
消费者1接收到消息【hello, message_48】21:06:01.920702500
消费者2........接收到消息【hello, message_49】21:06:05.723106700可以看到消费者1和消费者2各自消费了25条消息
消费者1快速完成了任务。消费者2则缓慢处理任务。
消息是平均分配给每个消费者的并没有考虑到各个消费者的处理能力导致一个消费者空闲另一个忙碌。这显然是低效的。
5. 能者多劳
在 spring 中可以通过简单配置解决这个问题。修改 consumer 服务的 application.yml 文件添加如下配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息再次测试结果如下
消费者1接收到消息【hello, message_0】21:12:51.659664200
消费者2........接收到消息【hello, message_1】21:12:51.680610
...
消费者2........接收到消息【hello, message_49】21:12:52.746299900这次消费者1处理了更多的消息消费者2则处理了较少的消息总耗时在1秒左右大大提升了效率。这充分利用了每一个消费者的处理能力有效避免了消息积压问题。
6. 总结
Work Queues 模型的使用要点
多个消费者绑定到一个队列同一条消息只会被一个消费者处理。通过设置 prefetch 来控制消费者预取的消息数量。
这样可以更高效地利用资源提高消息处理速度。