多线程之间经常需要协同工作,最常见的方式是使用Guarded Blocks,它循环检查一个条件(通常初始值为true),直到条件发生变化才跳出循环继续执行。在使用Guarded Blocks时有以下几个步骤需要注意:
假设guardedJoy()方法必须要等待另一线程为共享变量joy设值才能继续执行。那么理论上可以用一个简单的条件循环来实现,但在等待过程中guardedJoy方法不停的检查循环条件实际上是一种资源浪费。
1 |
public void guardedJoy() { |
2 |
// Simple loop guard. Wastes |
3 |
// processor time. Don't do this! |
4 |
while (!joy) {} |
5 |
System.out.println( "Joy has been achieved!" ); |
6 |
} |
更加高效的方法是调用Object.wait将当前线程挂起,直到有另一线程发起事件通知(尽管通知的事件不一定是当前线程等待的事件)。
01 |
public synchronized void guardedJoy() { |
02 |
// This guard only loops once for each special event, which may not |
03 |
// be the event we're waiting for. |
04 |
while (!joy) { |
05 |
try { |
06 |
wait(); |
07 |
} catch (InterruptedException e) {} |
08 |
} |
09 |
System.out.println( "Joy and efficiency have been achieved!" ); |
10 |
} |
注意:一定要在循环里面调用wait方法,不要想当然的认为线程唤醒后循环条件一定发生了改变。
和其他可以暂停线程执行的方法一样,wait方法会抛出InterruptedException,在上面的例子中,因为我们关心的是joy的值,所以忽略了InterruptedException。
为什么guardedJoy是synchronized方法?假设d是用来调用wait的对象,当一个线程调用d.wait,它必须要拥有d的内部锁(否则会抛出异常),获得d的内部锁的最简单方法是在一个synchronized方法里面调用wait。
当一个线程调用wait方法时,它释放锁并挂起。然后另一个线程请求并获得这个锁并调用Object.notifyAll通知所有等待该锁的线程。
1 |
public synchronized notifyJoy() { |
2 |
joy = true ; |
3 |
notifyAll(); |
4 |
} |
当第二个线程释放这个该锁后,第一个线程再次请求该锁,从wait方法返回并继续执行。
注意:还有另外一个通知方法,notify(),它只会唤醒一个线程。但由于它并不允许指定哪一个线程被唤醒,所以一般只在大规模并发应用(即系统有大量相似任务的线程)中使用。因为对于大规模并发应用,我们其实并不关心哪一个线程被唤醒。
现在我们使用Guarded blocks创建一个生产者/消费者应用。这类应用需要在两个线程之间共享数据:生产者生产数据,消费者使用数据。两个线程通过共享对象通信。在这里,线程协同工作的关键是:生产者发布数据之前,消费者不能够去读取数据;消费者没有读取旧数据前,生产者不能发布新数据。
在下面的例子中,数据通过Drop对象共享的一系列文本消息:
01 |
public class Drop { |
02 |
// Message sent from producer |
03 |
// to consumer. |
04 |
private String message; |
05 |
// True if consumer should wait |
06 |
// for producer to send message, |
07 |
// false if producer should wait for |
08 |
// consumer to retrieve message. |
09 |
private boolean empty = true ; |
10 |
11 |
public synchronized String take() { |
12 |
// Wait until message is |
13 |
// available. |
14 |
while (empty) { |
15 |
try { |
16 |
wait(); |
17 |
} catch (InterruptedException e) {} |
18 |
} |
19 |
// Toggle status. |
20 |
empty = true ; |
21 |
// Notify producer that |
22 |
// status has changed. |
23 |
notifyAll(); |
24 |
return message; |
25 |
} |
26 |
27 |
public synchronized void put(String message) { |
28 |
// Wait until message has |
29 |
// been retrieved. |
30 |
while (!empty) { |
31 |
try { |
32 |
wait(); |
33 |
} catch (InterruptedException e) {} |
34 |
} |
35 |
// Toggle status. |
36 |
empty = false ; |
37 |
// Store message. |
38 |
this .message = message; |
39 |
// Notify consumer that status |
40 |
// has changed. |
41 |
notifyAll(); |
42 |
} |
43 |
} |
Producer是生产者线程,发送一组消息,字符串DONE表示所有消息都已经发送完成。为了模拟现实情况,生产者线程还会在消息发送时随机的暂停。
01 |
import java.util.Random; |
02 |
03 |
public class Producer implements Runnable { |
04 |
private Drop drop; |
05 |
06 |
public Producer(Drop drop) { |
07 |
this .drop = drop; |
08 |
} |
09 |
10 |
public void run() { |
11 |
String importantInfo[] = { |
12 |
"Mares eat oats" , |
13 |
"Does eat oats" , |
14 |
"Little lambs eat ivy" , |
15 |
"A kid will eat ivy too" |
16 |
}; |
17 |
Random random = new Random(); |
18 |
19 |
for ( int i = 0 ; |
20 |
i < importantInfo.length; |
21 |
i++) { |
22 |
drop.put(importantInfo[i]); |
23 |
try { |
24 |
Thread.sleep(random.nextInt( 5000 )); |
25 |
} catch (InterruptedException e) {} |
26 |
} |
27 |
drop.put( "DONE" ); |
28 |
} |
29 |
} |
Consumer是消费者线程,读取消息并打印出来,直到读取到字符串DONE为止。消费者线程在消息读取时也会随机的暂停。
01 |
import java.util.Random; |
02 |
03 |
public class Consumer implements Runnable { |
04 |
private Drop drop; |
05 |
06 |
public Consumer(Drop drop) { |
07 |
this .drop = drop; |
08 |
} |
09 |
10 |
public void run() { |
11 |
Random random = new Random(); |
12 |
for (String message = drop.take(); |
13 |
! message.equals( "DONE" ); |
14 |
message = drop.take()) { |
15 |
System.out.format( "MESSAGE RECEIVED: %s%n" , message); |
16 |
try { |
17 |
Thread.sleep(random.nextInt( 5000 )); |
18 |
} catch (InterruptedException e) {} |
19 |
} |
20 |
} |
21 |
} |
ProducerConsumerExample是主线程,它启动生产者线程和消费者线程。
1 |
public class ProducerConsumerExample { |
2 |
public static void main(String[] args) { |
3 |
Drop drop = new Drop(); |
4 |
( new Thread( new Producer(drop))).start(); |
5 |
( new Thread( new Consumer(drop))).start(); |
6 |
} |
7 |
} |
注意:Drop类是用来演示Guarded Blocks如何工作的。为了避免重新发明轮子,当你尝试创建自己的共享数据对象时,请查看Java Collections Framework中已有的数据结构。如需更多信息,请参考Questions and Exercises。