手机免费永久建立网站,高端网站开发公开课,网站建设佰金手指科杰二九,想做企业网站Kafka中存在大量的延迟操作#xff0c;比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器#xff0c;来完成这些延迟操作。
1 时间轮
Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能#xff0c;因为它们的插入和…Kafka中存在大量的延迟操作比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器来完成这些延迟操作。
1 时间轮
Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。
1.1 Timer 和 DelayQueue
它们都使用了一个优先级队列通常基于堆实现来管理任务。
1.1.1 Timer
用于计划在特定时间后执行的任务这些任务可以只执行一次或定期重复执行。其有以下特点
运行在单线程中无法满足多个任务同时执行的需求。如果前置任务耗时长可能会阻塞后置任务。如果任务执行过程中抛出异常Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer new Timer();Date startTime new Date();TimerTask task1 new TimerTask() {Overridepublic void run() {long dis (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println(task1执行,距离开始时间: dis s);}};TimerTask task2 new TimerTask() {Overridepublic void run() {long dis (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println(task2执行,距离开始时间: dis s休眠5s。);try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(task2休眠结束);}};TimerTask task3 new TimerTask() {Overridepublic void run() {long dis (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println(task3执行,距离开始时间: dis s);}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
// 执行结果:
// task1执行,距离开始时间:1s
// task2执行,距离开始时间:2s休眠5s。
// task2休眠结束
// task3执行,距离开始时间:7s
}
1.1.2 DelayQueue
是一个无界阻塞队列用于存储实现了Delayed接口的元素这些元素只有在它们的延迟期满时才会被取出。其有以下特点
线程安全可以在多线程环境中使用。无界队列可以存储任意数量的元素直到系统内存耗尽。延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName taskName;this.delayTime delayTime System.currentTimeMillis();}Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}Overridepublic int compareTo(Delayed o) {if (this.delayTime ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime ((DelayQueueTask) o).delayTime) {return 1;}return 0;}Overridepublic String toString() {return DelayQueueTask{ taskName taskName \ , delayTime delayTime };}}public static void main(String[] args) throws InterruptedException {DelayQueueDelayQueueTask delayQueue new DelayQueue();delayQueue.offer(new DelayQueueTask(task1,5000));delayQueue.offer(new DelayQueueTask(task2,2000));delayQueue.offer(new DelayQueueTask(task3,4000));System.out.println(开始执行delayQueue任务);while (!delayQueue.isEmpty()) {DelayQueueTask task delayQueue.take();System.out.println(任务: task);}System.out.println(delayQueue任务 任务执行完毕);}
}
1.2 时间轮结构
Kafka 在任务的插入与删除采用了时间轮结构其时间复杂度为O(1)而在时间推进上还是依赖JDK提供的DelayQueue。 图 时间轮TimingWheel结构
Kafka的时间轮是一个存储定时任务的环形队列每个元素时间格相当于一个桶(bucket)来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表链表中的每一项都是定时任务项TimerTaskEntry其中封装了真正的定时任务TimerTask。
Kafka将TimerTaskList插入到DelayQueue队列中使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。
1.2.1 时间格与时间跨度
时间轮由多个时间格组成上面示意图中每一层有10个时间格每个时间格代表时间轮的基本时间跨度tick,时间格数wheelSize是固定的,那么时间轮的总体跨度interval tick * wheelSize。
时间轮还有一个表盘指针currentTime用来表示时间轮当前所处的时间。currentTime是tick的整数倍currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分此时需要处理此时间格所对应的TimerTaskList中的所有任务。
上面示意图中tick 1s此时currentTime指向第2个时间格需要处理这个时间格存储的所有任务。假设此时插入了一个3s后的任务则把该任务插入第5个时间格中的bucket。
1.2.2 时间轮层级
当currentTime 指向第2个时间格时需要插入一个33s后的任务此时时间超过了第一层的跨度1s * 10 10s。Kafka引入层级时间轮的概念当到期时间超过了当前时间轮所表示的时间范围时就会尝试添加到上层的时间轮中。
33s 的任务会被插入到第二层的第(33 / 10 3) 3 个时间格中。
第一层的开始时间第0格startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。
每一层时间轮都会有指向更高一层的引用。
1.2.3 任务处理及时间轮降级 图 时钟
时间轮类似于时钟当每一层走完一圈时上一层就会走一格。例如当第1层的currentTime 指向第2格时此时需要插入两个任务分别是33s及39s后。它们都会被插入到第2层的第3格中的bucketTimerTaskList。
假设经过33s第1层指向第5格第2层指向第3格后此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出将33s的任务执行并从TimerTaskList中删除。此时39s的任务还剩6sKafka会把这个任务“降级”插入到第1层第156% 10格中。
1.2.4 时间推进与DelayQueue
如果按照时间格一格格推进时间这样消耗会比较大而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。
将时间格bucket的TimerTaskList封装成Delayed其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序最快到达的排在队列头部。当到达时刻时将表头的TimerTaskEntry取出对它的TaskEntry执行任务执行或降级等操作。