巴中商城网站建设,庆阳平面设计招聘网,做软件外包的网站,附近广告设计与制作Disruptor框架简介
Disruptor框架内部核心的数据结构是Ring Buffer#xff0c;Ring Buffer是一个环形的数组#xff0c;Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构#xff1b;JDK的BlockingQueue相信大家都用过#xff0c;其是一个阻塞队列#xf…Disruptor框架简介
Disruptor框架内部核心的数据结构是Ring BufferRing Buffer是一个环形的数组Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构JDK的BlockingQueue相信大家都用过其是一个阻塞队列内部通过锁机制实现生产者和消费者之间线程的同步。跟BlockingQueue一样Disruptor框架也是围绕Ring Buffer实现生产者和消费者之间数据的交换只不过Disruptor框架性能更高笔者曾经在同样的环境下拿Disruptor框架跟ArrayBlockingQueue做过性能测试Disruptor框架处理数据的性能比ArrayBlockingQueue的快几倍。
Disruptor框架性能为什么会更好呢其有以下特点
预加载内存可以理解为使用了内存池无锁化单线程写消除伪共享使用内存屏障序号栅栏机制
相关概念 Disruptor是使用Disruptor框架的核心类持有RingBuffer、消费者线程池、消费者集合ConsumerRepository和消费者异常处理器ExceptionHandler等引用 Ring Buffer: RingBuffer处于Disruptor框架的中心位置其是一个环形数组环形数组的对象采用预加载机制创建且能重用是生产者和消费者之间交换数据的桥梁其持有Sequencer的引用 Sequencer: Sequencer是Disruptor框架的核心实现了所有并发算法用于生产者和消费者之间快速、正确地传递数据其有两个实现类SingleProducerSequencer和MultiProducerSequencer。 Sequence:Sequence被用来标识Ring Buffer和消费者Event Processor的处理进度每个消费者Event Processor和Ring Buffer本身都分别维护了一个Sequence支持并发操作和顺序写其也通过填充缓存行的方式来消除伪共享从而提高性能。 Sequence Barrier:Sequence Barrier即为序号屏障通过追踪生产者的cursorSequence和每个消费者 EventProcessor的sequence的方式来协调生产者和消费者之间的数据交换进度其实现类ProcessingSequenceBarrier持有的WaitStrategy等待策略类是实现序号屏障的核心。 Wait Strategy:Wait Strategy是决定消费者如何等待生产者的策略方式当消费者消费速度过快时此时是不是要让消费者等待下此时消费者等待是通过锁的方式实现还是无锁的方式实现呢 Event Processor:Event Processor可以理解为消费者线程该线程会一直从Ring Buffer获取数据来消费数据其有两个核心实现类BatchEventProcessor和WorkProcessor。 Event Handler:Event Handler可以理解为消费者实现业务逻辑的Handler被BatchEventProcessor类引用在BatchEventProcessor线程的死循环中不断从Ring Buffer获取数据供Event Handler消费。 Producer:生产者一般用RingBuffer.publishEvent来生产数据。
快速入门
MQManager启用Disruptor返回RingBuffer实例。
Configuration
public class MQManager {Bean(messageModel)public RingBufferMessageModel messageModelRingBuffer() {//定义用于事件处理的线程池 Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理 ExecutorService executor Executors.newFixedThreadPool(2);//指定事件工厂 HelloEventFactory factory new HelloEventFactory();//指定ringbuffer字节大小必须为2的N次方能将求模运算转为位运算提高效率否则将影响效率 int bufferSize 1024 * 256;//单线程模式获取额外的性能 DisruptorMessageModel disruptor new Disruptor(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者 disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程 disruptor.start();//获取ringbuffer环用于接取生产者生产的事件 RingBufferMessageModel ringBuffer disruptor.getRingBuffer();return ringBuffer;}
}MessageModel消息实体类
Data
public class MessageModel { private String message;
}工厂类
public class HelloEventFactory implements EventFactoryMessageModel {Override public MessageModel newInstance() { return new MessageModel(); }
} 消息处理器
Slf4j
public class HelloEventHandler implements EventHandlerMessageModel {Override public void onEvent(MessageModel event, long sequence, boolean endOfBatch) { try { log.info(消费者处理消息开始); if (event ! null) { log.info(消费者消费的信息是{},event); } } catch (Exception e) { log.info(消费者处理消息失败); } log.info(消费者处理消息结束); }
} 消息发送
Slf4j
Service
public class DisruptorMqServiceImpl implements DisruptorMqService { Autowiredprivate RingBufferMessageModel messageModelRingBuffer;Override public void sayHelloMq(String message) { log.info(record the message: {},message); //获取下一个Event槽的下标 long sequence messageModelRingBuffer.next(); try { //给Event填充数据 MessageModel event messageModelRingBuffer.get(sequence); event.setMessage(message); log.info(往消息队列中添加消息{}, event); } catch (Exception e) { log.error(failed to add event to messageModelRingBuffer for : e {},{},e,e.getMessage()); } finally { //发布Event激活观察者去消费将sequence传递给改消费者 //注意最后的publish方法必须放在finally中以确保必须得到调用如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer messageModelRingBuffer.publish(sequence); } }
}