打折网站运营思路,wordpress 付费下载,手机单页网站通用模板,昆山花桥做网站基于Rocket MQ扩展的无限延迟消息队列
背景:
Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…基于Rocket MQ扩展的无限延迟消息队列
背景:
Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.
实现原理: 简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用. 源码如下: /** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/
package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** version 1.8.0* description 基于Rocket MQ的任意延迟时长工具* program user-growth* date Created in 2023/5/22 3:35 下午* since 1.8.0*/
Slf4j
Component
RocketMQMessageListener(consumerGroup MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消费顺序consumeMode ConsumeMode.CONCURRENTLY,// 最大消息重复消费次数maxReconsumeTimes 3)
public class RocketMQDelayQueueUtils implements RocketMQListenerRocketMQDelayQueueUtils.DelayTableObject {/*** Rocket MQ客户端*/Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默认延迟等级*/private static final long[] TIME_DELAY_LEVEL new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};SneakyThrowsOverridepublic void onMessage(DelayTableObject message) {Date endTime message.getEndTime();int delayLevel getDelayLevel(endTime);// 继续延迟if (delayLevel ! 0) {int currentDelayCount message.getCurrentDelayCount();currentDelayCount;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 执行业务log.info(delay message end! start to process business...);Class? extends DelayMessageHandler messageHandler message.getMessageHandler();if (messageHandler ! null) {DelayMessageHandler delayMessageHandler messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延迟消息体** param E 消息类型*/Datapublic static class DelayTableE implements Serializable {private static final long serialVersionUID 2405172041950251807L;/*** 延迟消息体*/private E content;/*** 消息延迟结束时间*/private Date endTime;/*** 总延迟毫秒数*/private long totalDelayTime;/*** 总延迟时间单位*/private TimeUnit totalDelayTimeUnit;/*** 当前延迟次数*/private int currentDelayCount;/*** 当前延迟等级*/private int currentDelayLevel;/*** 当前延迟毫秒数*/private long currentDelayMillis;/*** 延迟处理逻辑*/private Class? extends DelayMessageHandler messageHandler;}/*** 发送延迟消息** param message 消息体* param delay 延迟时长* param timeUnit 延迟时间单位* param handler 延迟时间到了之后需要处理的逻辑* param E 延迟消息类型*/public E void delay(E message, int delay, TimeUnit timeUnit, Class? extends DelayMessageHandler handler) {// 把延迟时间转换成时间戳(毫秒)long totalDelayMills timeUnit.toMillis(delay);// 根据延迟时间计算结束时间Calendar instance Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime instance.getTime();// 根据延迟时间匹配延迟等级(delay level)int delayLevel getDelayLevel(endTime);long delayMillis TIME_DELAY_LEVEL[delayLevel];// 发送消息DelayTableE delayTable new DelayTable();// 全局数据delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 当前延迟等级数据delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 计算延迟等级** param targetTime 延迟截止时间* return Rocket MQ延迟消息等级*/private static int getDelayLevel(Date targetTime) {long currentTime System.currentTimeMillis();long delayMillis targetTime.getTime() - currentTime;if (delayMillis 0) {// 不延迟即延迟等级为 0return 0;}// 判断处于哪个延迟等级// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i 1; i 18; i) {long delayLevelTime TIME_DELAY_LEVEL[i];if (delayMillis delayLevelTime) {return i - 1;} else if (delayMillis delayLevelTime) {return i;}}// 最大延迟等级为 18return 18;}/*** 发送延迟消息** param delayTable 延迟对象可以循环使用*/SneakyThrowsprivate E void sendDelayMessage(DelayTableE delayTable) {// 消息序列化MessageDelayTableE message MessageBuilder.withPayload(delayTable).build();// 设置\发送延迟消息int delayLevel delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug(delay count: {}, delay level: {}, time: {} milliseconds,delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延迟回调接口** 回调逻辑必须实现该接口#hander()方法在延迟结束后会通过反射的方式调用该方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID 2405172041950251807L;/*** 回调函数*/void handle();}}测试代码: /** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/
package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
import com.vevor.common.pojo.vo.ResponseResult;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** version 1.8.0* description 延迟队列测试* program user-growth* date Created in 2023/5/22 4:54 下午* since 1.8.0*/
Slf4j
RestController
public class DelayQueueController {Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;GetMapping(/mq/delay)SneakyThrowspublic ResponseResultString mqDelay(RequestParam Integer delay, RequestParam String task) {// 获取延时队列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** version * description * program user-growth* date Created in 2023/5/23 2:11 下午* since */Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回调函数*/Overridepublic void handle() {log.info(i am business logical! {}, System.currentTimeMillis());}}
}优缺点:
优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.