深入理解ThreadPoolExecutor

简介: 在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。

在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。

队列

参数 workQueue指被提交但未执行的任务队列, 它是一个 BlockingQueue接口的对象,仅用于存放 Runnable对象。 根据队列功能分类, 在ThreadPoolExecutor的构造函数中可使用以下几种 BlockingQueue。

  • 直接提交的队列:该功能由 SynchronousQueue对象提供。 SynchronousQueue是一个特殊的 BlockingQueue。 SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作, 反之, 每一个删除操作都要等待对应的插入操作。 如果使用synchronousQueue, 提交的任务不会被真实的保存, 而总是将新任务提交给线程执行,如果没有空闲的进程, 则尝试创建新的进程, 如果进程数量已经达到最大值, 则执行拒绝策略。因此,使用 SynchronousQueue队列,通常要设置很大的 maximumPoolSize否则很容易执行拒绝策略。
  • 有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue实现。 当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示public ArrayBlockingQueue(int capacity);当使用有界的任务队列时, 若有新的任务需要执行, 如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程;若大于corePoolSize,则会将新任务加入等待队列;若等待队列已满,无法加入,则在总线程数不大于 maximumPoolSize的前提下,创建新的进程执行任;若大于 maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙, 否则确保核心线程数维持在 corePoolSize。
  • 无界的任务队列:无界任务队列可以通过 LinkedBlockingQueue类实现。与有界队列相比, 除非系统资源耗尽, 否则无界的任务队列不存在任务入队失败的情况 。 当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源, 则任务直接进入队列等待。 若任务创建和处理的速度差异很大, 无界队列会保持快速増长, 直到耗尽系统内存。
  • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue实现, 可以控制任务的执行先后顺序 。 它是一个特殊的无界队列 。 无论是有界队ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue都是按照先进先出算法处理任务的 。 而 PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行, 在确保系统性能的同时, 也能有很好的质量保证 (总是确保高优先级的任务先执行)。

现在再看下newFixedThreadPool()方法的实现。它返回了一个 corePoolSize和 maximumPoolSize大小一样的,并且使用了 LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务, 当任务提交非常频繁的时候, 该队列可能迅速膨胀,从而耗尽系统资源。

而newSingleThreadExecutor()返回的单线程线程池, 是 newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1 。

newCachedThreadPool()方法返回 corePoolSize为0, maximumPoolSize无穷大的线程池,这意味着在没有任务时, 该线程池内无线程, 而当任务被提交时, 该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入 SynchronousQueue队列,而 SynchronousQueue队列是一种直接提交的队列, 它总会追使线程池增加新的线程执行任务 。 当任务执行完毕后, 由于corePoolSize为 0, 因此空闲线程又会在指定时间内(60秒)被回收。

拒绝策略

ThreadPoolExecutor的最后一个参数handler指定了拒绝策略 。也就是当任务数量超过系统实际承载能力时, 该如何处理呢?这时就要用到拒绝策略了 。拒绝策略可以说是系统超负荷运行时的补救措施, 通常由于压力太大而引起的, 也就是线程池中的线程已经用完了, 无法继续为新任务服务, 同时, 等待队列中也已经排满了, 再也塞不下新任务了 。这时我们就需要有一套机制, 合理地处理这个问题。

JDK内置提供了四种拒绝策略:

  • AbortPolicy策略:该策略会直接抛出运行时RejectedExecutionException异常,阻止系统正常工作。
  • CauerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务, 并尝试再次提交当前任务。
  • DiscardPolicy策略:该策略将丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种方案

以上内置的策略均实现了 RejectedExecutionHandler接口, 若以上策略仍无法满足实际应用需要,完全可以自己扩展 RejectedExecutionHandler接口 ,RejectedExecutionHandler的定义如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

下面列举一个代码简单说明一下

/**
 * 自定义线程池和拒绝策略
 * Created by xmr on 2018/9/5.
 */
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {

        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" +
                    Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /*自定义线程池和拒绝策略*/
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),//默认创建线程的实现方式,可以省略
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

扩展线程池

虽然 JDK已经帮我们实现了这个稳定的高性能线程池 。 但如果我们需要对这个线程池做一些扩展, JDK还是给了我们支持了,它提供了 beforeExecute()、afterExecute()和 terminated()三个接口对线程池进行控制 。

在默认的ThreadPoolExecutor实现中,提供了空的 beforeExecute0和 aferExecute0实现。在实际应用中, 可以对其进行扩展来实现对线程池运行状态的跟踪, 输出一些有用的调试信息, 以帮助系统故障诊断, 这对于多线程程序错误排査是很有帮助的 。 下面演示了对线程池的扩展

/**
 * 扩展线程池
 * Created by xmr on 2018/9/5.
 */
public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println("正在执行 : Thread ID:" +
                    Thread.currentThread().getId() + ",Task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行: " + ((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成: " + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

 

参考:

https://blog.csdn.net/linghu_java/article/details/17123057

相关文章
|
4月前
|
缓存 搜索推荐 Java
线程池之ThreadPoolExecutor
线程池之ThreadPoolExecutor
44 0
|
9月前
|
机器学习/深度学习 消息中间件 存储
ThreadPoolExecutor解读
ThreadPoolExecutor解读
|
存储 缓存 监控
线程池 ThreadPoolExecutor 详解
对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破坏了数据的局部性。因此在并发编程中,当线程创建过多时,会影响程序性能,甚至引起程序崩溃。 而线程池属于池化管理模式,具有以下优点: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的性能消耗。 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性:能够对线程进行统一分配、调优和监控。
178 0
|
算法 安全 Java
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
|
存储 缓存 监控
ThreadPoolExecutor:线程池不允许使用Executors创建
ThreadPoolExecutor:线程池不允许使用Executors创建
348 0
ThreadPoolExecutor:线程池不允许使用Executors创建
线程池之ThreadPoolExecutor使用
通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景
|
Java
ThreadPoolExecutor详解
详细介绍了ThreadPoolExecutor线程池的各种参数和各种使用场景以及线程池中线程的创建时刻和策略
4183 0
|
缓存 Java
理解Java线程池ThreadPoolExecutor
介绍一下Java线程池ThreadPoolExecutor
12913 0
|
Java
ThreadPoolExecutor
一 核心属性 1  int corePoolSize 指该线程池中核心线程数最大值 核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程。
4666 0