java 并发编程 :Executor框架实现java 多线程

简介:





Executor框架简介

    在Java 5之后并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。


 Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。


 Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。


 ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当所有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

 Executors提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。 

 1. public static ExecutorService newFixedThreadPool(int nThreads)

     创建固定数目线程的线程池。


2. public static ExecutorService newCachedThreadPool()

    创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线   程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。


3.public static ExecutorService newSingleThreadExecutor()

    创建一个单线程化的Executor。


4.public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。


这四种方法都是用的Executors中的ThreadFactory建立的线程,下面就以上四个方法做个比较


newCachedThreadPool() -缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中
-缓存型池子通常用于执行一些生存期很短的异步型任务
 因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
-能reuse的线程,必须是timeout IDLE内的池中线程,缺省     timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
  注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
newFixedThreadPool(int)   -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待直到当前的线程中某个线程终止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
-从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)    
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE  
newScheduledThreadPool(int) -调度型线程池
-这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor() -单例线程,任意时间池中只能有一个线程
-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)


一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。(该段话摘自《Thinking in Java》第四版)



Executor执行Runnable任务

 通过Executors的以上四个静态工厂方法获得 ExecutorService实例,而后调用该实例的execute(Runnable command)方法即可。一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行。下面是是Executor执行Runnable任务的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  java.util.concurrent.ExecutorService;   
import  java.util.concurrent.Executors;   
   
public  class  TestCachedThreadPool{   
     public  static  void  main(String[] args){   
         ExecutorService executorService = Executors.newCachedThreadPool();   
//      ExecutorService executorService = Executors.newFixedThreadPool(5);  
//      ExecutorService executorService = Executors.newSingleThreadExecutor();  
         for  ( int  i =  0 ; i <  5 ; i++){   
             executorService.execute( new  TestRunnable());   
             System.out.println( "************* a"  + i +  " *************" );   
         }   
         executorService.shutdown();   
     }   
}   
   
class  TestRunnable  implements  Runnable{   
     public  void  run(){   
         System.out.println(Thread.currentThread().getName() +  "线程被调用了。" );   
     }   
}


 某次执行后的结果如下:

从结果中可以看出,pool-1-thread-1和pool-1-thread-2均被调用了两次,这是随机的,execute会首先在线程池中选择一个已有空闲线程来执行任务,如果线程池中没有空闲线程,它便会创建一个新的线程来执行任务。



Executor执行Callable任务

 在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable<T> task) 方法来执行,并且返回一个 <T>Future<T>,是表示任务等待完成的 Future。

 Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有

 当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

 下面给出一个Executor执行Callable任务的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import  java.util.ArrayList;   
import  java.util.List;   
import  java.util.concurrent.*;   
   
public  class  CallableDemo{   
     public  static  void  main(String[] args){   
         ExecutorService executorService = Executors.newCachedThreadPool();   
         List<Future<String>> resultList =  new  ArrayList<Future<String>>();   
   
         //创建10个任务并执行   
         for  ( int  i =  0 ; i <  10 ; i++){   
             //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中   
             Future<String> future = executorService.submit( new  TaskWithResult(i));   
             //将任务执行结果存储到List中   
             resultList.add(future);   
         }   
   
         //遍历任务的结果   
         for  (Future<String> fs : resultList){   
                 try {   
                     while (!fs.isDone); //Future返回如果没有完成,则一直循环等待,直到Future返回完成  
                     System.out.println(fs.get());      //打印各个线程(任务)执行的结果   
                 } catch (InterruptedException e){   
                     e.printStackTrace();   
                 } catch (ExecutionException e){   
                     e.printStackTrace();   
                 } finally {   
                     //启动一次顺序关闭,执行以前提交的任务,但不接受新任务  
                     executorService.shutdown();   
                 }   
         }   
     }   
}   
   
   
class  TaskWithResult  implements  Callable<String>{   
     private  int  id;   
   
     public  TaskWithResult( int  id){   
         this .id = id;   
     }   
   
     /**  
      * 任务的具体过程,一旦任务传给ExecutorService的submit方法, 
      * 则该方法自动在一个线程上执行 
      */   
     public  String call()  throws  Exception {  
         System.out.println( "call()方法被自动调用!!!    "  + Thread.currentThread().getName());   
         //该返回结果将被Future的get方法得到  
         return  "call()方法被自动调用,任务返回的结果是:"  + id +  "    "  + Thread.currentThread().getName();   
     }   
}


某次执行结果如下:


20131221170327718



 从结果中可以同样可以看出,submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用isDone()方法判断Future是否完成了返回。


自定义线程池


自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池,这里先贴上示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import  java.util.concurrent.ArrayBlockingQueue;   
import  java.util.concurrent.BlockingQueue;   
import  java.util.concurrent.ThreadPoolExecutor;   
import  java.util.concurrent.TimeUnit;   
   
public  class  ThreadPoolTest{   
     public  static  void  main(String[] args){   
         //创建等待队列   
         BlockingQueue<Runnable> bqueue =  new  ArrayBlockingQueue<Runnable>( 20 );   
         //创建线程池,池中保存的线程数为3,允许的最大线程数为5  
         ThreadPoolExecutor pool =  new  ThreadPoolExecutor( 3 , 5 , 50 ,TimeUnit.MILLISECONDS,bqueue);   
         //创建七个任务   
         Runnable t1 =  new  MyThread();   
         Runnable t2 =  new  MyThread();   
         Runnable t3 =  new  MyThread();   
         Runnable t4 =  new  MyThread();   
         Runnable t5 =  new  MyThread();   
         Runnable t6 =  new  MyThread();   
         Runnable t7 =  new  MyThread();   
         //每个任务会在一个线程上执行  
         pool.execute(t1);   
         pool.execute(t2);   
         pool.execute(t3);   
         pool.execute(t4);   
         pool.execute(t5);   
         pool.execute(t6);   
         pool.execute(t7);   
         //关闭线程池   
         pool.shutdown();   
     }   
}   
   
class  MyThread  implements  Runnable{   
     @Override   
     public  void  run(){   
         System.out.println(Thread.currentThread().getName() +  "正在执行。。。" );   
         try {   
             Thread.sleep( 100 );   
         } catch (InterruptedException e){   
             e.printStackTrace();   
         }   
     }   
}

运行结果如下:

20131222102933609

  

 从结果中可以看出,七个任务是在线程池的三个线程上执行的。这里简要说明下用到的ThreadPoolExecuror类的构造方法中各个参数的含义。   



public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:线程池中所保存的核心线程数,包括空闲线程。

maximumPoolSize:池中允许的最大线程数。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

    根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:

    1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

    2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

    3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

    4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

    总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

    另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

    我们大致来看下Executors的源码,newCachedThreadPool的不带RejectedExecutionHandler参数(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下:

1
2
3
4
5
public  static  ExecutorService newCachedThreadPool() {  
     return  new  ThreadPoolExecutor( 0 , Integer.MAX_VALUE,  
                                   60L, TimeUnit.SECONDS,  
                                   new  SynchronousQueue<Runnable>());  
}

 它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue<Runnalbe>决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。


    再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下:

1
2
3
4
5
public  static  ExecutorService newFixedThreadPool( int  nThreads) {  
     return  new  ThreadPoolExecutor(nThreads, nThreads,  
                                   0L, TimeUnit.MILLISECONDS,  
                                   new  LinkedBlockingQueue<Runnable>());  
}

 它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,敢于LinkedBlockingQueue下面会说。


    下面说说几种排队的策略:

    1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

    2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

    3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。




      本文转自布拉君君 51CTO博客,原文链接:http://blog.51cto.com/5148737/1975939,如需转载请自行联系原作者






相关文章
|
1天前
|
安全 Java
java多线程(一)(火车售票)
java多线程(一)(火车售票)
|
1天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
1天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
11天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
1月前
|
存储 Java 程序员
Java并发编程:深入理解线程池
【2月更文挑战第14天】 在现代软件开发中,高效地处理并发任务已成为提升性能和响应速度的关键。Java作为广泛使用的编程语言,其内置的并发工具特别是线程池机制,为开发者提供了强大的支持。本文将深入探讨Java线程池的核心概念、工作机制以及如何合理配置线程池以适应不同的应用场景。我们将通过理论解析与实践案例相结合的方式,使读者不仅理解线程池的工作原理,还能掌握其在复杂系统中的高效应用。
24 0
|
30天前
|
监控 Java
Java并发编程中的线程池优化技巧
在Java并发编程中,线程池扮演着至关重要的角色。本文将深入探讨如何优化Java线程池,从线程池的创建与配置、任务队列的选择、拒绝策略的制定、线程池状态的监控等多个方面进行详细阐述。通过本文的阅读,您将了解到如何合理地利用线程池,提高系统的并发性能,从而更好地应对各种并发场景。
|
12天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
2天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
3天前
|
缓存 监控 Java
Java并发编程:线程池与任务调度
【4月更文挑战第16天】Java并发编程中,线程池和任务调度是核心概念,能提升系统性能和响应速度。线程池通过重用线程减少创建销毁开销,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。任务调度允许立即或延迟执行任务,具有灵活性。最佳实践包括合理配置线程池大小、避免过度使用线程、及时关闭线程池和处理异常。掌握这些能有效管理并发任务,避免性能瓶颈。
|
9天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第10天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将从线程池的基本概念入手,逐步深入到线程池的实现原理,以及如何在实际开发中合理使用线程池。通过本文的学习,你将能够理解线程池的核心原理,掌握线程池的使用技巧,以及避免常见的线程池使用误区。