1 Producer-Consumer Pattern

Producer-Consumer Pattern主要就是在生产者与消费者之间建立一个“桥梁参与者”,用来解决生产者线程与消费者线程之间速度的不匹配。

   当要从某个线程Produccer参与者将数据传输给其它线程Consumer参与者的时候,此时就可以在中间加一个Channel参与者,在Channel参与者中以某种方式存放接受的数据,再以某方式来获取收到的数据,Channel就可以来缓存两个线程之间传输的数据,在Channel参与者为了保证安全性,也要用Guarded Suspension Pattern模式。

   Channel参与者作为一个中间者,当Channel参与者从Producer参与者接收到数据,可以用三种方式将数据按顺序 传递给Consumer参与者。

1 队列,这是一种按照FIFO的方式存储数据,即最先到达的数据最先传输给Consumer参与者。在Java中,可以利用数组形式来存放,每次从数组下标最前端获取数据,而从数组下标最后端来缓存数据。也可以利用LinkedList来存放,每次缓存数据的时候,利用LinkedList.addLast(obj),每次获取数据的时候利用LinkedList.removeFirst();移除并且返回队列的第一个元素。

2 堆栈,这是一种以LIFO的方式存储数据,即最先到达的数据最后传输给Consumer参与者。在Java中,对于堆栈的实现,可以直接使用LinkedList,利用pop()从栈顶弹出一个数据来获取数据,利用push(obj)来向堆栈中缓存一个数据

  3 优先级队列,对于缓存的数据设置一些优先级来存储。

  生产者与消费者模式,其实就是线程之间的合作关系,同时又包含了互斥关系。所谓的合作就是生产者生成产品,提供消费者消费。所谓的互斥就是生产者和消费者对于中间的缓冲区是互斥访问的。

实例:

   几个厨师制作食物,将物品放置在桌子上,但是桌子放置的盘子有限,消费者可以从桌子上获取食物来吃。当桌子上有空位置的时候,厨师就可以继续放置做好的食物,且通知消费者来吃,但是满了就只能一直等待消费者吃了有空的位置。而消费者每次取食物的时候,如果桌子上面有食物,则就取走,并且通知厨师来做食物,如果没有则就等待。

生产者Producer代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package  whut.producer;
import  java.util.Random;
public  class  MakerThread  extends  Thread{
     private  final  Table table;
     private  final  Random random;
     private  static  int  id= 0 ;
     public  MakerThread(String name,Table table, long  seed)
     {
         super (name);
         this .table=table;
         this .random= new  Random(seed);
     }
                                                                        
     public  void  run()
     {
         try {
             while ( true )
             {
                 Thread.sleep(random.nextInt( 1000 ));
                 String cake= " [Cake No." +nextId()+ " by " +Thread.currentThread().getName()+ "]" ;
                 table.put(cake);
                                                                                    
             }
         } catch (InterruptedException e)
         {
         }
     }
     //为了使得所有实例共享该字段
     public  static  synchronized  int  nextId()
     {
         return  id++;
     }
}

消费者Consumer代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package  whut.producer;
import  java.util.Random;
public  class  EaterThread  extends  Thread{
     private  final  Table table;
     private  final  Random random;
     public  EaterThread(String name,Table table, long  seed)
     {
         super (name);
         this .table=table;
         this .random= new  Random(seed);
     }
     public  void  run()
     {
         try {
             while ( true )
             {
                 String cake=table.take();
                 Thread.sleep(random.nextInt( 1000 ));
             }
         } catch (InterruptedException e)
         {
         }
     }
}

Channel中间缓冲区,关键部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package  whut.producer;
public  class  Table {
private  final  String[] cakes; //利用数组来作为缓冲区
     private  int  head; //下一次蛋糕取的位置
     private  int  tail; //下一次蛋糕放置位置
     private  int  count; //桌子上蛋糕的总数
     public  Table( int  count)
     {
         this .cakes= new  String[count];
         this .head= 0 ;
         this .tail= 0 ;
         this .count= 0 ;
     }
                           
     public  synchronized  void  put(String cake) throws  InterruptedException
     {
         System.out.println(Thread.currentThread().getName()+ " puts " +cake);
         while (count>=cakes.length)
         {
             System.out.println(Thread.currentThread().getName()+ " Begin wait...." );
             wait();
             System.out.println(Thread.currentThread().getName()+ " End wait...." );
         }
         cakes[tail]=cake;
         tail=(tail+ 1 )%cakes.length;
         count++;
         notifyAll();
     }
                           
     //取蛋糕
     public  synchronized  String take() throws  InterruptedException
     {
         while (count<= 0 )
         {
             System.out.println(Thread.currentThread().getName()+ " Begin wait...." );
             wait();
             System.out.println(Thread.currentThread().getName()+ " End wait...." );
         }
         String cake=cakes[head];
         head=(head+ 1 )%cakes.length;
         count--;
         notifyAll();
         System.out.println(Thread.currentThread().getName()+ " gets " +cake);
         return  cake;
     }
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package  whut.producer;
public  class  MainTest {
     public  static  void  main(String[] args) {
         // TODO Auto-generated method stub
         Table table= new  Table( 3 );
         new  MakerThread( "MakerThread-1" ,table, 31415 ).start();
         new  MakerThread( "MakerThread-2" ,table, 92653 ).start();
         new  MakerThread( "MakerThread-3" ,table, 58979 ).start();
                         
         new  EaterThread( "EaterThread-1" ,table, 32384 ).start();
         new  EaterThread( "EaterThread-2" ,table, 32384 ).start();
         new  EaterThread( "EaterThread-3" ,table, 32384 ).start();
         //可以通过调用interrupt来去中断结束任何线程
     }
}