线程通信之生产者消费者模型
线程通信,是指线程之间的消息传递。
多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。
另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。
生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模型来进一步认识线程通信。在此,我们先对若干概念进行了解。
生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费
消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费
线程通信:使用java中超类Object中提供的一些方法:
1 public final void wait(); //注:long timeout=0 表示线程一直等待,直到其它线程通知 2 public final native void wait(long timeout); //线程等待指定毫秒参数的时间,超过该时间则不再等待 3 public final void wait(long timeout, int nanos); /*线程等待指定毫秒、微妙的时间,timeout最大等待时间,
以毫秒为单位,nanos额外的时间,在纳秒范围0-999999*/
4 public final native void notify(); //唤醒一个处于等待状态的线程 5 public final native void notifyAll(); //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行
需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。
接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。
1 /** 2 * 1.共享资源缓存和操作类 3 */ 4 public class SharedCache { 5 //产品,此处使用char字符,作为存储共享数据的数据类型 6 private char cache; 7 //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费) 8 private boolean flag=false; 9 /* 10 生产操作(生产者):向仓库中添加共享数据 11 */ 12 public synchronized void addSharedCacheData(char data){ 13 //产品未消费,则生产者的生产操作等待 14 if(flag){ 15 System.out.println("产品未消费,生产者的生产操作等待"); 16 try { 17 //生产者等待 18 wait(); 19 } catch (InterruptedException e) { 20 System.out.println("Thread interrupted Exception:"+e.getMessage()); 21 } 22 } 23 //产品已消费,则生产者继续生产 24 this.cache=data; 25 //标记已生产 26 flag=true; 27 //通知消费者已生产 28 notify(); 29 System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费"); 30 } 31 /* 32 消费操作(消费者):向仓库中获取共享数据 33 */ 34 public synchronized char getSharedCacheData(){ 35 //如果产品未生产,则消费者等待 36 if(!flag){ 37 System.out.println("产品未生产,消费者的消费操作等待"); 38 try { 39 wait(); 40 } catch (InterruptedException e) { 41 System.out.println("Thread interrupted Exception:"+e.getMessage()); 42 } 43 } 44 //标记已消费 45 flag=false; 46 //通知生产者已消费 47 notify(); 48 System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产"); 49 return this.cache; 50 } 51 } 52 /** 53 * 2.生产者线程类 54 */ 55 public class Producer extends Thread{ 56 //共享缓存资源类的对象 57 private SharedCache cache; 58 //构造器,传入共享资源类的对象 59 public Producer(SharedCache cache){ 60 this.cache=cache; 61 } 62 /* 63 生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里) 64 生产A-D类型的产品 65 */ 66 @Override 67 public void run() { 68 for(char product='A';product<='D';product++){ 69 try { 70 sleep((int)(Math.random()*3000)); 71 } catch (InterruptedException e) { 72 System.out.println("Thread interrupted Exception:"+e.getMessage()); 73 } 74 //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里) 75 cache.addSharedCacheData(product); 76 } 77 } 78 } 79 /** 80 * 3.消费者线程类 81 */ 82 public class Consumer extends Thread{ 83 //共享缓存资源类的对象 84 private SharedCache cache; 85 //构造器,传入共享资源类的对象 86 public Consumer(SharedCache cache){ 87 this.cache=cache; 88 } 89 /* 90 消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品) 91 当消费到D类型的产品时即停止消费 92 */ 93 @Override 94 public void run() { 95 char product='A'; 96 do{ 97 try { 98 Thread.sleep((int)(Math.random()*3000)); 99 } catch (InterruptedException e) { 100 System.out.println("Thread interrupted Exception:"+e.getMessage()); 101 } 102 //消费,从仓库取走商品 103 product=cache.getSharedCacheData(); 104 }while (product!='D'); 105 } 106 } 107 /** 108 * 4.线程通信测试类 109 */ 110 public class Test { 111 public static void main(String[] args) { 112 //生产者与消费者共享同一个资源 113 SharedCache cache = new SharedCache(); 114 //启动消费者线程 115 new Consumer(cache).start(); 116 //启动生产者线程 117 new Producer(cache).start(); 118 } 119 }
运行上述的测试类后,执行结果如下:
产品未生产,消费者的消费操作等待 生产者--->产品:A已生产,等待消费者消费 消费者--->产品:A已消费,通知生产者生产 生产者--->产品:B已生产,等待消费者消费 消费者--->产品:B已消费,通知生产者生产 生产者--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者--->产品:C已消费,通知生产者生产 生产者--->产品:D已生产,等待消费者消费 消费者--->产品:D已消费,通知生产者生产
我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显式的Lock操作,可以实现唤醒、冻结指定的线程。
接口Lock的实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。
接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。
需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。
其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。
我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:
1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 /** 5 * 共享的资源 6 */ 7 public class Resource { 8 private char product; 9 //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费) 10 private boolean flag = false; 11 //定义一个实现Lock接口的ReentrantLock类对象 12 private Lock lock = new ReentrantLock(); 13 /* 14 Condition是被绑定到Lock上的, 15 要创建一个Lock的Condition, 16 必须用Lock对象的newCondition()方法 17 */ 18 private Condition cond_pro = lock.newCondition(); 19 //一个lock可以有多个相关的condition 20 private Condition cond_con = lock.newCondition(); 21 /* 22 定义生产方法 23 */ 24 public void produce(char product) throws InterruptedException { 25 lock.lock();//手动加同步锁 26 try { 27 while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产 28 System.out.println("产品未消费,生产者的生产操作等待"); 29 cond_pro.await(); 30 } 31 this.product = product; 32 //标记已生产 33 flag = true; 34 //通知消费者已生产 35 cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象 36 System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费"); 37 } finally { 38 lock.unlock();//释放锁 39 } 40 } 41 /* 42 定义消费方法 43 */ 44 public char consume() throws InterruptedException { 45 lock.lock(); 46 try { 47 while (!flag) { 48 System.out.println("产品未生产,消费者的消费操作等待"); 49 cond_con.await(); 50 } 51 //标记已消费 52 flag = false; 53 //通知生产者已消费 54 cond_pro.signal(); 55 System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产"); 56 return this.product; 57 } finally { 58 lock.unlock(); 59 } 60 } 61 } 62 /** 63 * 生产者 64 */ 65 public class Producer implements Runnable{ 66 private Resource res; 67 public Producer(Resource res){ 68 this.res=res; 69 } 70 @Override 71 public void run() { 72 for(char product='A';product<='D';product++){ 73 try { 74 res.produce(product); 75 } catch (InterruptedException e) { 76 e.printStackTrace(); 77 } 78 } 79 } 80 } 81 /** 82 * 消费者 83 */ 84 public class Consumer implements Runnable{ 85 private Resource res; 86 public Consumer(Resource res){ 87 this.res=res; 88 } 89 @Override 90 public void run() { 91 char product='A'; 92 do{ 93 try { 94 product=res.consume(); 95 } catch (InterruptedException e) { 96 e.printStackTrace(); 97 } 98 }while(product!='D'); 99 } 100 } 101 /** 102 * 用ReentrantLock和Condition实现生产者消费者模型 103 */ 104 public class Test { 105 //入口方法 106 public static void main(String[] args) { 107 Resource res = new Resource();//生产者与消费者共享的资源 108 Producer producer = new Producer(res);//生产者 109 Consumer consumer = new Consumer(res);//消费者 110 //生产者线程与消费者线程各创建两个 111 Thread p1 = new Thread(producer); 112 Thread p2 = new Thread(producer); 113 Thread c1 = new Thread(consumer); 114 Thread c2 = new Thread(consumer); 115 p1.start(); 116 p2.start(); 117 c1.start(); 118 c2.start(); 119 } 120 }
上述代码执行结果如下:
生产者Thread-0--->产品:A已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:A已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:A已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:A已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-0--->产品:B已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-3--->产品:B已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:B已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:B已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-0--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-3--->产品:C已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:C已生产,等待消费者消费 产品未消费,生产者的生产操作等待 消费者Thread-2--->产品:C已消费,通知生产者生产 生产者Thread-0--->产品:D已生产,等待消费者消费 消费者Thread-3--->产品:D已消费,通知生产者生产 产品未生产,消费者的消费操作等待 生产者Thread-1--->产品:D已生产,等待消费者消费 消费者Thread-3--->产品:D已消费,通知生产者生产
- Java线程间通信问题分析(生产者消费者模型)
- 多线程执行多任务(线程通信):生产者消费者模型(一)
- java: 线程间通信经典模型“生产者-消费者”模型的实现
- 多线程执行多任务(线程通信):生产者消费者模型(二)
- 第23章 java线程通信——生产者/消费者模型案例
- JAVA 线程通信以及多个生产者消费者模型
- java线程间通信[实现不同线程之间的消息传递(通信),生产者和消费者模型]
- 从Java多线程实现“生产者-消费者”模型来谈谈操作系统中线程状态的转换
- Java基础学习6_多线程(线程间通信--生产者消费者)
- 线程之间的通信之消费者与生产者
- Linux线程--生产者消费者模型
- 死锁和线程的通信,生产者和消费者
- 【java线程系列】java线程系列之线程间的交互wait()/notify()/notifyAll()及生产者与消费者模型
- java线程间通信:生产者--消费者
- Java线程:并发协作-生产者消费者模型
- 基于线程实现的生产者消费者模型(Object.wait(),Object.notify()方法)
- 线程通信之生产者与消费者--小码哥java
- Java 多线程(二)线程间的通信应用--生产者消费者(未完)
- 【Java】线程wait() notify()通信 实现生产者 消费者问题
- 生产者与消费者模型(线程)