Java并发(四)BlockingQueue的使用

简介:

    wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无届队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

    如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。

    考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import  java.util.Random;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.LinkedBlockingQueue;
import  java.util.concurrent.TimeUnit;
 
class  Toast {
     /**
      * 吐司的状态:
      * DRY: 烘干的
      * BUTTERED: 涂了黄油的
      * JAMMED: 涂了果酱的
      * <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
      */
     public  enum  Status {DRY, BUTTERED, JAMMED}
     private  Status status = Status.DRY; //默认状态为DRY
     private  final  int  id;
     public  Toast( int  id) {  this .id =  id;}
     public  void  butter() {status = Status.BUTTERED;}
     public  void  jam() {status = Status.JAMMED;}
     public  Status getStatus() { return  status;}
     public  int  getId() { return  id;}
     public  String toString() {
         return  "Toast id: "  + id +  ", status: "  + status;
     }
}
 
@SuppressWarnings ( "serial" )
class  ToastQueue  extends  LinkedBlockingQueue<Toast> {}
 
/**
  * 生产吐司的任务。
  */
class  Toaster  implements  Runnable {
     private  ToastQueue toastQueue;
     private  int  count =  0 ;
     private  Random random =  new  Random( 47 );
     public  Toaster(ToastQueue queue) {
         this .toastQueue = queue;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 TimeUnit.MILLISECONDS.sleep( 300  + random.nextInt( 500 ));
                 //生产一片吐司,这些吐司是有序的
                 Toast toast =  new  Toast(count++);
                 System.out.println(toast);
                 //放到toastQueue中
                 toastQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Toaster interrupted." );
         }
         System.out.println( "Toaster off." );
     }
}
 
/**
  * 涂黄油的任务。
  */
class  Butterer  implements  Runnable {
     private  ToastQueue dryQueue;
     private  ToastQueue butteredQueue;
     public  Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
         this .dryQueue = dryQueue;
         this .butteredQueue = butteredQueue;
     }
     
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = dryQueue.take();
                 toast.butter();
                 System.out.println(toast);
                 butteredQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Butterer interrupted." );
         }
         System.out.println( "Butterer off." );
         
     }
}
 
/**
  * 涂果酱的任务。
  */
class  Jammer  implements  Runnable {
     private  ToastQueue butteredQueue;
     private  ToastQueue finishedQueue;
     public  Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
         this .finishedQueue = finishedQueue;
         this .butteredQueue = butteredQueue;
     }
     
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = butteredQueue.take();
                 toast.jam();
                 System.out.println(toast);
                 finishedQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Jammer interrupted." );
         }
         System.out.println( "Jammer off." );
     }
}
 
/**
  * 吃吐司的人,消费者。
  */
class  Eater  implements  Runnable {
     private  ToastQueue finishedQueue;
     private  int  count =  0 ;
     public  Eater (ToastQueue finishedQueue) {
         this .finishedQueue = finishedQueue;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = finishedQueue.take();
                 //验证取得的吐司是有序的,而且状态是JAMMED的
                 if  (toast.getId() != count++ || 
                         toast.getStatus() != Toast.Status.JAMMED) {
                     System.out.println( "Error -> "  + toast);
                     System.exit(- 1 );
                 else  {
                     //吃掉吐司
                     System.out.println(toast +  "->Eaten" );
                 }
             }
         catch  (InterruptedException e) {
             System.out.println( "Eater interrupted." );
         }
         System.out.println( "Eater off." );
     }
}
 
public  class  ToastOMatic {
     public  static  void  main(String[] args)  throws  Exception {
         ToastQueue dryQueue =  new  ToastQueue();
         ToastQueue butteredQueue =  new  ToastQueue();
         ToastQueue finishedQueue =  new  ToastQueue();
         ExecutorService exec = Executors.newCachedThreadPool();
         exec.execute( new  Toaster(dryQueue));
         exec.execute( new  Butterer(dryQueue, butteredQueue));
         exec.execute( new  Jammer(butteredQueue, finishedQueue));
         exec.execute( new  Eater(finishedQueue));
         TimeUnit.SECONDS.sleep( 5 );
         exec.shutdownNow();
     }
}

执行结果(可能的结果):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Toast id:  0 , status: DRY
Toast id:  0 , status: BUTTERED
Toast id:  0 , status: JAMMED
Toast id:  0 , status: JAMMED->Eaten
Toast id:  1 , status: DRY
Toast id:  1 , status: BUTTERED
Toast id:  1 , status: JAMMED
Toast id:  1 , status: JAMMED->Eaten
Toast id:  2 , status: DRY
Toast id:  2 , status: BUTTERED
Toast id:  2 , status: JAMMED
Toast id:  2 , status: JAMMED->Eaten
Toast id:  3 , status: DRY
Toast id:  3 , status: BUTTERED
Toast id:  3 , status: JAMMED
Toast id:  3 , status: JAMMED->Eaten
Toast id:  4 , status: DRY
Toast id:  4 , status: BUTTERED
Toast id:  4 , status: JAMMED
Toast id:  4 , status: JAMMED->Eaten
Toast id:  5 , status: DRY
Toast id:  5 , status: BUTTERED
Toast id:  5 , status: JAMMED
Toast id:  5 , status: JAMMED->Eaten
Toast id:  6 , status: DRY
Toast id:  6 , status: BUTTERED
Toast id:  6 , status: JAMMED
Toast id:  6 , status: JAMMED->Eaten
Toast id:  7 , status: DRY
Toast id:  7 , status: BUTTERED
Toast id:  7 , status: JAMMED
Toast id:  7 , status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.

    Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。

目录
相关文章
|
1月前
|
存储 安全 算法
解读 Java 并发队列 BlockingQueue
解读 Java 并发队列 BlockingQueue
19 0
|
2月前
|
监控 安全 算法
Java并发基础:LinkedTransferQueue全面解析!
LinkedTransferQueue类实现了高效的线程间数据传递,支持等待匹配的生产者-消费者模式,基于链表的无界设计使其在高并发场景下表现卓越,且无需担心队列溢出,丰富的方法和良好的可扩展性满足了各种复杂应用场景的需求。
Java并发基础:LinkedTransferQueue全面解析!
|
2月前
|
Java 程序员 API
Java并发基础:concurrent Flow API全面解析
java.util.concurrent.Flow定义了响应式编程的核心接口,促进了Java在异步数据处理和背压机制方面的标准化,这使得第三方库如Reactor和RxJava能够基于这些接口提供丰富的实现和功能,同时简化了响应式编程在Java中的使用,Flow API增强了Java在并发编程领域的灵活性,使得处理异步数据流变得更加自然和高效。
108 0
Java并发基础:concurrent Flow API全面解析
|
1月前
|
存储 缓存 算法
Java并发基础:原子类之AtomicMarkableReference全面解析
AtomicMarkableReference类能够确保引用和布尔标记的原子性更新,有效避免了多线程环境下的竞态条件,其提供的方法可以轻松地实现基于条件的原子性操作,提高了程序的并发安全性和可靠性。
Java并发基础:原子类之AtomicMarkableReference全面解析
|
2天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
2 0
|
3天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
21 0
|
11天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
18天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
22天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
25 0
|
24天前
|
算法 安全 Java
Java中的并发编程:理解并发性能优化
在当今软件开发领域,多核处理器的普及使得并发编程变得更加重要。本文将深入探讨Java中的并发编程,介绍并发性能优化的关键技术,帮助开发人员更好地利用多核处理器提升应用程序性能。