开篇
部门以前有一个很跳的程序员写了一个神奇的代码,通过不停创建ThreadPoolExecutor而不调用shutdown导致线程过多溢出,然后想想毕竟我还是一个不跳的程序员,所以还是研究研究比较合适。这篇文章就是说明线程池的退出过程。
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
shutdown源码解析
- 1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
- 2、判断调用者是否有权限shutdown线程池
- 3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
- 4、中断所有空闲线程 interruptIdleWorkers()
- 5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
- 6、解锁
- 7、尝试终止线程池 tryTerminate()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
// 设置中断标志,中断所有等待任务的空闲线程
interruptIdleWorkers();
// ThreadPoolExecutor没什么操作
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试状态变为TERMINATED
tryTerminate();
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 设置执行状态为SHUTDOWN
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// onlyOne在这里传入为false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有worker,通过判断能否获取锁来判定worker是否空闲
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
// 针对能够获取到锁的线程,线程正阻塞在获取任务的过程中
// 通过中断线程然后迫使线程退出阻塞然后工作线程退出工作
// 然后工作线程就自然的被回收了
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow源码解析
shutdownNow() 和 shutdown()的大体流程相似,差别是:
- 1、将线程池更新为stop状态
- 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
- 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置线程池状态为stop
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 移除所有待消费的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试状态变为TERMINATED
tryTerminate();
return tasks;
}
awaitTermination源码解析
等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。
- 等待过程中如果发现未超时那么通过for循环继续等待超时
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
工作线程Worker退出逻辑
如果获取task为null就会退出while循环从而执行finally部分的逻辑,也就是processWorkerExit()方法执行清了工作.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果获取任务为null,那么就会退出当前工作线程
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 执行清了工作
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))当中我们设置状态为SHUTDOWN,同时workQueue为空的情况下就会返回null,在外层循环中就会退出线程的工作,实现线程退出。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
参考文章
Java中线程池ThreadPoolExecutor原理探究
Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理