java源码-ThreadPoolExecutor(2)

简介: 开篇 这篇文章的主要目标是为了讲解清楚ThreadPoolExecutor的提交任务的过程,非常推荐静下心来仔细阅读。java源码-ThreadPoolExecutor(1)java源码-ThreadPoolExecutor(2)java源码-Thr...

开篇

 这篇文章的主要目标是为了讲解清楚ThreadPoolExecutor的提交任务的过程,非常推荐静下心来仔细阅读。
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)


ThreadPoolExecutor状态介绍

 ThreadPoolExecutor针对线程池一共维护了五种状态,实现上用用高3位表示ThreadPoolExecutor的执行状态,低29位维持线程池线程个数,分别是:

  • RUNNING = -1 << COUNT_BITS = -1<<29 高三位为111
  • SHUTDOWN = 0 << COUNT_BITS = 0<<29 高三位为000
  • STOP = 1 << COUNT_BITS = 1<<29 高三位为001
  • TIDYING = 2 << COUNT_BITS = 2<<29 高三位为010
  • TERMINATED = 3 << COUNT_BITS = 3<<29 高三位为011
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer.SIZE=32,Integer.SIZE-3=29,COUNT_BITS=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池最大线程数=536870911(2^29-1),CAPACITY二进制中低29为为1,高3位为0
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 用高3位表示ThreadPoolExecutor的执行状态
    // RUNNING=111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN=000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP=001
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING=010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED=110
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    
    // runStateOf通过获取高3位来对比
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // workerCountOf通过比较低29位来获取线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


ThreadPoolExecutor任务提交过程

 ThreadPoolExecutor提交任务代码是在AbstractExecutorService当中通过submit()方法实现的,按照两个步骤来实现:

  • 通过newTaskFor()方法创建待提交任务,该方法内部的实现后面再分析。
  • 通过execute()方法提交task,execute的在ThreadPoolExecutor类中实现重写。
  • 进一步跟进ThreadPoolExecutor的execute方法
public abstract class AbstractExecutorService implements ExecutorService {

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}



 整个ThreadPoolExecutor的execute其实在源码自带的注释中已经写的很清楚了,怕自己翻译的不是特别所以这次直接把注释也贴在代码当中了,整个过程分为三个过程:

  • 1、当前的线程数是否小于corePoolSize,新建core线程并运行第一个任务。
  • 2、如果第一步不满足条件,那么就把任务提交到workQueue代表的队列当中。
  • 3、如果第二步不满足条件,那么就就新建不属于corePoolSize计数的线程(也就是新建core以外的线程)来进行处理。
  • 4、如果都失败那么就直接通过rejectHandler拒绝任务,步骤123当中任何检测到线程池关闭的情况直接执行任务拒绝。
public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
}



  ThreadPoolExecutor的addWorker方法有两个参数,Runnable firstTask代表待执行任务, boolean core代表是否启动核心线程,整个启动过程主要分为三个步骤:

  • 前置检查:检查线程池是否处于关闭状态,在正常运行的情况下增加工作线程计数。
  • 正常处理:创建Worker对象并在加锁的条件下将新建worker添加到workers集合当中,并通过调用t.start()方法启动线程。
  • 后置处理:判断启动线程是否失败,如果失败那么就尝试中止线程池。
public class ThreadPoolExecutor extends AbstractExecutorService {
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 判断是否超出线程限制,corePoolSize和core线程数,
                // maximumPoolSize代表超出core部分的线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
}


ThreadPoolExecutor的worker介绍

  ThreadPoolExecutor的worker实现Runnable接口,在worker的内部run()方法中通过执行runWorker()方法来启动task,启动方式会调用task.run()方法,所以从这个角度来看,task的执行线程其实ThreadPoolExecutor线程池中的worker。

  • Worker类内部包含:Thread thread工作线程用于执行task、Runnable firstTask标识待执行任务。
  • runWorker()方法内部负责执行来自提交的firstTask或者阻塞从任务队列通过getTask()方法取得待执行任务
  • runWorker()方法内部通过执行task.run()负责真正执行任务。
public class ThreadPoolExecutor extends AbstractExecutorService {

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }



 runWorker内部主要做两件事情,分别是:

  • 获取任务:通过直接传进来firstTask或者通过getTask从任务队列中获取任务
  • 执行任务:task.run()执行真正的task任务
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }



 getTask()方法外层是一个for循环,然后内部从workQueue获取任务,区分设置超时或者阻塞等待。

  • 阻塞等待直至线程获取到可消费任务。
  • 超时等待使用的是keepAliveTime,用于超时后设置线程超时标记然后线程退出工作。
  • 线程退出循环是通过返回task=null,外层循环直接结束实现。
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 标记线程退出工作部分的逻辑,通过返回task=null,从而在外层调用方实现退出while循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}


ThreadPoolExecutor的task介绍

  ThreadPoolExecutor的newTaskFor()方法负责创建task,创建的FutureTask的实例本身实现了Runnable、Future的接口。

  • FutureTask内部可以创建入参为Runnable的对象的时候会创建一个代理器
  • RunnableAdapter,创建入参为Callable的对象就比较直接了。
  • FutureTask的运行函数run()负责执行Callable对象的call()方法并将返回值通过set()方法设置到outcome对象。
  • FutureTask的get()方法负责获取返回值,就是我们submit()后返回的future的get()调用。
public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}
public class Executors {
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
}




public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}




public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome; // non-volatile, protected by state reads/writes
    private volatile Thread runner;
    private volatile WaitNode waiters;

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    // 核心的逻辑,负责调用对象的call方法并赋值返回值
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}


参考文章

ThreadPoolExecutor解析-主要源码研究
ThreadPoolExecutor(五)——线程池关闭相关操作
ThreadPoolExecutor(六)——线程池关闭之后

目录
相关文章
|
8天前
|
运维 Java
Java版HIS系统 云HIS系统 云HIS源码 结构简洁、代码规范易阅读
云HIS系统分为两个大的系统,一个是基层卫生健康云综合管理系统,另一个是基层卫生健康云业务系统。基层卫生健康云综合管理系统由运营商、开发商和监管机构使用,用来进行运营管理、运维管理和综合监管。基层卫生健康云业务系统由基层医院使用,用来支撑医院各类业务运转。
34 5
|
3天前
|
搜索推荐 前端开发 Java
java医院绩效考核管理系统项目源码
系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
6 0
|
3天前
|
设计模式 算法 Java
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
|
3天前
|
设计模式 JavaScript Java
[设计模式Java实现附plantuml源码~行为型] 对象状态及其转换——状态模式
[设计模式Java实现附plantuml源码~行为型] 对象状态及其转换——状态模式
|
3天前
|
设计模式 存储 JavaScript
[设计模式Java实现附plantuml源码~创建型] 多态工厂的实现——工厂方法模式
[设计模式Java实现附plantuml源码~创建型] 多态工厂的实现——工厂方法模式
|
3天前
|
设计模式 Java Go
[设计模式Java实现附plantuml源码~创建型] 集中式工厂的实现~简单工厂模式
[设计模式Java实现附plantuml源码~创建型] 集中式工厂的实现~简单工厂模式
|
4天前
|
Java 调度
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
32 1
|
8天前
|
JavaScript Java 测试技术
基于Java的电影评论系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的电影评论系统的设计与实现(源码+lw+部署文档+讲解等)
30 0
|
8天前
|
JavaScript Java 测试技术
基于Java的在线日语培训平台的设计与实现(源码+lw+部署文档+讲解等)
基于Java的在线日语培训平台的设计与实现(源码+lw+部署文档+讲解等)
26 0
|
8天前
|
JavaScript Java 测试技术
基于Java的同城蔬菜配送管理系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的同城蔬菜配送管理系统的设计与实现(源码+lw+部署文档+讲解等)
11 0