JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

简介:

JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

概述
CountDownLatch和CyclicBarrier有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。

CountDownLatch
CountDownLatch从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。

CountDownLatch内部的实现主要是依靠AQS的共享模式。当一个线程把CountDownLatch初始化了一个count之后,其他的线程调用await就会阻塞住,直到其他的线程一个一个调用countDown方法进行release操作,把count的值减到0,即把同步锁释放掉,await才会进行下去。

Sync
内部主要还是实现了一个继承自AQS的同步器Sync。Sync源码如下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 构造方法,参数是count的数值
    Sync(int count) {
        // 内部使用state来存储count
        setState(count);
    }

    // 获取count的值
    int getCount() {
        return getState();
    }

    // 尝试获取分享模式同步器
    protected int tryAcquireShared(int acquires) {
        // 判断state的值,如果为0则获取成功,否则获取失败
        // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果
        // 大于0则说明获取成功,如果小于0则说明获取失败
        // 此处不会返回0,因为没有意义
        return (getState() == 0) ? 1 : -1;
    }

    // 释放同步器
    protected boolean tryReleaseShared(int releases) {
        // 自选操作
        for (;;) {
            // 获取state
            int c = getState();
            // 如果state为0,直接返回false
            if (c == 0)
                return false;
            // 计算state-1的结果
            int nextc = c-1;
            // CAS操作将这个值同步到state上
            if (compareAndSetState(c, nextc))
                // 如果同步成功,则判断是否此时state为0
                return nextc == 0;
        }
    }
}

Sync是继承自AQS的同步器,这段代码中值得拿出来讨论的有以下几点:

为什么用state来存储count的数值?
因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的CountDownLatch线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。

为什么tryAcquireShared不会返回0?
首先需要解释下tryAcquireShared在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch内部的tryAcquireShared,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。

为什么tryReleaseShared中的参数不被使用到?
根据这个类的实现方式,我们可以知道tryReleaseShared的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown方法内部调用到了sync.releaseShared方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared中的参数不被使用到的原因是因为参数值固定为1.

构造函数和方法

// 构造方法
public CountDownLatch(int count) {
    // count必须大于0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化Sync
    this.sync = new Sync(count);
}
// 等待获取锁(可被打断)
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 等待获取锁(延迟)
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 计数器降低(释放同步器)
// 每次调用减少1
public void countDown() {
    sync.releaseShared(1);
}

// 获取count
public long getCount() {
    return sync.getCount();
}

// toString
public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

CyclicBarrier
CyclicBarrier从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。

CyclicBarrier并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock以及Condition来完成自己的内部逻辑。

成员变量

// 锁
private final ReentrantLock lock = new ReentrantLock();

// 条件
private final Condition trip = lock.newCondition();

// 线程数
private final int parties;

// 执行完所有线程后执行的Runnable方法,可以为空
private final Runnable barrierCommand;

// 分组
private Generation generation = new Generation();

// 未完成的线程数
private int count;

private static class Generation {
    boolean broken = false;
}

我们可以看到成员变量中有一个很陌生的类Generation,这个是CyclicBarrier内部声明的一个static类,作用是帮助区分线程的分组分代,使得CyclicBarrier可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。

构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;


public CyclicBarrier(int parties) {
    this(parties, null);
}

很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。

核心方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await是CyclicBarrier的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait,我们来看下代码:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 常规的加锁操作,至于为什么要用本地变量操作,
    // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取Generation类
        final Generation g = generation;

        // 查看generation是否是broken,如果是broken的,
        // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法
        // 完成所有线程到达终点(tripped)的目标而只能报错
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程被外部中断需要报错,并且在内部需要将
        // generation的broken置为true来让其他线程能够感知到中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 将线程未完成数减1
        int index = --count;
        // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 如果有预设完成后执行的方法,则执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 此时由于这一个轮回的线程已经全部完成,
                // 所以调用nextGeneration方法开启一个新的轮回
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果此时还有其他的线程未完成,则当前线程开启自旋模式
        for (;;) {
            try {
                if (!timed)
                    // 如果timed为false,trip则阻塞住直到被唤醒
                    trip.await();
                else if (nanos > 0L)
                    // 如果timed为true,则调用awaitNanos设定时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 查看generation是否是broken,如果是broken的抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果g != generation意味着generation
            // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回,
            // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return
            if (g != generation)
                return index;

            // 如果已经超时,则强制打断
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

看完这段核心代码之后我们回头再来反思Generation的意义,我们已经可以大致的给出使用Generation的理由了:

不同于CountDownLatch的实现,CyclicBarrier采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier可以高效的重用。至于为什么broken没有用volatile修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。

总结
CountDownLatch和CyclicBarrier从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:

CountDownLatch实现直接依托于AQS;CyclicBarrier则是借助了ReentrantLock以及Condition
CountDownLatch是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。
由于CyclicBarrier采用的实现方式,相比一次性的CountDownLatch,CyclicBarrier可以多次重复使用
计数方式的不同:CountDownLatch采用累加计数, CyclicBarrier则使用倒数计数
原文地址https://my.oschina.net/bjwzds/blog/3534835

相关文章
|
1天前
|
消息中间件 缓存 Java
java基于云部署的SaaS医院云HIS系统源码 心理CT、B超 lis、电子病历
云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工作站等一系列常规功能,还能与公卫、PACS等各类外部系统融合,实现多层机构之间的融合管理。
30 12
|
4天前
|
人工智能 监控 Java
java互联网+智慧工地云平台SaaS源码
智慧工地以施工现场风险预知和联动预控为目标,将智能AI、传感技术、人像识别、监控、虚拟现实、物联网、5G、大数据、互联网等新一代科技信息技术植入到建筑、机械、人员穿戴设施、场地进出关口等各类设备中,实现工程管理与工程施工现场的整合
14 0
|
6天前
|
监控 Java BI
java基于云计算的SaaS医院his信息系统源码 HIS云平台源码
基于云计算技术的B/S架构的HIS系统源码,SaaS模式Java版云HIS系统,融合B/S版电子病历系统,支持电子病历四级,HIS与电子病历系统均拥有自主知识产权。
28 5
|
7天前
|
人工智能 监控 数据可视化
Java智慧工地云平台源码带APP SaaS模式 支持私有化部署和云部署
智慧工地是指应用智能技术和互联网手段对施工现场进行管理和监控的一种工地管理模式。它利用传感器、监控摄像头、人工智能、大数据等技术,实现对施工现场的实时监测、数据分析和智能决策,以提高工地的安全性、效率和质量(技术架构:微服务+Java+Spring Cloud +UniApp +MySql)。
23 4
|
9天前
|
人工智能 监控 安全
JAVA基于SaaS模式的智慧工地云平台源码(云智慧工地解决方案)
智慧工地支持多端展示(PC端、手机端、平板端)SaaS微服务架构,项目监管端,工地管理端源码
15 0
|
9天前
|
搜索推荐 前端开发 Java
java医院绩效考核管理系统项目源码
系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
14 0
|
10天前
|
设计模式 算法 Java
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
|
10天前
|
设计模式 JavaScript Java
[设计模式Java实现附plantuml源码~行为型] 对象状态及其转换——状态模式
[设计模式Java实现附plantuml源码~行为型] 对象状态及其转换——状态模式
|
10天前
|
设计模式 存储 Java
[设计模式Java实现附plantuml源码~结构型]实现对象的复用——享元模式
[设计模式Java实现附plantuml源码~结构型]实现对象的复用——享元模式
|
10天前
|
设计模式 JavaScript Java
[设计模式Java实现附plantuml源码~结构型]处理多维度变化——桥接模式
[设计模式Java实现附plantuml源码~结构型]处理多维度变化——桥接模式