java源码-ThreadPoolExecutor(3)

简介: 开篇 部门以前有一个很跳的程序员写了一个神奇的代码,通过不停创建ThreadPoolExecutor而不调用shutdown导致线程过多溢出,然后想想毕竟我还是一个不跳的程序员,所以还是研究研究比较合适。

开篇

 部门以前有一个很跳的程序员写了一个神奇的代码,通过不停创建ThreadPoolExecutor而不调用shutdown导致线程过多溢出,然后想想毕竟我还是一个不跳的程序员,所以还是研究研究比较合适。这篇文章就是说明线程池的退出过程
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)


shutdown源码解析

  • 1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
  • 2、判断调用者是否有权限shutdown线程池
  • 3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
  • 4、中断所有空闲线程 interruptIdleWorkers()
  • 5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
  • 6、解锁
  • 7、尝试终止线程池 tryTerminate()
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 权限检查
            checkShutdownAccess();
            
            // 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回
            advanceRunState(SHUTDOWN);
            
            // 设置中断标志,中断所有等待任务的空闲线程
            interruptIdleWorkers();

            // ThreadPoolExecutor没什么操作
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试状态变为TERMINATED
        tryTerminate();
    }


    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // 设置执行状态为SHUTDOWN
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    
    // onlyOne在这里传入为false
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍历所有worker,通过判断能否获取锁来判定worker是否空闲
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 针对能够获取到锁的线程,线程正阻塞在获取任务的过程中
                        // 通过中断线程然后迫使线程退出阻塞然后工作线程退出工作
                        // 然后工作线程就自然的被回收了
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }


shutdownNow源码解析

 shutdownNow() 和 shutdown()的大体流程相似,差别是:

  • 1、将线程池更新为stop状态
  • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
  • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 权限检查
            checkShutdownAccess();
            
            // 设置线程池状态为stop
            advanceRunState(STOP);

            // 中断所有线程
            interruptWorkers();

            // 移除所有待消费的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        // 尝试状态变为TERMINATED
        tryTerminate();

        return tasks;
    }


awaitTermination源码解析

 等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。

  • 等待过程中如果发现未超时那么通过for循环继续等待超时
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }


工作线程Worker退出逻辑

 如果获取task为null就会退出while循环从而执行finally部分的逻辑,也就是processWorkerExit()方法执行清了工作.

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果获取任务为null,那么就会退出当前工作线程
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 执行清了工作
            processWorkerExit(w, completedAbruptly);
        }
    }

 getTask()方法(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))当中我们设置状态为SHUTDOWN,同时workQueue为空的情况下就会返回null,在外层循环中就会退出线程的工作,实现线程退出。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


参考文章

Java中线程池ThreadPoolExecutor原理探究
Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

目录
相关文章
|
1天前
|
存储 数据可视化 安全
Java全套智慧校园系统源码springboot+elmentui +Quartz可视化校园管理平台系统源码 建设智慧校园的5大关键技术
智慧校园指的是以物联网为基础的智慧化的校园工作、学习和生活一体化环境,这个一体化环境以各种应用服务系统为载体,将教学、科研、管理和校园生活进行充分融合。无处不在的网络学习、融合创新的网络科研、透明高效的校务治理、丰富多彩的校园文化、方便周到的校园生活。简而言之,“要做一个安全、稳定、环保、节能的校园。
19 6
|
3天前
|
JavaScript Java 测试技术
基于Java的物流配送人员车辆调度管理系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的物流配送人员车辆调度管理系统的设计与实现(源码+lw+部署文档+讲解等)
8 0
|
3天前
|
JavaScript Java 测试技术
基于Java的网上书城的设计与实现(源码+lw+部署文档+讲解等)
基于Java的网上书城的设计与实现(源码+lw+部署文档+讲解等)
18 0
|
3天前
|
JavaScript Java 测试技术
基于Java的网络游戏交易系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的网络游戏交易系统的设计与实现(源码+lw+部署文档+讲解等)
15 0
|
3天前
|
JavaScript Java 测试技术
基于Java的家政公司服务平台的设计与实现(源码+lw+部署文档+讲解等)
基于Java的家政公司服务平台的设计与实现(源码+lw+部署文档+讲解等)
19 1
|
3天前
|
JavaScript Java 测试技术
基于Java的怀旧唱片售卖系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的怀旧唱片售卖系统的设计与实现(源码+lw+部署文档+讲解等)
25 5
|
3天前
|
人工智能 小程序 Java
java中小学校智慧校园电子班牌管理系统源码
使用springboot框架Java+vue2 B/S架构
22 3
|
6天前
|
JavaScript Java 测试技术
基于Java的阅微文学网站的设计与实现(源码+lw+部署文档+讲解等)
基于Java的阅微文学网站的设计与实现(源码+lw+部署文档+讲解等)
16 2
|
7天前
|
JavaScript Java 测试技术
基于Java的智能实时疫情监管服务平台的设计与实现(源码+lw+部署文档+讲解等)
基于Java的智能实时疫情监管服务平台的设计与实现(源码+lw+部署文档+讲解等)
18 0
|
7天前
|
JavaScript Java 测试技术
基于Java的教师上课系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的教师上课系统的设计与实现(源码+lw+部署文档+讲解等)
21 0