Java源码分析之CountDownLatch

  1. 云栖社区>
  2. 博客>
  3. 正文

Java源码分析之CountDownLatch

柏辰爸爸 2016-12-10 12:26:00 浏览588
展开阅读全文

        一、CountDownLatch介绍

       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。

        二、CountDownLatch应用

        CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。

        1、示例应用一

        这里有一对类,一组工作线程使用两个countdown latches的示例:

        第一个是启动信号,阻止worker线程工作直到driver准备好;

        第二个是完成信号,允许driver等到所有的workers工作完成。

       代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

	class Driver {
		void main() throws InterruptedException {
			CountDownLatch startSignal = new CountDownLatch(1);
			CountDownLatch doneSignal = new CountDownLatch(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				new Thread(new Worker(startSignal, doneSignal)).start();

			doSomethingElse(); // don't let run yet
			startSignal.countDown(); // let all threads proceed
			doSomethingElse();
			doneSignal.await(); // wait for all to finish
		}

		void doSomethingElse() {
			// ...
		}
	}

	class Worker implements Runnable {
		private final CountDownLatch startSignal;
		private final CountDownLatch doneSignal;

		Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
			this.startSignal = startSignal;
			this.doneSignal = doneSignal;
		}

		public void run() {
			try {
				startSignal.await();
				doWork();
				doneSignal.countDown();
			} catch (InterruptedException ex) {
			} // return;
		}

		void doWork() {
			// ...
		}
	}
}

        2、示例应用二

        另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。

        代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class TestCountDownLatch2 {

	class Driver2 { // ...
		void main() throws InterruptedException {
			CountDownLatch doneSignal = new CountDownLatch(20);
			Executor e = Executors.newFixedThreadPool(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				e.execute(new WorkerRunnable(doneSignal, i));

			doneSignal.await(); // wait for all to finish
		}
	}

	class WorkerRunnable implements Runnable {
		private final CountDownLatch doneSignal;
		private final int i;

		WorkerRunnable(CountDownLatch doneSignal, int i) {
			this.doneSignal = doneSignal;
			this.i = i;
		}

		public void run() {
			doWork(i);
			doneSignal.countDown();
		}

		void doWork(int i) {
			// ...
		}
	}
}
        三、CountDownLatch实现分析

        1、Sync

        在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
        关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:

        在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。

        tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.

        2、countDown()

        countDown()方法的实现很简单,如下:

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }
        其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。

        3、await()

         await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。

        doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。






网友评论

登录后评论
0/500
评论
柏辰爸爸
+ 关注