2018-06-13 第三十八天

  1. 云栖社区>
  2. 博客>
  3. 正文

2018-06-13 第三十八天

winniehu 2018-06-13 22:43:01 浏览656
展开阅读全文

一、wait--notify--sleep

Object obj = new Object();

obj.wait(): 让当前线程在 obj上  等待。在调用obj.wait 之前,当前线程必须 对 obj上锁。调用obj.wait()之后,当前线程立即 释放 对obj 的锁,并等待进入阻塞状态。

当其他的线程 调用 obj.notify() 或者调用 obj.notifyAll() 的时候。进入阻塞状态的 线程 [被唤醒]。从阻塞状态进入 就绪状态。等待cpu 的调度执行。该线程再次执行的时候,需要先对obj 继续上锁,然后从 obj.wait() 后继续执行。

 

obj.notify(): 唤醒 obj 上等待(阻塞,调用了obj.wait()方法)的 某一个线程。让某一个线程从 等待队列中,进入请求资源队列中,进入就绪状态。

 

obj.notifyAll(): 唤醒所有在 obj 上等待的线程。从阻塞状态进入就绪状态。

 

加时间参数的wait(time) 阻塞的线程最多等待的时间为 time。 解除阻塞的条件多了一个。一个是 时间到了,一个是被notify 唤醒了。

 

sleep 和  wait 区别:

1:sleep 没有要求必须在同步代码块|同步方法中使用。 wait()必须在同步块|同步方法中使用。

2:sleep 如果在同步代码块中使用,那么休眠的过程中,不会释放对监视器的锁。wait() 会释放对监视器的锁。

3:解除阻塞的条件不同:sleep 休眠时间到了,自动从阻塞状态进入就绪状态。   wait() 必须被其他的线程唤醒,从阻塞到就绪状态。

 

二、jdk1.5 线程

Callable<泛型>

FutureTask<泛型>

Future<>

 

import java.util.Random;

import java.util.concurrent.Callable;

import java.util.concurrent.FutureTask;

 

/**

 * jdk1.5  线程

 *

 */

public class CallableTest {

 

public static void main(String[] args) throws Exception{

RandomCallable callable = new RandomCallable();

//FutureTask Runnable  Future 唯一的实现的子类。

FutureTask<Integer> task = new FutureTask<>(callable);

//Future 接口,主要用来负责 线程任务的调度和判断的执行。

//先创建一个线程对象

Thread thread = new Thread(task);

thread.start();

System.out.println(task.isDone());

//get 是一个阻塞式的方法,直到等到线程call 方法返回,得到结果。get 才返回。

System.out.println(task.get());

System.out.println(task.isDone());

}

}

 

//得到一个随机的正整数[0~10]

//泛型中的类型为 线程执行得到的结果的类型

class RandomCallable implements Callable<Integer>{

 

//和之前的run 方法是类型的。是线程的任务的主体代码。

// run 方法的区别

//1: call 方法可以指定 返回的类型。线程执行完毕之后会有一个结果。

//2: call 方法 自定义了抛出异常,在方法体中,如果有异常抛出,不需要再必须显式的 try-catch

public Integer call() throws Exception {

Thread.sleep(5000);

return new Random().nextInt(11);//[0~11)

}

}

 

三、中断线程

public void interrupt():设置线程的中断标识位为true。并不会真的中断线程。如果遇到阻塞的状态,那么会抛出一个InterruptedException,并清除中断标识位。

public static boolean interrupted():判断线程的中断标识位的状态,并清除该状态为false。

public boolean isInterrupted()判断线程的中断标识位的状态,不会影响状态。

 

中断应用

1:使用中断信号量中断非阻塞状态的线程

 

中断线程最好的,最受推荐的方式是,使用共享变量(shared variable)发出信号,告诉线程必须停止正在运行的任务。线程必须周期性的核查这一变量,然后有秩序地中止任务。Example2描述了这一方式:

 

class Example2 extends Thread {

    volatile boolean stop = false;// 线程中断信号量

 

    public static void main(String args[]) throws Exception {

        Example2 thread = new Example2();

        System.out.println("Starting thread...");

        thread.start();

        Thread.sleep(3000);

        System.out.println("Asking thread to stop...");

        // 设置中断信号量

        thread.stop = true;

        Thread.sleep(3000);

        System.out.println("Stopping application...");

    }

 

    public void run() {

        // 每隔一秒检测一下中断信号量

        while (!stop) {

            System.out.println("Thread is running...");

            long time = System.currentTimeMillis();

            /*

             * 使用while循环模拟 sleep 方法,这里不要使用sleep,否则在阻塞时会 抛

             * InterruptedException异常而退出循环,这样while检测stop条件就不会执行,

             * 失去了意义。

             */

            while ((System.currentTimeMillis() - time < 1000)) {}

        }

        System.out.println("Thread exiting under request...");

    }

}

 

2:使用thread.interrupt()中断非阻塞状态线程

 

虽然Example2该方法要求一些编码,但并不难实现。同时,它给予线程机会进行必要的清理工作。这里需注意一点的是需将共享变量定义成volatile 类型或将对它的一切访问封入同步的块/方法(synchronized blocks/methods)中。上面是中断一个非阻塞状态的线程的常见做法,但对非检测isInterrupted()条件会更简洁:

 

class Example2 extends Thread {

    public static void main(String args[]) throws Exception {

        Example2 thread = new Example2();

        System.out.println("Starting thread...");

        thread.start();

        Thread.sleep(3000);

        System.out.println("Asking thread to stop...");

        // 发出中断请求

        thread.interrupt();

        Thread.sleep(3000);

        System.out.println("Stopping application...");

    }

 

    public void run() {

        // 每隔一秒检测是否设置了中断标示

        while (!Thread.currentThread().isInterrupted()) {

            System.out.println("Thread is running...");

            long time = System.currentTimeMillis();

            // 使用while循环模拟 sleep

            while ((System.currentTimeMillis() - time < 1000) ) {

            }

        }

        System.out.println("Thread exiting under request...");

    }

}

 

3:使用thread.interrupt()中断阻塞状态线程


Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,设置线程的中断标示位,在线程受到阻塞的地方(如调用sleep、wait、join等地方)抛出一个异常InterruptedException,并且中断状态也将被清除,这样线程就得以退出阻塞的状态。下面是具体实现:

 

class Example3 extends Thread {

    public static void main(String args[]) throws Exception {

        Example3 thread = new Example3();

        System.out.println("Starting thread...");

        thread.start();

        Thread.sleep(3000);

        System.out.println("Asking thread to stop...");

        thread.interrupt();// 等中断信号量设置后再调用

        Thread.sleep(3000);

        System.out.println("Stopping application...");

    }

 

    public void run() {

        while (!Thread.currentThread().isInterrupted()) {

            System.out.println("Thread running...");

            try {

                /*

                 * 如果线程阻塞,将不会去检查中断信号量stop变量,所 以thread.interrupt()

                 * 会使阻塞线程从阻塞的地方抛出异常,让阻塞线程从阻塞状态逃离出来,并

                 * 进行异常块进行 相应的处理

                 */

                Thread.sleep(1000);// 线程阻塞,如果线程收到中断操作信号将抛出异常

            } catch (InterruptedException e) {

                System.out.println("Thread interrupted...");

                /*

                 * 如果线程在调用 Object.wait()方法,或者该类的 join() 、sleep()方法

                 * 过程中受阻,则其中断状态将被清除

                 */

                System.out.println(this.isInterrupted());// false

 

                //中不中断由自己决定,如果需要真真中断线程,则需要重新设置中断位,如果

                //不需要,则不用调用

                Thread.currentThread().interrupt();

            }

        }

        System.out.println("Thread exiting under request...");

    }

}

 

4:死锁的线程无法被中断

 

5:中断I/O操作

 

然而,如果线程在I/O操作进行时被阻塞,又会如何?I/O操作可以阻塞线程一段相当长的时间,特别是牵扯到网络应用时。例如,服务器可能需要等待一个请求(request),又或者,一个网络应用程序可能要等待远端主机的响应。

 

实现此InterruptibleChannel接口的通道是可中断的:如果某个线程在可中断通道上因调用某个阻塞的 I/O 操作(常见的操作一般有这些:serverSocketChannel. accept()、socketChannel.connect、socketChannel.open、socketChannel.read、socketChannel.write、fileChannel.read、fileChannel.write)而进入阻塞状态,而另一个线程又调用了该阻塞线程的 interrupt 方法,这将导致该通道被关闭,并且已阻塞线程接将会收到ClosedByInterruptException,并且设置已阻塞线程的中断状态。另外,如果已设置某个线程的中断状态并且它在通道上调用某个阻塞的 I/O 操作,则该通道将关闭并且该线程立即接收到 ClosedByInterruptException;并仍然设置其中断状态。如果情况是这样,其代码的逻辑和第三个例子中的是一样的,只是异常不同而已。

 

如果你正使用通道(channels)(这是在Java 1.4中引入的新的I/O API),那么被阻塞的线程将收到一个ClosedByInterruptException异常。但是,你可能正使用Java1.0之前就存在的传统的I/O,而且要求更多的工作。既然这样,Thread.interrupt()将不起作用,因为线程将不会退出被阻塞状态。Example5描述了这一行为。尽管interrupt()被调用,线程也不会退出被阻塞状态,比如ServerSocket的accept方法根本不抛出异常。

 

很幸运,Java平台为这种情形提供了一项解决方案,即调用阻塞该线程的套接字的close()方法。在这种情形下,如果线程被I/O操作阻塞,当调用该套接字的close方法时,该线程在调用accept地方法将接收到一个SocketException(SocketException为IOException的子异常)异常,这与使用interrupt()方法引起一个InterruptedException异常被抛出非常相似,(注,如果是流因读写阻塞后,调用流的close方法也会被阻塞,根本不能调用,更不会抛IOExcepiton,此种情况下怎样中断?我想可以转换为通道来操作流可以解决,比如文件通道)。下面是具体实现:

class Example6 extends Thread {

    volatile ServerSocket socket;

 

    public static void main(String args[]) throws Exception {

        Example6 thread = new Example6();

        System.out.println("Starting thread...");

        thread.start();

        Thread.sleep(3000);

        System.out.println("Asking thread to stop...");

        Thread.currentThread().interrupt();// 再调用interrupt方法

        thread.socket.close();// 再调用close方法

        try {

            Thread.sleep(3000);

        } catch (InterruptedException e) {

        }

        System.out.println("Stopping application...");

    }

 

    public void run() {

        try {

            socket = new ServerSocket(8888);

        } catch (IOException e) {

            System.out.println("Could not create the socket...");

            return;

        }

        while (!Thread.currentThread().isInterrupted()) {

            System.out.println("Waiting for connection...");

            try {

                socket.accept();

            } catch (IOException e) {

                System.out.println("accept() failed or interrupted...");

                Thread.currentThread().interrupt();//重新设置中断标示位

            }

        }

        System.out.println("Thread exiting under request...");

    }

}

 

一、没有任何语言方面的需求一个被中断的线程应该终止。中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。

 

二、对于处于sleep,join等操作的线程,如果被调用interrupt()后,会抛出InterruptedException,然后线程的中断标志位会由true重置为false,因为线程为了处理异常已经重新处于就绪状态。

 

三、不可中断的操作,包括进入synchronized段以及Lock.lock(),inputSteam.read()等,调用interrupt()对于这几个问题无效,因为它们都不抛出中断异常。如果拿不到资源,它们会无限期阻塞下去。

 

对于Lock.lock(),可以改用Lock.lockInterruptibly(),可被中断的加锁操作,它可以抛出中断异常。等同于等待时间无限长的Lock.tryLock(long time, TimeUnit unit)。

 

对于inputStream等资源,有些(实现了interruptibleChannel接口)可以通过close()方法将资源关闭,对应的阻塞也会被放开。

 

public static boolean interrupted

测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false。

 

public boolean isInterrupted():

测试线程是否已经中断。线程的中断状态 不受该方法的影响。

 

public void interrupt():

中断线程。

 

上面列出了与中断有关的几个方法及其行为,可以看到interrupt是中断线程。如果不了解Java的中断机制,这样的一种解释极容易造成误解,认为调用了线程的interrupt方法就一定会中断线程。

 

其实,Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(这个状态不在Thread的属性上),interrupt方法仅仅只是将该状态置为true。

 

比如对正常运行的线程调用interrupt()并不能终止他,只是改变了interrupt标示符。

 

一般说来,如果一个方法声明抛出InterruptedException,表示该方法是可中断的,比如wait,sleep,join,也就是说可中断方法会对interrupt调用做出响应(例如sleep响应interrupt的操作包括清除中断状态,抛出InterruptedException),异常都是由可中断方法自己抛出来的,并不是直接由interrupt方法直接引起的。

 

Object.wait, Thread.sleep方法,会不断的轮询监听 interrupted 标志位,发现其设置为true后,会停止阻塞并抛出 InterruptedException异常。

 

/**

 * 线程中断

 *

 */

public class InterruptThreadTest {

 

public static void main(String[] args) throws InterruptedException {

// InterruptThread thread = new InterruptThread();

// thread.start();

//

// Thread.sleep(1000);

// //3秒之后中断线程

// thread.setFlag(false);

InterruptThread1 thread = new InterruptThread1();

thread.start();

Thread.sleep(1000);

//中断线程   设置线程中断标识位为 true

thread.interrupt();

}

}

 

//最常见的线程中断的方式

class InterruptThread extends Thread{

//通过修改 flag 标识位 ,来控制循环的时间

//如果当前线程被阻塞了,是不能立即中断的,必须等阻塞解除了才能中断

private boolean flag = true;

@Override

public void run() {

int i = 0;

while(flag){

// try {

// Thread.sleep(1000);

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

//控制一秒的循环的周期,避免sleep 可能带来的异常发生

long time = System.currentTimeMillis();

while(true){

if(System.currentTimeMillis()-time >=1000)

break;

}

System.out.println("你好 = "+i++);

}

}

public void setFlag(boolean flag){

this.flag =flag;

}

}

 

//public void interrupt()

//设置线程的中断标识位为true。并不会真的中断线程。如果遇到阻塞的状态,那么会抛出一个InterruptedException,并清除中断标识位。

//public static boolean interrupted()

//判断线程的中断标识位的状态,并清除该状态为false

//public boolean isInterrupted()

//判断线程的中断标识位的状态,不会影响状态。

//通过修改线程的中断标志位 来控制线程的中断

class InterruptThread1 extends Thread{

@Override

public void run() {

int i = 0;

//当线程的中断标识位为 false 的时候,执行线程

while(false == isInterrupted()){

System.out.println("你好 = "+i++);

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

//当线程在休眠的过程中,调用了interrupt() 方法,那么线程会抛出一个异常

//InterruptedException 被捕获,中断标志位 被还原为 false

//如果想在此刻中断线程,那么需要重新调用 interrupt() 方法,将中断标志位 设置为 true

//才可以中断线程。

interrupt();

}

}

}

}

 

四、线程同步

1.同步块-同步方法

进行多线程编程,同步控制是非常重要的,而同步控制就涉及到了锁。

       对代码进行同步控制我们可以选择同步方法,也可以选择同步块,这两种方式各有优缺点,至于具体选择什么方式,就见仁见智了,同步块不仅可以更加精确的控制对象锁,也就是控制锁的作用域,何谓锁的作用域?锁的作用域就是从锁被获取到其被释放的时间。而且可以选择要获取哪个对象的对象锁。但是如果在使用同步块机制时,如果使用过多的锁也会容易引起死锁问题,同时获取和释放所也有代价,而同步方法,它们所拥有的锁就是该方法所属的类的对象锁,换句话说,也就是this对象,而且锁的作用域也是整个方法,这可能导致其锁的作用域可能太大,也有可能引起死锁,同时因为可能包含了不需要进行同步的代码块在内,也会降低程序的运行效率。而不管是同步方法还是同步块,我们都不应该在他们的代码块内包含无限循环,如果代码内部要是有了无限循环,那么这个同步方法或者同步块在获取锁以后因为代码会一直不停的循环着运行下去,也就没有机会释放它所获取的锁,而其它等待这把锁的线程就永远无法获取这把锁,这就造成了一种死锁现象。

 

2. 静态同步方法-非静态同步方法

所有的非静态同步方法用的都是同一把锁——实例对象本身,也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。

        而所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!

 

3. 同步锁的选择

而对于同步块,由于其锁是可以选择的,所以只有使用同一把锁的同步块之间才有着竞态条件,这就得具体情况具体分析了,但这里有个需要注意的地方,同步块的锁是可以选择的,但是不是可以任意选择的!!!!这里必须要注意一个物理对象和一个引用对象的实例变量之间的区别!使用一个引用对象的实例变量作为锁并不是一个好的选择,因为同步块在执行过程中可能会改变它的值,其中就包括将其设置为null,而对一个null对象加锁会产生异常,并且对不同的对象加锁也违背了同步的初衷!这看起来是很清楚的,但是一个经常发生的错误就是选用了错误的锁对象,因此必须注意:同步是基于实际对象而不是对象引用的!多个变量可以引用同一个对象,变量也可以改变其值从而指向其他的对象,因此,当选择一个对象锁时,我们要根据实际对象而不是其引用来考虑!作为一个原则,不要选择一个可能会在锁的作用域中改变值的实例变量作为锁对象!!!!

 

五、Object类

Object类的方法

 

1:toString(): 得到对象的字符串表示形式。

默认实现: 类的包名+类名+@+hashCode()

很多时候,在需要将一个对象的字符串表示形式输出的时候,底层都调用了该方法。

 

2:equals(Object o): 用于比较两个对象是否相等的。

默认实现:比较当前对象和 o  是否是同一个对象。

 

3:finalize(): 当对象所占的内存被 GC 回收的时候,该对象的类型中的 finalize() 方法会被jvm 调用。

默认实现:空

 

4:hashCode() : 得到 对象的哈希码 ,如果对象存在于HashSet 中,或者HashMap 的key ,那么该对象的类型中需要重写 hashCode 和 equals 方法。

默认实现:native 方法。 对象的内存地址转换的一个整数值。不同的对象的默认的hashCode 的返回值是不同的。

 

5:wait(): 让当前线程在 当前对象上等待,并释放对当前对象的锁。

 

6:notify(): 唤醒在当前对象上等待的某一个线程进入就绪状态。

 

7:notifyAll() : 唤醒在当前对象上等待的所有的线程进入就绪状态。

 

8:clone(): 复制对象的。只有实现了Cloneable 接口的类的对象 可以通过clone() 方法实现对象的复制。

 

9: getClass(): 得到当前对象的类型的大Class 对象。  类名.class 是类似的功能。

 

创建对象的方式:

1:通过new 关键字 调用构造方法去创建对象。

2:clone() 克隆

3:类的静态工厂方法。

4:反序列化。

5:反射

 

六、线程池

1:Java中的ThreadPoolExecutor类

 

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

 

   ThreadPoolExecutor类中提供了四个构造方法:

 

public class ThreadPoolExecutor extends AbstractExecutorService {

    .....

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

            BlockingQueue<Runnable> workQueue);

 

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

 

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

 

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

    ...

}

 

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

 

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

 

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

 

unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天

TimeUnit.HOURS;             //小时

TimeUnit.MINUTES;           //分钟

TimeUnit.SECONDS;           //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

 

 

workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

 

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

 

threadFactory:线程工厂,主要用来创建线程;

 

handler:表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

 

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

ThreadPoolExecutor类中有几个非常重要的方法:

execute()execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit():submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。

shutdownNow():尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。

并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。

 

--------------------------------------------------------------------------------------------------------------------------------------

 

1.线程池状态

 

ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;

static final int RUNNING    = 0;

static final int SHUTDOWN   = 1;

static final int STOP       = 2;

static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值。

当创建线程池后,初始时,线程池处于RUNNING状态;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

 

------------------------------------------------------------------------------------------------------------------------------------

2.任务的执行

 

在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

 

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务

private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小

                                                              //、runState等)的改变都要使用这个锁

private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集

private volatile long  keepAliveTime;    //线程存活时间   

private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间

private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)

private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数

private volatile int   poolSize;       //线程池中当前的线程数

private volatile RejectedExecutionHandler handler; //任务拒绝策略

private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程

private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数

private long completedTaskCount;   //用来记录已经执行完毕的任务个数

 

 

如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

 

------------------------------------------------------------------------------------------------------------------------------------

 

3.线程池中的线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

prestartCoreThread():初始化一个核心线程;

prestartAllCoreThreads():初始化所有核心线程

 

4.任务缓存队列及排队策略

在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

 

 

5.任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

 

 

6.线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

 

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

 

7.线程池容量的动态调整

 

  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

 

    setCorePoolSize:设置核心池大小

    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

 

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

 

 

----------------------------------------------------------------------------------------------------------------------------

例子:

public class Test {

     public static void main(String[] args) {   

         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,

                 new ArrayBlockingQueue<Runnable>(5));

          

         for(int i=0;i<15;i++){

             MyTask myTask = new MyTask(i);

             executor.execute(myTask);

             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+

             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());

         }

         executor.shutdown();

     }

}

class MyTask implements Runnable {

    private int taskNum;

     

    public MyTask(int num) {

        this.taskNum = num;

    }

     

    @Override

    public void run() {

        System.out.println("正在执行task "+taskNum);

        try {

            Thread.currentThread().sleep(4000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("task "+taskNum+"执行完毕");

    }

}

 

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

 

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池

Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

 

下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>()));

}

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}

 

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

 

-------------如何合理配置线程池的大小-------------------------------------------------------------------------------------------------

一般需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

 

一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)

如果是CPU密集型应用,则线程池大小设置为N+1。

如果是IO密集型应用,则线程池大小设置为2N+1。

如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。

但是,IO优化中,这样的估算公式可能更适合:最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目。

因为很显然,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

下面举个例子:比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。

这个公式进一步转化为:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目。

 

import java.util.ArrayList;

import java.util.Random;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

/**

 * 线程池。

 * 对象池:一块内存区域,存放了一堆对象。用的时候从池中获取,使用完毕之后放回去。

 * 好处:避免反复的创建销毁对象。

 *

 * 线程池:避免反复的创建和销毁线程。简化对任务的操作。

 * jdk1.5 出现的。

 *

 * 线程池主要处理的任务类型有:Runnable 的命令 +  Callable 的任务。

 *

 */

public class ThreadPoolTest {

static int counter = 0;

 

public static void main(String[] args) throws InterruptedException, ExecutionException {

test2();

}

static void test1(){

//创建线程池对象。

//创建只有一个线程的线程池

// ExecutorService pool = Executors.newSingleThreadExecutor();

//创建指定数量的线程的线程池

// ExecutorService pool = Executors.newFixedThreadPool(10);

//创建线程数量可以自动增长的线程池

ExecutorService pool = Executors.newCachedThreadPool();

//给线程池的线程加任务

for(int i=0;i<10;i++){

pool.execute(new Runnable() {

@Override

public void run() {

System.out.println("任务开始了!--" + ++counter);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("任务结束了!--" + counter);

}

});

}

//关闭线程池

pool.shutdown();

}

static void test2() throws InterruptedException, ExecutionException{

//创建线程池对象。

//创建只有一个线程的线程池

ExecutorService pool = Executors.newSingleThreadExecutor();

//创建指定数量的线程的线程池

// ExecutorService pool = Executors.newFixedThreadPool(10);

//创建线程数量可以自动增长的线程池

// ExecutorService pool = Executors.newCachedThreadPool();

ArrayList<Future<Integer>> futures = new ArrayList<>();

//给线程池的线程加任务

for(int i=0;i<10;i++){

Future<Integer> future = pool.submit(new Callable<Integer>() {

Random random = new Random();

@Override

public Integer call() throws Exception {

System.out.println(Thread.currentThread().getName() + "任务开始了!");

return random.nextInt(100);

}

});

futures.add(future);

}

for (Future<Integer> future : futures) {

System.out.println(future.get());

}

//关闭线程池

pool.shutdown();

}

}

 

 

 

网友评论

登录后评论
0/500
评论
winniehu
+ 关注