Java7中的ForkJoin并发框架初探(中)

简介:

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增加这5个工具类实现做简要分析。

0. JDK中ForkJoin实现概述

在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:

  • ForkJoinPool 实现ForkJoin的线程池

  • ForkJoinWorkerThread  实现ForkJoin的线程

  • ForkJoinTask<V> 一个描述ForkJoin的抽象类

  • RecursiveAction 无返回结果的ForkJoinTask实现

  • RecursiveTask<V> 有返回结果的ForkJoinTask实现

ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。

而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和ForkJoinPool紧密合作,有指向对应ForkJoinPool对象的引用。

ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。

两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不同。

1. ForkJoinPool

ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码我们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。

1.1. 构建初始化

我们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:

  • int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个ForkJoinPool的ForkJoinWorkerThread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。

  • ForkJoinWorkerThreadFactory factory 这个是ForkJoinPool构建新线程ForkJoinWorkerThread 对象的工厂,类似于ThreadPoolExecutor中用到的ThreadFactory。

  • Thread.UncaughtExceptionHandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。

1.2. 任务提交

前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来说也是一样有效的。

需要说明的是,除了增加支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象做了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:

1
2
3
4
5
6
public  <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
     if  (task ==  null )
         throw  new  NullPointerException();
     forkOrSubmit(task);
     return  task;
}

我们接下来再看下任务提交中被调用到的forkOrSubmit()方法:

1
2
3
4
5
6
7
8
9
10
11
private  <T>  void  forkOrSubmit(ForkJoinTask<T> task) {
     ForkJoinWorkerThread w;
     Thread t = Thread.currentThread();
     if  (shutdown)
         throw  new  RejectedExecutionException();
     if  ((t  instanceof  ForkJoinWorkerThread) &&
         (w = (ForkJoinWorkerThread)t).pool ==  this )
         w.pushTask(task);
     else
         addSubmission(task);
}

逻辑很容易理解,先判断ForkJoinPool的状态,若已停止,则抛异常返回。之后如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

1.3. 线程的启动和工作

前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一起的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。

我们来看一下work()方法的实现。

1
2
3
4
5
6
7
8
9
10
11
final  void  work(ForkJoinWorkerThread w) {
     boolean  swept =  false ;                 // true on empty scans
     long  c;
     while  (!w.terminate && ( int )(c = ctl) >=  0 ) {
         int  a;                             // active count
         if  (!swept && (a = ( int )(c >> AC_SHIFT)) <=  0 )
             swept = scan(w, a);
         else  if  (tryAwaitWork(w, c))
             swept =  false ;
     }
}

里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。

我们来看一下scan()方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private  boolean  scan(ForkJoinWorkerThread w,  int  a) {
     int  g = scanGuard;  // mask 0 avoids useless scans if only one active
     int  m = (parallelism ==  1  - a && blockedCount ==  0 ) ?  0  : g & SMASK;
     ForkJoinWorkerThread[] ws = workers;
     if  (ws ==  null  || ws.length <= m)          // staleness check
         return  false ;
     for  ( int  r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
         ForkJoinTask<?> t; ForkJoinTask<?>[] q;  int  b, i;
         ForkJoinWorkerThread v = ws[k & m];
         if  (v !=  null  && (b = v.queueBase) != v.queueTop &&
             (q = v.queue) !=  null  && (i = (q.length -  1 ) & b) >=  0 ) {
             long  u = (i << ASHIFT) + ABASE;
             if  ((t = q[i]) !=  null  && v.queueBase == b &&
                 UNSAFE.compareAndSwapObject(q, u, t,  null )) {
                 int  d = (v.queueBase = b +  1 ) - v.queueTop;
                 v.stealHint = w.poolIndex;
                 if  (d !=  0 )
                     signalWork();              // propagate if nonempty
                 w.execTask(t);
             }
             r ^= r <<  13 ; r ^= r >>>  17 ; w.seed = r ^ (r <<  5 );
             return  false ;                      // store next seed
         }
         else  if  (j <  0 ) {                      // xorshift
             r ^= r <<  13 ; r ^= r >>>  17 ; k = r ^= r <<  5 ;
         }
         else
             ++k;
     }
     if  (scanGuard != g)                        // staleness check
         return  false ;
     else  {                                     // try to take submission
         ForkJoinTask<?> t; ForkJoinTask<?>[] q;  int  b, i;
         if  ((b = queueBase) != queueTop &&
             (q = submissionQueue) !=  null  &&
             (i = (q.length -  1 ) & b) >=  0 ) {
             long  u = (i << ASHIFT) + ABASE;
             if  ((t = q[i]) !=  null  && queueBase == b &&
                 UNSAFE.compareAndSwapObject(q, u, t,  null )) {
                 queueBase = b +  1 ;
                 w.execTask(t);
             }
             return  false ;
         }
         return  true ;                          // all queues empty
     }
}

看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  • 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread的execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。

  • 之后会尝试做任务窃取,尝试从其他线程中获取任务

  • 任务窃取条件不满足时,到提交队列中获取提交的任务

1.4. ForkJoinPool的其它属性

除了上述提到的操作,ForkJoin中还维护了

  • 线程数组和提交任务的队列,这是最基本的

  • 操作相关的锁和条件对象

  • volatile long ctl; 等线程池ForkJoinPool状态的属性

  • static final Random workerSeedGenerator; 等和任务窃取策略相关的一系列属性

  •  private volatile long stealCount; 等数据统计相关属性

等数据属性。

2. ForkJoinWorkerThread

ForkJoinWorkerThread扩展于Thread类,但提供了很多支持ForkJoin的特性。

上文在介绍ForkJoinPool的时候已经对这个类做了很多描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一起才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。

除了上面提到的以外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:

  • ForkJoinTask<?>[] queue; 这是维护和ForkJoin相关的(子)任务队列,还有queueTop和queueBase属性,分别标记队列的尾部和头部

  • final ForkJoinPool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰

  • 和ForkJoinTask的fork()和join()方法相关的方法——pushTask()和unpushTask(),分别负责在当前ForkJoinWorkerThread对象维护的队列中新增和取回任务

  • 其它与状态和统计相关的属性

3. ForkJoinTask及两个抽象子类

ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。

对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现如下:

public final ForkJoinTask fork() {

    ((ForkJoinWorkerThread) Thread.currentThread())

        .pushTask(this);

    return this;

}

需要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。

我们再来看看对应的join()实现:

1
2
3
4
5
6
public  final  V join() {
     if  (doJoin() != NORMAL)
         return  reportResult();
     else
         return  getRawResult();
}

显然,它有调用了doJoin()方法,我们再来深入了解下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private  int  doJoin() {
     Thread t; ForkJoinWorkerThread w;  int  s;  boolean  completed;
     if  ((t = Thread.currentThread())  instanceof  ForkJoinWorkerThread) {
         if  ((s = status) <  0 )
             return  s;
         if  ((w = (ForkJoinWorkerThread)t).unpushTask( this )) {
             try  {
                 completed = exec();
             catch  (Throwable rex) {
                 return  setExceptionalCompletion(rex);
             }
             if  (completed)
                 return  setCompletion(NORMAL);
         }
         return  w.joinTask( this );
     }
     else
         return  externalAwaitDone();
}

大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。

最后,我们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。

在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。

实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,需要开发者给出。

而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。

特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp85
相关文章
|
3月前
|
算法 Java 程序员
论文翻译 | 【深入挖掘Java技术】「底层原理专题」深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理
本文深入探讨了一个Java框架的设计、实现及其性能。该框架遵循并行编程的理念,通过递归方式将问题分解为多个子任务,并利用工作窃取技术进行并行处理。所有子任务完成后,其结果被整合以形成完整的并行程序。 在总体设计上,该框架借鉴了Cilk工作窃取框架的核心理念。其核心技术主要聚焦于高效的任务队列构建和管理,以及工作线程的管理。经过实际性能测试,我们发现大多数程序的并行加速效果显著,但仍有优化空间,未来可能需要进一步研究改进方案。
48 3
论文翻译 | 【深入挖掘Java技术】「底层原理专题」深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理
|
Java
详解java中一个分而治之的框架ForkJoin
在古代,皇帝要想办成一件事肯定不会自己亲自去动手,而是把任务细分发给下面的大臣,下面的大臣也懒呀,于是把任务继续分成几个部分,继续下发,于是到了最后最终负责的人就完成了一个小功能。上面的领导再把这些结果一层一层汇总,最终返回给皇帝。这就是分而治之的思想,也是我们今天的主题ForkJoin。
178 0
详解java中一个分而治之的框架ForkJoin
|
运维 Java 大数据
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
|
Java 并行计算 分布式计算
Java并发-ForkJoin
主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。 public class ForkJoinExample extends RecursiveTask { private final int ...
839 0
|
安全 Java
Java多线程编程简明教程(2) - ForkJoin模式
Future实现了从单任务到多任务的转变,而Fork-Join模式是一种充分利用多核的模式。
5545 0
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0
|
1天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程