asp.net 网站启动慢,南海网站建设价格,建筑设计建模软件,手机网页及网站设计目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言
两个月前接了一个4万的私活#xff0c;做一个线上商城小程序#xff0c;在交易过程中不可避免的一个问题就是用户… 目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言
两个月前接了一个4万的私活做一个线上商城小程序在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。
目前成熟的方案有通过RabbitMQ死信队列、RabbitMQ延迟消息插件、RocketMQ定时消息推送、Redisson延时队列来实现。
考虑到商城的定位和用户体量以及系统维护成本其实完全没有必要引入消息中间件借助Redis其实就可以轻松实现这个需求。
加上Redisson客户端本身就已经实现了很多分布式集合工具类借助阻塞队列和延时队列就可轻松搞定。
当然为了使用方便以及团队协作顺便模仿RabbitListener封装了一套基于注解的消息消费废话不多说直接上代码。 二、延迟队列实现
1、Redisson延时消息监听注解和消息体
延迟消息监听器定义
/*** Redisson延时队列监听器** author Nick Liu* date 2024/11/13*/
Documented
Retention(RetentionPolicy.RUNTIME)
Target(ElementType.METHOD)
public interface RedissonDelayedQueueListener {/*** 队列名称* return*/String queueName();
}消息体定义
Data
Builder
AllArgsConstructor
NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}
2、Redisson延时消息发布器
Slf4j
Component
RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info(开始发布延迟消息: {}, FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueueString blockingQueue redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueueString delayedQueue redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}这里我们借助RBlockingQueue和RDelayedQueue来实现只有当延迟消息快到期时消费者才能从阻塞队列拉取到消息否则消费者将一直阻塞。
3、Redisson延时消息监听处理器
这里我们定义了一个BeanPostProcessor 的实现目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。
/*** Redisson延迟队列Bean后处理器* author Nick Liu* date 2025/1/3*/
Slf4j
Component
RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class? clazz AopProxyUtils.ultimateTargetClass(bean);Method[] methods clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解则启动一个线程监听队列RedissonDelayedQueueListener annotation m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() - {log.info(开始监听Redisson延时队列[{}]消息, annotation.queueName());while (true) {RBlockingQueueString blockingQueue redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info(监听到队列[{}]延时消息: {}, annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}这里我们扫描到指定Bean的方法后会开启一个异步线程并轮询拉取延时消息如果消息没过期异步线程将会一直阻塞等待。 三、测试用例
/*** author Nick Liu* date 2025/2/2*/
Slf4j
RestController
RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE redisson:delayed:queue;private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;GetMapping(/delayed/msg)public ResponseEntityRedisDelayedMsgDTO publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg(This is a delayed msg);redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}RedissonDelayedQueueListener(queueName DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info(Received delayed msg: {}, msg);}
}
启动服务后Bean后处理器会启动异步线程监听延时消息如下
2025-02-02 16:46:04.271 INFO [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息浏览器直接输入http://localhost:8000/delayed/msg发布延时消息10s后消费者进行处理如下
2025-02-02 16:43:11.107 INFO [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg四、结语
虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消但是可用性相比专业的消息中间件还是尚有不足的。
比如消息生产者发送消息没有确认机制消息消费也没有确认机制这两个环节都有可能导致消息丢失。
当然我们可以通过其它保障机制去补偿比如再加上定时任务扫表把扫描时间可以设置长一点保证最终的一致性。
在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。