LMAX Disruptor——一个高性能、低延迟且简单的框架

简介:

原文地址:LMAX Disruptor – High Performance, Low Latency and Simple Too 翻译:杨帆 校对:丁一

Disruptor是一个用于在线程间通信的高效低延时的消息组件,它像个增强的队列,并且它是让LMAX Exchange跑的如此之快的一个关键创新。关于什么是Disruptor、为何它很重要以及它的工作原理方面的信息都呈爆炸性增长 —— 这些文章很适合开始学习Disruptor,还可跟着LMAX BLOG深入学习。这里还有一份更详细的白皮书

虽然disruptor模式使用起来很简单,但是建立多个消费者以及它们之间的依赖关系需要的样板代码太多了。为了能快速又简单适用于99%的场景,我为Disruptor模式准备了一个简单的领域特定语言。例如,为建立一个消费者的“四边形模式”:

(从Trisha Gee’s excellent series explaining the disruptor pattern偷来的图片)

在这种情况下,只要生产者(P1)将元素放到ring buffer上,消费者C1和C2就可以并行处理这些元素。但是消费者C3必须一直等到C1和C2处理完之后,才可以处理。在现实世界中的对应的案例就像:在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写入磁盘(C2)。

用原生的Disruptor语法来创建这些消费者的话代码如下:

01 Executor executor = Executors.newCachedThreadPool();
02 BatchHandler handler1 = new MyBatchHandler1();
03 BatchHandler handler2 = new MyBatchHandler2();
04 BatchHandler handler3 = new MyBatchHandler3()
05 RingBuffer ringBuffer = new RingBuffer(ENTRY_FACTORY, RING_BUFFER_SIZE);
06 ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier();
07 BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);
08 BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2);
09 ConsumerBarrier consumerBarrier2 =
10 ringBuffer.createConsumerBarrier(consumer1, consumer2);
11 BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);
12 executor.execute(consumer1);
13 executor.execute(consumer2);
14 executor.execute(consumer3);
15 ProducerBarrier producerBarrier =
16 ringBuffer.createProducerBarrier(consumer3);

在以上这段代码中,我们不得不创建那些个handler(就是那些个MyBatchHandler实例),外加消费者屏障,BatchConsumer实例,然后在他们各自的线程中处理这些消费者。DSL能帮我们完成很多创建工作,最终的结果如下:

1 Executor executor = Executors.newCachedThreadPool();
2 BatchHandler handler1 = new MyBatchHandler1();
3 BatchHandler handler2 = new MyBatchHandler2();
4 BatchHandler handler3 = new MyBatchHandler3();
5 DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
6     RING_BUFFER_SIZE, executor);
7 dw.consumeWith(handler1, handler2).then(handler3);
8 ProducerBarrier producerBarrier = dw.createProducerBarrier();

我们甚至可以在一个更复杂的六边形模式中构建一个并行消费者链:

1 dw.consumeWith(handler1a, handler2a);
2 dw.after(handler1a).consumeWith(handler1b);
3 dw.after(handler2a).consumeWith(handler2b);
4 dw.after(handler1b, handler2b).consumeWith(handler3);
5 ProducerBarrier producerBarrier = dw.createProducerBarrier();

这个领域特定语言刚刚诞生不久,欢迎任何反馈,也欢迎大家从github上fork并改进它。 

目录
相关文章
|
6月前
|
负载均衡 监控 Java
异步编程 - 14 异步、分布式、基于消息驱动的框架 Akka
异步编程 - 14 异步、分布式、基于消息驱动的框架 Akka
101 0
|
8月前
|
消息中间件 存储 Kafka
kafka是如何实现高性能高吞吐的?
以下是某网站上对该问题的总结,一共分为了以下六点,但这上面说的很浅显,我在后面加了一些自己的理解,做为解释,如有遗漏或者不对的地方欢迎大家指点,我会即时的修改,辛苦诸位老铁!
110 0
|
5月前
|
Java 调度
基于Disruptor游戏服务器消息总线的设计
基于Disruptor游戏服务器消息总线的设计
28 0
|
6月前
|
存储 缓存 算法
异步编程 - 13 高性能线程间消息传递库 Disruptor
异步编程 - 13 高性能线程间消息传递库 Disruptor
55 0
高性能无锁并发框架Disruptor,太强了
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单
|
缓存 安全 算法
高性能无锁并发框架Disruptor,太强了!
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单官网:lmax-exchange.github.io/disruptor/目前,包括Apache Storm、Camel、Log4j2在内的很多知名项目都应用了Disruptor以获取高性能为什么会产生Disruptor框架「目前Java内置队列保证线程安全的方式:」ArrayBlockingQueue:基于数组形式的队列,通过加锁的方式,来保证多线程情况下数据的安全;LinkedBlockingQue基于链表形式
|
消息中间件 存储 Java
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
187 0
【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
|
存储 监控 Linux
Netty如何做到单机百万并发?(二)
Netty如何做到单机百万并发?(二)
Netty如何做到单机百万并发?(二)
|
监控 搜索推荐 Linux
Netty如何做到单机百万并发?(三)
Netty如何做到单机百万并发?(三)
Netty如何做到单机百万并发?(三)