您的位置:首页 > 其它

线程通信之生产者消费者模型

2018-01-29 18:22 260 查看

  线程通信,是指线程之间的消息传递。

  多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。

  另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。

  生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模型来进一步认识线程通信。在此,我们先对若干概念进行了解。  

  生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费

  消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费

  线程通信:使用java中超类Object中提供的一些方法:

1 public final void wait();  //注:long timeout=0  表示线程一直等待,直到其它线程通知
2 public final native void wait(long timeout);   //线程等待指定毫秒参数的时间,超过该时间则不再等待
3 public final void wait(long timeout, int nanos);  /*线程等待指定毫秒、微妙的时间,timeout最大等待时间,
                                以毫秒为单位,nanos额外的时间,在纳秒范围0-999999*/
4 public final native void notify(); //唤醒一个处于等待状态的线程 5 public final native void notifyAll(); //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行

  需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。

  接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。  

1 /**
2  * 1.共享资源缓存和操作类
3  */
4 public class SharedCache {
5     //产品,此处使用char字符,作为存储共享数据的数据类型
6     private char cache;
7     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
8     private boolean flag=false;
9     /*
10     生产操作(生产者):向仓库中添加共享数据
11      */
12     public synchronized void addSharedCacheData(char data){
13         //产品未消费,则生产者的生产操作等待
14         if(flag){
15             System.out.println("产品未消费,生产者的生产操作等待");
16             try {
17                 //生产者等待
18                 wait();
19             } catch (InterruptedException e) {
20                 System.out.println("Thread interrupted Exception:"+e.getMessage());
21             }
22         }
23         //产品已消费,则生产者继续生产
24         this.cache=data;
25         //标记已生产
26         flag=true;
27         //通知消费者已生产
28         notify();
29         System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费");
30     }
31     /*
32     消费操作(消费者):向仓库中获取共享数据
33      */
34     public synchronized char getSharedCacheData(){
35         //如果产品未生产,则消费者等待
36         if(!flag){
37             System.out.println("产品未生产,消费者的消费操作等待");
38             try {
39                 wait();
40             } catch (InterruptedException e) {
41                 System.out.println("Thread interrupted Exception:"+e.getMessage());
42             }
43         }
44         //标记已消费
45         flag=false;
46         //通知生产者已消费
47         notify();
48         System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产");
49         return this.cache;
50     }
51 }
52 /**
53  * 2.生产者线程类
54  */
55 public class Producer extends Thread{
56     //共享缓存资源类的对象
57     private SharedCache cache;
58     //构造器,传入共享资源类的对象
59     public Producer(SharedCache cache){
60         this.cache=cache;
61     }
62     /*
63     生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里)
64     生产A-D类型的产品
65      */
66     @Override
67     public void run() {
68         for(char product='A';product<='D';product++){
69             try {
70                 sleep((int)(Math.random()*3000));
71             } catch (InterruptedException e) {
72                 System.out.println("Thread interrupted Exception:"+e.getMessage());
73             }
74             //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里)
75             cache.addSharedCacheData(product);
76         }
77     }
78 }
79 /**
80  * 3.消费者线程类
81  */
82 public class Consumer extends Thread{
83     //共享缓存资源类的对象
84     private SharedCache cache;
85     //构造器,传入共享资源类的对象
86     public Consumer(SharedCache cache){
87         this.cache=cache;
88     }
89     /*
90     消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品)
91     当消费到D类型的产品时即停止消费
92      */
93     @Override
94     public void run() {
95         char product='A';
96         do{
97             try {
98                 Thread.sleep((int)(Math.random()*3000));
99             } catch (InterruptedException e) {
100                 System.out.println("Thread interrupted Exception:"+e.getMessage());
101             }
102             //消费,从仓库取走商品
103             product=cache.getSharedCacheData();
104         }while (product!='D');
105     }
106 }
107 /**
108  * 4.线程通信测试类
109  */
110 public class Test {
111     public static void main(String[] args) {
112         //生产者与消费者共享同一个资源
113         SharedCache cache = new SharedCache();
114         //启动消费者线程
115         new Consumer(cache).start();
116         //启动生产者线程
117         new Producer(cache).start();
118     }
119 }

  运行上述的测试类后,执行结果如下:

产品未生产,消费者的消费操作等待
生产者--->产品:A已生产,等待消费者消费
消费者--->产品:A已消费,通知生产者生产
生产者--->产品:B已生产,等待消费者消费
消费者--->产品:B已消费,通知生产者生产
生产者--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者--->产品:C已消费,通知生产者生产
生产者--->产品:D已生产,等待消费者消费
消费者--->产品:D已消费,通知生产者生产

  我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显式的Lock操作,可以实现唤醒、冻结指定的线程。

  接口Lock的实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。

  接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。

  需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。  

  Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

  其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

  我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:  

1 import java.util.concurrent.locks.Condition;
2 import java.util.concurrent.locks.Lock;
3 import java.util.concurrent.locks.ReentrantLock;
4 /**
5  * 共享的资源
6  */
7 public class Resource {
8     private char product;
9     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
10     private boolean flag = false;
11     //定义一个实现Lock接口的ReentrantLock类对象
12     private Lock lock = new ReentrantLock();
13     /*
14     Condition是被绑定到Lock上的,
15     要创建一个Lock的Condition,
16     必须用Lock对象的newCondition()方法
17      */
18     private Condition cond_pro = lock.newCondition();
19     //一个lock可以有多个相关的condition
20     private Condition cond_con = lock.newCondition();
21     /*
22         定义生产方法
23      */
24     public void produce(char product) throws InterruptedException {
25         lock.lock();//手动加同步锁
26         try {
27             while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产
28                 System.out.println("产品未消费,生产者的生产操作等待");
29                 cond_pro.await();
30             }
31             this.product = product;
32             //标记已生产
33             flag = true;
34             //通知消费者已生产
35             cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象
36             System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费");
37         } finally {
38             lock.unlock();//释放锁
39         }
40     }
41     /*
42         定义消费方法
43      */
44     public char consume() throws InterruptedException {
45         lock.lock();
46         try {
47             while (!flag) {
48                 System.out.println("产品未生产,消费者的消费操作等待");
49                 cond_con.await();
50             }
51             //标记已消费
52             flag = false;
53             //通知生产者已消费
54             cond_pro.signal();
55             System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产");
56             return this.product;
57         } finally {
58             lock.unlock();
59         }
60     }
61 }
62 /**
63  * 生产者
64  */
65 public class Producer implements Runnable{
66     private Resource res;
67     public Producer(Resource res){
68         this.res=res;
69     }
70     @Override
71     public void run() {
72         for(char product='A';product<='D';product++){
73             try {
74                 res.produce(product);
75             } catch (InterruptedException e) {
76                 e.printStackTrace();
77             }
78         }
79     }
80 }
81 /**
82  * 消费者
83  */
84 public class Consumer implements Runnable{
85     private Resource res;
86     public Consumer(Resource res){
87         this.res=res;
88     }
89     @Override
90     public void run() {
91         char product='A';
92         do{
93             try {
94                 product=res.consume();
95             } catch (InterruptedException e) {
96                 e.printStackTrace();
97             }
98         }while(product!='D');
99     }
100 }
101 /**
102  * 用ReentrantLock和Condition实现生产者消费者模型
103  */
104 public class Test {
105     //入口方法
106     public static void main(String[] args) {
107         Resource res = new Resource();//生产者与消费者共享的资源
108         Producer producer = new Producer(res);//生产者
109         Consumer consumer = new Consumer(res);//消费者
110         //生产者线程与消费者线程各创建两个
111         Thread p1 = new Thread(producer);
112         Thread p2 = new Thread(producer);
113         Thread c1 = new Thread(consumer);
114         Thread c2 = new Thread(consumer);
115         p1.start();
116         p2.start();
117         c1.start();
118         c2.start();
119     }
120 }

  上述代码执行结果如下:

生产者Thread-0--->产品:A已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:A已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:A已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:A已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-0--->产品:B已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-3--->产品:B已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:B已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:B已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-0--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-3--->产品:C已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:C已生产,等待消费者消费
产品未消费,生产者的生产操作等待
消费者Thread-2--->产品:C已消费,通知生产者生产
生产者Thread-0--->产品:D已生产,等待消费者消费
消费者Thread-3--->产品:D已消费,通知生产者生产
产品未生产,消费者的消费操作等待
生产者Thread-1--->产品:D已生产,等待消费者消费
消费者Thread-3--->产品:D已消费,通知生产者生产  

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: