flash 网站 收费,张掖网站建设培训班,wordpress 多级菜单插件,视频直播app开发公司目录 减少发送mq的消息体内容
增加消费者数量
批量消费消息
临时队列转移
监控和预警机制
分阶段实施
最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容
像我们没有必要知道一个的中间状态#xff0c;只需…
目录 减少发送mq的消息体内容
增加消费者数量
批量消费消息
临时队列转移
监控和预警机制
分阶段实施
最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容
像我们没有必要知道一个的中间状态只需知道一个最终状态就可以了。 发送的消息体只用包含id和状态等关键信息不用发送一个完整的对象内容。 消费者收到消息之后通过id调用原服务再将完整的消息对象内容查询出来即可最后再进行消费处理。
增加消费者数量
采用动态增加消费者的数量
Configuration
public class RabbitMQConfig {Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5); // 初始消费者数量factory.setMaxConcurrentConsumers(20); // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue - consumer- UUID.randomUUID());return factory;}
}
Service
public class ConsumerManagerService {Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}
批量消费消息
Service
public class BatchMessageConsumer {RabbitListener(queues myQueue, containerFactory batchFactory)public void processBatch(ListMessage messages, Channel channel) {try {// 批量处理消息ListMessageDTO dtos messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
Configuration
public class BatchConsumerConfig {Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}
临时队列转移
Service
public class MessageTransferService {Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息ListMessage messages new ArrayList();for (int i 0; i batchSize; i) {Message message rabbitTemplate.receive(sourceQueue);if (message null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg - rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
Component
public class TempQueueConsumer {RabbitListener(queues #{tempQueue.name})public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}Beanpublic Queue tempQueue() {return new Queue(temp-queue- UUID.randomUUID(), false, false, true);}
}
监控和预警机制
Service
Slf4j
public class QueueMonitorService {Autowiredprivate RabbitTemplate rabbitTemplate;Scheduled(fixedRate 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName myQueue;// 获取队列信息Properties properties rabbitTemplate.execute(channel - channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount properties.getMessageCount();// 检查消息堆积if (messageCount threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}
分阶段实施
Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}
最后还有一个方法就是开启队列的懒加载