怎样建自己的网站免费的,免费的网站制作平台,加强网站内容建设,机械加工网络接单1. 适用场景
日常开发中#xff0c;我们经常遇到这样的需求#xff0c;在某个事件发生后#xff0c;过一段时间做一个额外的动作#xff0c;比如
拼单#xff0c;如果2小时未能成单#xff0c;取消拼单下单#xff0c;30分钟内未支付#xff0c;取消订单 之前的我们的…1. 适用场景
日常开发中我们经常遇到这样的需求在某个事件发生后过一段时间做一个额外的动作比如
拼单如果2小时未能成单取消拼单下单30分钟内未支付取消订单 之前的我们的做法通常是通过定时任务轮询比如扫描创建时间是2小时之前状态是未成功的拼单然后做取消操作。这种方案存在的问题是:扫描对数据库造成一定的压力轮询的时间间隔会导致操作有一定的延迟 延迟消息正是用来解决这类问题的银弹。
2. JDK实现
2.1 使用方式
JDK内部提供了DelayQueue队列和Delayed接口来实现延迟消息我们先来看一个简单的Demo我们会创建一个DelayMessage用来代表延迟消息延迟消息需要实现Delayed接口
getDealy返回消息的延迟时间compareTo为了让多个延迟消息排序将时间最早的消息排到最前面
public class DelayMessage implements Delayed {private long expiredAtMs;private long delayMs;private String message;public DelayMessage(long delaySeconds, String message) {this.delayMs delaySeconds * 1000;this.expiredAtMs System.currentTimeMillis() delayMs;this.message message;}Overridepublic long getDelay(TimeUnit unit) {long diff expiredAtMs - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}Overridepublic int compareTo(Delayed o) {long sTtl getDelay(TimeUnit.MILLISECONDS);long oTtl o.getDelay(TimeUnit.MILLISECONDS);return sTtl oTtl ? -1 : (sTtl oTtl ? 1 : 0);}public String getMessage() {return this.message;}
}接着只需要创建消息队列将延迟消息放入到队列中即可然后创建一个线程来消费延迟队列即可
DelayQueueDelayMessage queue new DelayQueue();
queue.put(new DelayMessage(1, 1s later));
queue.put(new DelayMessage(60, 60s later));
queue.put(new DelayMessage(120, 120s later));ExecutorService es Executors.newSingleThreadExecutor();
es.submit(() - {try {while (true) {DelayMessage dm queue.take();System.out.println(currentTimeInText() _ dm.getMessage());}} catch (InterruptedException e) {throw new RuntimeException(e);}
});2.2 实现原理
从DelayQueue的源码我们可以看到整个DelayQueue的核心就在于3个点:
数据存储基于PriorityQueue通过Delayed的compareTo方法排序即基于时间顺序数据写入offer/put方法数据消费take/poll方法
1. 数据写入
public boolean offer(E e) {final ReentrantLock lock this.lock;lock.lock();try {q.offer(e); // PriorityQueue写入if (q.peek() e) { // 如果刚刚写入的消息是最高优先级的(最早被消费的)唤醒在take()方法阻塞的线程leader null; // Leader-Follow Parttern减少RaceCondition http://www.cs.wustl.edu/~schmidt/POSA/POSA2/available.signal(); // 唤醒在take()阻塞的线程}return true;} finally {lock.unlock();}
}2. 数据消费
public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {for (;;) {E first q.peek();if (first null)available.await(); // 队列为空阻塞直到offer(e)被调用else {long delay first.getDelay(NANOSECONDS);if (delay 0) // 延迟时间到了取出item供使用return q.poll();first null; // dont retain ref while waitingif (leader ! null)available.await(); // await释放锁其他线程执行take()如果leader ! null有负责处理头部item的线程else {Thread thisThread Thread.currentThread(); // 走到这说明头部元素暂无处理线程将当前线程设定为处理线程leader thisThread;try {available.awaitNanos(delay); // 等待延迟时间后自动唤醒重新进入循环处理queue头部item} finally {if (leader thisThread)leader null;}}}}} finally {if (leader null q.peek() ! null)available.signal();lock.unlock();}
}代码很短设计还是巧妙的尤其是Leader-Follower模式的使用在我们实现自己的组件时可以借鉴。
3. Redis实现
JDK实现的延迟队列已经能解决部分场景了不过也存在两个明显的问题
队列数据没持久化重启或进程崩溃都会导致数据丢失不支持分布式不能跨进程共享
3.1 消息队列
通过上面的JDK实现我们已经能把Redis实现的延迟消息的逻辑猜的八九不离十了假设我们用LIST存储先通过LPUSH写入队列消息(message1、message2)
127.0.0.1:6379 LPUSH my_delay_queue message1
(integer) 1
127.0.0.1:6379 LPUSH my_delay_queue message2
(integer) 2
127.0.0.1:6379 LRANGE my_delay_queue 0 -1
1) message2
2) message1通过RPOPLPUSH从队列取出待消费的消息并暂存到临时队列(my_delay_queue)中
127.0.0.1:6379 RPOPLPUSH my_delay_queue my_delay_queue_temp
message1
127.0.0.1:6379 LRANGE my_delay_queue_temp 0 -1
1) message1
127.0.0.1:6379 LRANGE my_delay_queue 0 -1
1) message2这是在程序代码中消费message1如果消费成功从临时队列中删除消息
127.0.0.1:6379 LREM my_delay_queue_temp 1 message1
(integer) 1最终队列的状态是delayQueue中只剩message2临时队列中为空
127.0.0.1:6379 LRANGE my_delay_queue_temp 0 -1
(empty array)
127.0.0.1:6379 LRANGE my_delay_queue 0 -1
1) message23.2 延迟队列
用LIST只能实现FIFO要想实现基于时间的优先级需要改用ZSET来存储数据用时间做时间戳
127.0.0.1:6379 ZADD s_delay_queue 1728625236 message0
127.0.0.1:6379 ZADD s_delay_queue 1728625256 message0
127.0.0.1:6379 ZADD s_delay_queue 1728625256 message2
127.0.0.1:6379 ZADD s_delay_queue 1728625266 message3127.0.0.1:6379 ZRANGE s_delay_queue 0 -1 WITHSCORES
1) message0
2) 1728625236
3) message1
4) 1728625256
5) message2
6) 1728625256
7) message3
8) 1728625266通过使用ZRANGEBYSCORE获取延迟时间已经到的item
127.0.0.1:6379 ZRANGEBYSCORE s_delay_queue 0 1728625256
1) message0
2) message1
3) message2ZSET并没有提供RPOPLPUSH的命令我们使用Lua脚本来模拟这个操作这段lua接受两个KEY一个ARGV
KEYS[1]表示ZSET的名字KEYS[2]表示LIST的名字ARGV[1]表示SCORE的范围截至时间
local elements redis.call(ZRANGEBYSCORE, KEYS[1], 0, ARGV[1])
if #elements 0 thenfor i, element in ipairs(elements) doredis.call(LPUSH, KEYS[2], element)redis.call(ZREM, KEYS[1], element)end
end
return elements然后是通过EVAL执行这段Lua这里我们从ZSET(s_delay_queue)读取score 1728625237的item返回并暂存到LIST(s_delay_queue_temp)中模拟了RPOPLPUSH的操作
127.0.0.1:6379 EVAL local elements redis.call(ZRANGEBYSCORE, KEYS[1], 0, ARGV[1]) if #elements 0 then for i, element in ipairs(elements) do redis.call(LPUSH, KEYS[2], element) redis.call(ZREM, KEYS[1], element) end end return elements 2 s_delay_queue s_delay_queue_temp 1728625237
1) message0剩下的逻辑基本上和[[基于Redis的延迟队列#3.1 消息队列]]一样在程序中消费message成功之后删除s_delay_queue_temp中的数据。我们需要做的是在程序中定时的执行这段Lua脚本并且实现类似DelayQueue的逻辑支持阻塞的take()操作以及消费失败时的错误处理显然要处理的错误细节并不少。
3.3 Redisson实现
1. 数据结构
Redisson封装了基于Redis的延迟消息实现我们来看一个使用的Redisson延迟队列的demo
Config config new Config();
config.useSingleServer().setAddress(redis://127.0.0.1:6379);
RedissonClient redisson Redisson.create(config);RBlockingQueueString blockingQueue redisson.getBlockingQueue(delayBlockingQueue);
RDelayedQueueString delayedQueue redisson.getDelayedQueue(blockingQueue);delayedQueue.offer(message1, 1, TimeUnit.MINUTES);
delayedQueue.offer(message2, 5, TimeUnit.MINUTES);
delayedQueue.offer(message3, 10, TimeUnit.MINUTES);
delayedQueue.offer(message4, 15, TimeUnit.MINUTES);ExecutorService es Executors.newSingleThreadExecutor();
es.submit(() - {while (true) {String data blockingQueue.poll(60, TimeUnit.SECONDS);if (data ! null) {System.out.println(new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date()) : data);}}
});Redisson的实现比[[#3.2 延迟队列]]要负责一点它内部构建了4个数据结构。通过Redis的命令查看我们能看到3个KEY
127.0.0.1:6379 KEYS *
2) delayBlockingQueue
4) redisson_delay_queue:{delayBlockingQueue}
6) redisson_delay_queue_timeout:{delayBlockingQueue}delayBlockingQueue是我们创建RBlockingQueue时指定的名称用来存储延迟时间到期但尚未被处理的任务redisson_delay_queue_timeout:{delayBlockingQueue}类型是zset记录延迟任务和时间redisson_delay_queue:{delayBlockingQueue}类型是list记录任务列表保持任务的顺序 通过TYPE命令我们能查看他们的数据类型
127.0.0.1:6379 TYPE redisson_delay_queue:{delayBlockingQueue}
list
127.0.0.1:6379 TYPE redisson_delay_queue_timeout:{delayBlockingQueue}
zset此外Redission还创建了一个Channel用来在delayQueue写入数据的时候做通知
127.0.0.1:6379 PUBSUB channels
1) redisson_delay_queue_channel:{delayBlockingQueue}2. 数据写入
通过RDelayedQueue写入数据的时候最终会调用offerAsync方法
public RFutureVoid offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay 0) {throw new IllegalArgumentException(Delay cant be negative);}long delayInMs timeUnit.toMillis(delay);long timeout System.currentTimeMillis() delayInMs;long randomId ThreadLocalRandom.current().nextLong();return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,local value struct.pack(dLc0, tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]); redis.call(zadd, KEYS[2], ARGV[1], value); // 写入 redisson_delay_queue_timeout:{delayBlockingQueue} redis.call(rpush, KEYS[3], value); // 写入 redisson_delay_queue:{delayBlockingQueue}// if new object added to queue head when publish its startTime // to all scheduler workers local v redis.call(zrange, KEYS[2], 0, 0); // 取时间戳最小的元素 if v[1] value then redis.call(publish, KEYS[4], ARGV[1]); // 如果新插入的元素是zset的第一个元素做channel通知 end;,Arrays.ObjectasList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));
}3. 数据消费
创建RDelayedQueue时redisson创建了一个QueueTransferTask任务负责从redisson_delay_queue_timeout:{delayBlockingQueue}将到期的数据迁移到delayBlockingQueue中
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);channelName prefixName(redisson_delay_queue_channel, getName());queueName prefixName(redisson_delay_queue, getName());timeoutSetName prefixName(redisson_delay_queue_timeout, getName());QueueTransferTask task new QueueTransferTask(commandExecutor.getConnectionManager()) {Overrideprotected RFutureLong pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,local expiredValues redis.call(zrangebyscore, KEYS[2], 0, ARGV[1], limit, 0, ARGV[2]); // 从redisson_delay_queue_timeout拿到期的任务 if #expiredValues 0 then for i, v in ipairs(expiredValues) do local randomId, value struct.unpack(dLc0, v); redis.call(rpush, KEYS[1], value); // 写入到 delayBlockingQueue redis.call(lrem, KEYS[3], 1, v); // 从 redisson_delay_queue 删除 end; redis.call(zrem, KEYS[2], unpack(expiredValues)); // 从 redisson_delay_queue_timeout 删除 end; // get startTime from scheduler queue head task local v redis.call(zrange, KEYS[2], 0, 0, WITHSCORES); if v[1] ~ nil then // 如果最小时间戳的任务存在返回它的时间戳 return v[2]; end return nil;,Arrays.ObjectasList(getName(), timeoutSetName, queueName), // KEYS: delayBlockingQueue , redisson_delay_queue_timeout*、redisson_delay_queue*System.currentTimeMillis(), 100);}Overrideprotected RTopic getTopic() {return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);}};queueTransferService.schedule(queueName, task);this.queueTransferService queueTransferService;
}4. RBlockingQueue
通过[[#3. 数据消费]]的操作redisson已经将到期的延迟任务写入到delayBlockingQueue了剩下要做的就是用delayBlockingQueue实现阻塞队列了核心代码在 RedissonBlockingQueue其实实现很简单我们来看下代码take()方法实际只是执行了一个redis命令BLPOP
Override
public V take() throws InterruptedException {return commandExecutor.getInterrupted(takeAsync());
}
Override
public RFutureV takeAsync() {return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}