Java并发编程框架Disruptor

简介:

Disruptor是什么?

Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后在交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。


轮胎:RingBuffer

RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

wKioL1heKq_wpmu8AAA9zSBGoCQ743.png

数组

这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。


序号

RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。


无锁的机制

在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

一个生产者 + 一个消费者

生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

一个生产者 + 多个消费者

多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

多个生产者 + N个消费者

很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。



Disruptor初体验:简单的生产者和消费者


业务数据对象POJO(Event)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public  class  Order {
 
     //订单ID
     private  long  id;
 
     //订单信息
     private  String info;
 
     //订单价格
     private  double  price;
 
     public  long  getId() {
         return  id;
     }
 
     public  void  setId( long  id) {
         this .id = id;
     }
 
     public  String getInfo() {
         return  info;
     }
 
     public  void  setInfo(String info) {
         this .info = info;
     }
 
     public  double  getPrice() {
         return  price;
     }
 
     public  void  setPrice( double  price) {
         this .price = price;
     }
}


业务数据工厂(Factory)

1
2
3
4
5
6
7
8
9
10
public  class  OrderFactory  implements  EventFactory{
 
     @Override
     public  Object newInstance() {
 
         System.out.println( "OrderFactory.newInstance" );
         return  new  Order();
     }
 
}

事件处理器(Handler,即消费者处理逻辑)

1
2
3
4
5
6
7
8
9
10
11
public  class  OrderHandler  implements  EventHandler<Order>{
 
     @Override
     public  void  onEvent(Order order,  long  l,  boolean  b)  throws  Exception {
 
         System.out.println(Thread.currentThread().getName() +  " 消费者处理中:"  + l);
         order.setInfo( "info"  + order.getId());
         order.setPrice(Math.random());
     }
 
}

Main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public  class  Main {
 
     public  static  void  main(String[] args)  throws  InterruptedException {
 
         //创建订单工厂
         OrderFactory orderFactory =  new  OrderFactory();
 
         //ringbuffer的大小
         int  RINGBUFFER_SIZE =  1024 ;
 
         //创建disruptor
         Disruptor<Order> disruptor =  new  Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
 
         //设置事件处理器 即消费者
         disruptor.handleEventsWith( new  OrderHandler());
 
         disruptor.start();
 
         RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
 
         //-------------生产数据
         for ( int  i =  0  ; i <  3  ; i++){
 
             long  sequence = ringBuffer.next();
 
             Order order = ringBuffer.get(sequence);
 
             order.setId(i);
 
             ringBuffer.publish(sequence);
             System.out.println(Thread.currentThread().getName() +  " 生产者发布一条数据:"  + sequence +  " 订单ID:"  + i);
         }
 
         Thread.sleep( 1000 );
 
         disruptor.shutdown();
     }
 
}

运行结果:

wKiom1heUkvDVrbbAABB1RkRt-k131.png


说明:

其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

另外在构造Disruptor的时候,在3.3.6之前使用的是API:

wKioL1heU5KD4yc1AABMgNq5GkY611.png

到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

wKioL1heVOLj98krAABdMOQG3ug461.png



单独使用RingBuffer:WorkerPool

如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public  static  void  main(String[] args)  throws  InterruptedException {
 
     ExecutorService executor = Executors.newFixedThreadPool( 3 );
     RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE, new  OrderFactory(), 1024 , new  YieldingWaitStrategy());
     WorkerPool<Order> workerPool =  new  WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(), new  IgnoreExceptionHandler(), new  OrderHandler());
 
     workerPool.start(executor);
 
     //-------------生产数据
     for ( int  i =  0  ; i <  30  ; i++){
 
         long  sequence = ringBuffer.next();
 
         Order order = ringBuffer.get(sequence);
         order.setId(i);
 
         ringBuffer.publish(sequence);
 
         System.out.println(Thread.currentThread().getName() +  " 生产者发布一条数据:"  + sequence +  " 订单ID:"  + i);
     }
 
     Thread.sleep( 1000 );
     
     workerPool.halt();
     executor.shutdown();
 
}

实际上是利用WorkerPool辅助连接消费者。



一个生产者+多个消费者

wKioL1heZFugNx-wAABXx8iDTG8613.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public  static  void  main(String[] args)  throws  InterruptedException {
 
     //创建订单工厂
     OrderFactory orderFactory =  new  OrderFactory();
 
     //ringbuffer的大小
     int  RINGBUFFER_SIZE =  1024 ;
 
     //创建disruptor
     Disruptor<Order> disruptor =  new  Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
 
     //设置事件处理器 即消费者
     EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith( new  OrderHandler(), new  OrderHandler2());
     eventHandlerGroup.then( new  OrderHandler3());
     disruptor.start();
 
     RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
 
     //-------------生产数据
     for ( int  i =  0  ; i <  3  ; i++){
 
         long  sequence = ringBuffer.next();
 
         Order order = ringBuffer.get(sequence);
 
         order.setId(i);
 
         ringBuffer.publish(sequence);
         System.out.println(Thread.currentThread().getName() +  " 生产者发布一条数据:"  + sequence +  " 订单ID:"  + i);
     }
 
     Thread.sleep( 1000 );
     disruptor.shutdown();
}


运行结果:

wKiom1heZJniEbQbAAA1jfoPs8I081.png

生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。


如果我们想顺序的按照A->B->C呢?

1
2
3
         disruptor.handleEventsWith( new  Handler1()).
             handleEventsWith( new  Handler2()).
             handleEventsWith( new  Handler3());


如果我们想六边形操作呢?

wKioL1heZe3QoLB9AACQcTxdeJ4198.png

1
2
3
4
5
6
7
8
9
         Handler1 h1 =  new  Handler1();
         Handler2 h2 =  new  Handler2();
         Handler3 h3 =  new  Handler3();
         Handler4 h4 =  new  Handler4();
         Handler5 h5 =  new  Handler5();
         disruptor.handleEventsWith(h1, h2);
         disruptor.after(h1).handleEventsWith(h4);
         disruptor.after(h2).handleEventsWith(h5);
         disruptor.after(h4, h5).handleEventsWith(h3);


到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1885830,如需转载请自行联系原作者
相关文章
|
1天前
|
Java
Java中的并发编程:理解和应用线程池
【4月更文挑战第23天】在现代的Java应用程序中,性能和资源的有效利用已经成为了一个重要的考量因素。并发编程是提高应用程序性能的关键手段之一,而线程池则是实现高效并发的重要工具。本文将深入探讨Java中的线程池,包括其基本原理、优势、以及如何在实际开发中有效地使用线程池。我们将通过实例和代码片段,帮助读者理解线程池的概念,并学习如何在Java应用中合理地使用线程池。
|
5天前
|
Java Maven 开发工具
《Java 简易速速上手小册》第5章:Java 开发工具和框架(2024 最新版)
《Java 简易速速上手小册》第5章:Java 开发工具和框架(2024 最新版)
26 1
|
5天前
|
IDE Java 物联网
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
13 0
|
5天前
|
安全 Java 开发者
Java并发编程:深入理解Synchronized关键字
【4月更文挑战第19天】 在Java多线程编程中,为了确保数据的一致性和线程安全,我们经常需要使用到同步机制。其中,`synchronized`关键字是最为常见的一种方式,它能够保证在同一时刻只有一个线程可以访问某个对象的特定代码段。本文将深入探讨`synchronized`关键字的原理、用法以及性能影响,并通过具体示例来展示如何在Java程序中有效地应用这一技术。
|
6天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
6天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
7天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
7天前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【4月更文挑战第17天】本文探讨了Java中的CompletableFuture和反应式编程在提升异步编程体验上的作用。CompletableFuture作为Java 8引入的Future扩展,提供了一套流畅的链式API,简化异步操作,如示例所示的非阻塞数据库查询。反应式编程则关注数据流和变化传播,通过Reactor等框架实现高度响应的异步处理。两者结合,如将CompletableFuture转换为Mono或Flux,可以兼顾灵活性和资源管理,适应现代高并发环境的需求。开发者可按需选择和整合这两种技术,优化系统性能和响应能力。
|
8天前
|
缓存 监控 Java
Java并发编程:线程池与任务调度
【4月更文挑战第16天】Java并发编程中,线程池和任务调度是核心概念,能提升系统性能和响应速度。线程池通过重用线程减少创建销毁开销,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。任务调度允许立即或延迟执行任务,具有灵活性。最佳实践包括合理配置线程池大小、避免过度使用线程、及时关闭线程池和处理异常。掌握这些能有效管理并发任务,避免性能瓶颈。
|
9天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。