Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现
性能远远高于传统的BlockingQueue容器
Disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取,在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue
环形数组结构
为了避免垃圾回收,使用数组,数组对处理器的缓存机制更加友好
数组长度为 2^n,通过位运算,加快定位速度,下标采用递增的方式,不用担心索引溢出
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据
pom
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
LongEvent
// 声明一个Event来包含需要传递的数据
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
LongEventFactory
// Event工厂
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
LongEventHandler
// 事件消费者
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者:"+event.getValue());
}
}
LongEventProducer
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 获取事件队列下标位置
long sequence = ringBuffer.next();
try {
// 取出空队列
LongEvent longEvent = ringBuffer.get(sequence);
// 赋值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("生产者发送数据。。。");
// 发送数据
ringBuffer.publish(sequence);
}
}
}
Main
public class Main {
public static void main(String[] args) {
// 创建可缓存线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建工厂
EventFactory eventFactory = new LongEventFactory();
// 创建ringBufferSize
int ringBufferSize = 1024 * 1024;
// 创建disruptor
// MULTI表示可以多个生产者
Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executorService, ProducerType.MULTI, new YieldingWaitStrategy());
// 注册消费者
longEventDisruptor.handleEventsWith(new LongEventHandler());
// 启动
longEventDisruptor.start();
// 创建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
// 创建生产者
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
// 指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 0; i < 100; i++) {
byteBuffer.putLong(0, i);
longEventProducer.onData(byteBuffer);
}
executorService.shutdown();
longEventDisruptor.shutdown();
}
}
它是一个环(首尾相接的环),作用是存储数据,实现不同线程之间的数据传输
RingBuffer 每块区是拥有一个序号的,这个序号指向环形数组结构的下一个可用元素
随着不断地写进了填充这个圆环,这个指针序号会不断地递增,直到绕过这个环
如果圆环满了,它会将金数据覆盖,如上图:现在12的区域的下个区域目前是3,如果有新的数据到来,那么指针往下移的时候就会把区域3的数据给覆盖变成13,框架提供了一系列帮助我们平行消费的监控,会很好的控制生产者和消费者之间的速度,从而达到生产和消费之间的平衡
RingBuffer 为什么效率高?
采用数组,数组支持索引访问
数组的内存分配是预先加载的,一但指定大小创建后,就一直存在,这也意味着不需要花大量的时间做垃圾回收,而阻塞队列采用链表实现,需要不断的删除、创建节点
Sequence:序号,声明一个序号,用于跟踪 ringbuffer 中任务的变化和消费者的消费情况
WaitStrategy:有多种实现,用以表示当无可消费事件时,消费者的等待策略
Event:消费事件
EventProcessor:事件处理器,监听 RingBuffer 的事件,并消费可用事件,从 RingBuffer 读取的事件会交由实际的生产者实现类来消费,它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好
EventHandler:业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口,代表着消费者
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。