企业站官网,wordpress 织梦 淘客,做物流的可以在那些网站找客户端,哪个网站可以找做中厚板的公司高性能队列框架-Disruptor
首先介绍一下 Disruptor 框架#xff0c;Disruptor是一个通用解决方案#xff0c;用于解决并发编程中的难题#xff08;低延迟与高吞吐量#xff09;#xff0c;Disruptor 在高并发场景下性能表现很好#xff0c;如果有这方面需要#xff0c;…高性能队列框架-Disruptor
首先介绍一下 Disruptor 框架Disruptor是一个通用解决方案用于解决并发编程中的难题低延迟与高吞吐量Disruptor 在高并发场景下性能表现很好如果有这方面需要可以深入研究其源码
其本质还是一个队列环形与其他队列类似也是基于生产者消费者模式设计只不过这个队列很特别是一个环形队列。这个队列能够在无锁的条件下进行并行消费也可以根据消费者之间的依赖关系进行先后次序消费。
使用 Disruptor 框架的好处就是速度快
生产者向 RingBuffer 写入消费者从 RingBuffer 中消费基于 Disruptor 开发的系统每秒可以支持 600 万订单
下边介绍一下 Disruptor 框架中常见概念
RingBuffer
基于数组实现的一个环用于在不同线程间传递数据RingBuffer 有一个 Sequencer 序号器指向数组中下一个可用元素 Sequencer 序号器
该类是 Disruptor 核心有两个实现类
SingleProducerSequencer 单生产者MultiProducerSequencer 多生产者
WaitStrategy 等待策略
消费者等待生产者将数据放入 RingBuffer有不同的等待策略
BlockingWaitStrategy阻塞等待策略最低效的策略但其对 CPU 的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。SleepingWaitStrategy休眠等待策略性能表现跟 BlockingWaitStrategy 差不多对 CPU 的消耗也类似但其对生产者线程的影响最小适合用于异步日志类似的场景。YieldingWaitStrategy产生等待策略性能最好适合用于低延迟的系统在要求极高性能且事件处理线程数小于 CPU 逻辑核心数的场景中推荐使用。是无锁并行
Disruptor 的设计中是没有锁的在 Disruptor 中出现线程竞争的地方也就是 RingBuffer 中的下标 SequenceDisruptor 通过 CAS 操作来代替加锁从而提升性能CAS 的性能大约是加锁操作性能的 8 倍
伪共享问题
Disruptor 中还会出现伪共享问题
参考《高性能队列——Disruptor》——美团技术团队
缓存行
Cache 是由很多个 cache line 组成每个 cache line 通常是 64B并且可以有效地引用主内存中的一块地址。
Java 中 long 类型变量是 8B因此一个 cache line 可以存储 8 个 long 类型变量
CPU 每次从主存中拉取数据时会把相邻的数据也存入同一个 cache line那么在访问一个 long 数组时如果数组中的一个值被加入缓存中那么也会加载另外 7 个
伪共享问题
在 ArrayBlockingQueue 中有 3 个成员变量
takeIndex需要被取走元素下标putIndex可被插入元素下标count队列元素数量
这 3 个变量如果在同一个 cache line 中的话假如此时有两个线程对这 3 个变量进行操作线程 A 修改了 takeIndex 变量那么会导致线程 B 中这个变量所在的 cache line 失效需要从内存重新读取
这种无法充分利用 cache line 特性的线程成为 伪共享
解决方案就是增大数组元素之间的间隔使得不同线程存取的元素位于不同的 cache line 上通过空间换时间 在jdk1.8中有专门的注解 Contended 来避免伪共享更优雅地解决问题。 Disruptor 通过哪些设计来解决队列速度慢的问题了呢 环形数组 RingBuffer 采用环形数组空间重复利用避免垃圾回收并且数组对于缓存机制更加友好 元素位置定位 数组长度 2^n通过位运算加快定位速度 无锁设计 通过 CAS 代替锁来保证操作的线程安全 在美团内部很多高并发场景借鉴了Disruptor的设计减少竞争的强度。其设计思想可以扩展到分布式场景通过无锁设计来提升服务性能
Disruptor 多个生产者、多个消费者原理
在 Disruptor 中多个生产者生产数据时每个线程获取不同的一段数组空间再加上 CAS 操作可以避免多个线程重复写同一个元素
在读取时如何避免读取到未写的元素呢
Disruptor 中新创建了一个与 RingBuffer 大小相同的 available Buffer当某个位置写入成功就在 available Buffer 中标记为 true通过该标记来读取已经写好的元素
Disruptor 单生产者单消费者实战
首先引入依赖
dependencygroupIdcom.lmax/groupIdartifactIddisruptor/artifactIdversion3.3.4/version
/dependency定义订单
/*** 订单对象生产者要生产订单对象消费者消费订单对象*/
public class OrderEvent {// 订单的价格private long value;public long getValue() {return value;}public void setValue(long value) {this.value value;}
}定义工厂类用于创建订单对象
/*** 建立一个工厂类用于创建Event的实例OrderEvent)*/
public class OrderEventFactory implements EventFactoryOrderEvent {Overridepublic OrderEvent newInstance() {// 生产对象return new OrderEvent();}
}定义事件处理器用于监听消费订单
/*** 消费者*/
public class OrderEventHandler implements EventHandlerOrderEvent {Overridepublic void onEvent(OrderEvent orderEvent, long l, boolean b) {System.err.println(消费者: orderEvent.getValue());}
}定义生产者用于生产订单
public class OrderEventProducer {// ringBuffer 用于存储数据private RingBufferOrderEvent ringBuffer;public OrderEventProducer(RingBufferOrderEvent ringBuffer) {this.ringBuffer ringBuffer;}// 生产者向 ringBuffer 中生产消息public void sendData(ByteBuffer data) {// 1. 生产者先从 ringBuffer 拿到可用的序号long sequence ringBuffer.next();try {// 2.根据这个序号找到具体的 OrderEvent 元素, 此时获取到的 OrderEvent 对象是一个没有被赋值的空对象OrderEvent event ringBuffer.get(sequence);// 3. 设置订单价格event.setValue(data.getLong(0));} catch (Exception e) {e.printStackTrace();} finally {// 4. 提交发布操作ringBuffer.publish(sequence);}}
}测试类
public class Main {public static void main(String[] args) {// 初始化一些参数OrderEventFactory orderEventFactory new OrderEventFactory();int ringBufferSize 8;ExecutorService executor Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/*** 参数说明* eventFactory:消息(event)工厂对象* ringBufferSize: 容器的长度* executor:线程池建议使用自定义的线程池线程上限。* ProducerType:单生产者或多生产者* waitStrategy:等待策略*/// 1. 实例化disruptor对象DisruptorOrderEvent disruptor new DisruptorOrderEvent(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());// 2. 向 Disruptor 中添加消费者消费者监听到 Disruptor 的 RingBuffer 中有数据了就会进行消费disruptor.handleEventsWith(new OrderEventHandler());// 3. 启动disruptordisruptor.start();// 4. 拿到存放数据的容器RingBufferRingBufferOrderEvent ringBuffer disruptor.getRingBuffer();// 5. 创建生产者OrderEventProducer producer new OrderEventProducer(ringBuffer);// 6. 通过生产者向容器 RingBuffer 中存放数据ByteBuffer bb ByteBuffer.allocate(8);for (long i 0; i 100; i) {bb.putLong(0, i);producer.sendData(bb);}// 7.关闭disruptor.shutdown();executor.shutdown();}
}
Disruptor 多生产者和多消费者实战
定义消费者用于从 ringBuffer 中消费订单
public class ConsumerHandler implements WorkHandlerOrder {// 每个消费者有自己的idprivate String comsumerId;// 计数统计多个消费者所有的消费者总共消费了多个消息。private static AtomicInteger count new AtomicInteger(0);private Random random new Random();public ConsumerHandler(String comsumerId) {this.comsumerId comsumerId;}// 当生产者发布一个 sequenceringbuffer 中一个序号里面生产者生产出来的消息生产者最后publish发布序号// 消费者会监听如果监听到就会ringbuffer去取出这个序号取到里面消息Overridepublic void onEvent(Order event) throws Exception {// 模拟消费者处理消息的耗时设定1-4毫秒之间TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));System.out.println(当前消费者: this.comsumerId , 消费信息 ID: event.getId());// count 计数器增加 1表示消费了一个消息count.incrementAndGet();}// 返回所有消费者总共消费的消息的个数。public int getCount() {return count.get();}
}定义订单
Data
public class Order {private String id;private String name;private double price;public Order() {}
}定义生产者用于向 ringBuffer 中生产订单
public class Producer {private RingBufferOrder ringBuffer;// 为生产者绑定 ringBufferpublic Producer(RingBufferOrder ringBuffer) {this.ringBuffer ringBuffer;}// 发送数据public void sendData(String uuid) {// 1. 获取到可用sequencelong sequence ringBuffer.next();try {Order order ringBuffer.get(sequence);order.setId(uuid);} finally {// 2. 发布序号ringBuffer.publish(sequence);}}
}测试类
public class TestMultiDisruptor {public static void main(String[] args) throws InterruptedException {// 1. 创建 RingBufferDisruptor 包含 RingBufferRingBufferOrder ringBuffer RingBuffer.create(ProducerType.MULTI, // 多生产者new EventFactoryOrder() {Overridepublic Order newInstance() {return new Order();}}, 1024 * 1024, new YieldingWaitStrategy());// 2. 创建 ringBuffer 屏障SequenceBarrier sequenceBarrier ringBuffer.newBarrier();// 3. 创建多个消费者数组ConsumerHandler[] consumers new ConsumerHandler[10];for (int i 0; i consumers.length; i) {consumers[i] new ConsumerHandler(C i);}// 4. 构建多消费者工作池WorkerPoolOrder workerPool new WorkerPoolOrder(ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers);// 5. 设置多个消费者的 sequence 序号用于单独统计消费者的消费进度。消费进度让RingBuffer知道ringBuffer.addGatingSequences(workerPool.getWorkerSequences());// 6. 启动 workPoolworkerPool.start(Executors.newFixedThreadPool(5)); // 在实际开发自定义线程池。final CountDownLatch latch new CountDownLatch(1);// 100 个生产者向 ringBuffer 生产数据每个生产者发送 100 个数据共 10000 个数据for (int i 0; i 100; i ) {final Producer producer new Producer(ringBuffer);new Thread(new Runnable() {Overridepublic void run() {try {// 先等待创建完 100 个生产者之后再发送数据latch.await();} catch (Exception e) {e.printStackTrace();}// 每个生产者发送 100 个数据for (int j 0; j 100; j ) {producer.sendData(UUID.randomUUID().toString());}}}).start();}// 把所有线程都创建完TimeUnit.SECONDS.sleep(2);// 唤醒线程让生产者开始发送数据开始运行100个线程latch.countDown();// 等待数据发送完毕TimeUnit.SECONDS.sleep(10);System.out.println(任务总数: consumers[0].getCount());}static class EventExceptionHandler implements ExceptionHandlerOrder {//消费时出现异常Overridepublic void handleEventException(Throwable throwable, long l, Order order) {}//启动时出现异常Overridepublic void handleOnStartException(Throwable throwable) {}//停止时出现异常Overridepublic void handleOnShutdownException(Throwable throwable) {}}
}Disruptor 与 Netty 结合大幅提高数据处理性能
使用 Netty 接收处理数据时不要在工作线程上进行处理降低 Netty 性能可以使用异步机制通过线程池来处理异步处理的话就是用 Disruptor 来作为任务队列即可
即在 Netty 收到处理数据请求时封装成一个事件向 Disruptor 中推送再通过多消费者来进行处理可以提升 Netty 处理数据时的性能流程图如下绿色部分为通过 Disruptor 优化部分