生产者&消费者模型-线程间协调
2017-02-16 11:36
295 查看
本文目录
本文目录开篇明志
Lock-Condition协作
生产者消费者模型队列模式
1 Buffer缓冲区
2 ProducerTask 生产线程
3 ConsumerTask 消费线程
运行效果
0. 开篇明志
通过生产者、消费者模型, 演示线程的协调。1. Lock-Condition协作
给小汽车凃蜡, 在凃蜡之前不能抛光, 意思就是: 凃蜡、抛光是按顺序执行的, 只能先凃蜡再抛光, 再继续循环上面的这个过程。import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; class Car{ private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean waxOn = false; public void waxed(){ lock.lock(); try{ waxOn = true;//已经凃蜡 condition.signalAll(); }finally{ lock.unlock(); } } public void buffed(){ lock.lock(); try{ waxOn = false;// 抛光完成 condition.signalAll(); }finally{ lock.unlock(); } } //等待凃蜡 public void waitForWaxing() throws InterruptedException{ lock.lock(); try{ while(waxOn == false)//如果没有凃蜡 则等待 condition.await(); }finally{ lock.unlock(); } } //等待抛光 public void waitForBuffing() throws InterruptedException{ lock.lock(); try{ while(waxOn == true) condition.await(); }finally{ lock.unlock(); } } } class WaxOn implements Runnable{ private Car car; public WaxOn(Car car){ this.car = car; } public void run(){ try{ while(!Thread.interrupted()){ System.out.println("Wax on!"); Thread.sleep(200); car.waxed(); car.waitForBuffing(); } }catch(InterruptedException e){ System.out.println("WaxOn Interrupted!"); } } } class WaxOff implements Runnable{ private Car car; public WaxOff(Car car){ this.car = car; } public void run(){ try{ while(!Thread.interrupted()){ car.waitForWaxing(); System.out.println("WaxOff! "); Thread.sleep(200); car.buffed(); } }catch(InterruptedException e){ System.out.println("WaxOff Interrupted!"); } } } public class Wax { public static void main(String[] args){ Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOn(car)); exec.execute(new WaxOff(car)); try{ Thread.sleep(3000); }catch(InterruptedException e){ } exec.shutdownNow(); } }
输出结果:
Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOff! Wax on! WaxOn Interrupted! WaxOff Interrupted!
2. 生产者消费者模型队列模式
2.1 Buffer缓冲区
假设使用缓冲区存储整数。缓存区的大小是受限的。缓存区提供write(int)方法将一个int值添加到缓冲区中, 还提供放大read()从缓冲区中读取和删除一个Int值。为了同步这个操作, 使用具有两个条件的锁: notEmpty(即缓冲区非空) 和 notFull(缓冲区未满)。
当任务向缓冲区添加一个int时, 如果缓冲区是满的, 那么任务将会等待notFull条件。 当任务区是空的, 那么任务将等待 notEmpty 条件。
import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Buffer { private static final int CAPACITY = 1; private LinkedList<Integer> queue = new LinkedList<>(); //创建lock private static Lock lock = new ReentrantLock(); private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); //向队列中写入数据 public void write(int value){ lock.lock();//获得锁 try { while(queue.size() == CAPACITY){ System.out.println("Wait for notFull condition."); notFull.await(); } queue.offer(value); notEmpty.signal(); //发送 notEmpty condition }catch (InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); } } public int read(){ int value = 0; lock.lock(); //获得锁 try{ while(queue.isEmpty()){ System.out.println("Wait for notEmpty condition."); notEmpty.await(); } value = queue.remove(); notFull.signal(); // 发送 notFull condition }catch (InterruptedException e){ e.printStackTrace(); }finally { lock.unlock(); //释放锁 return value; } } }
2.2 ProducerTask 生产线程
public class ProducerTask implements Runnable { public Buffer buffer; public ProducerTask(Buffer buffer){ this.buffer = buffer; } @Override public void run(){ try { int i =1; while(true){ System.out.println("向buffer中增加一个 数 "); buffer.write(i); //线程睡眠 Thread.sleep((int)(Math.random() * 10000)); } }catch (InterruptedException e) { e.printStackTrace(); } } }
2.3 ConsumerTask 消费线程
public class ConsumerTask implements Runnable { public Buffer buffer; public ConsumerTask(Buffer buffer){ this.buffer = buffer; } @Override public void run(){ try{ while(true){ System.out.println("Consumer reads" + buffer.read()); Thread.sleep((int)(Math.random() * 10000)); } }catch (InterruptedException e){ e.printStackTrace(); } } }
运行效果
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { Buffer buffer = new Buffer(); ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new ConsumerTask(buffer)); executor.execute(new ProducerTask(buffer)); executor.shutdown(); } }
Wait for notEmpty condition. 向buffer中增加一个 数 Consumer reads1 向buffer中增加一个 数 Consumer reads1 Wait for notEmpty condition. 向buffer中增加一个 数 Consumer reads1 Wait for notEmpty condition. 向buffer中增加一个 数 Consumer reads1 向buffer中增加一个 数 Consumer reads1 向buffer中增加一个 数 Consumer reads1 Wait for notEmpty condition. 向buffer中增加一个 数 Consumer reads1 Wait for notEmpty condition. 向buffer中增加一个 数 Consumer reads1 Wait for notEmpty condition.
相关文章推荐
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- java线程-从生产者和消费者模型说起
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型 转自:http://lavasoft.blog.51cto.com/62575/221932
- 生产者、消费者线程模型 java版
- 生产者--消费者模型(线程锁方式)
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- Java线程:生产者消费者模型
- Java线程:并发协作-生产者消费者模型 转自:http://lavasoft.blog.51cto.com/62575/221932
- PSP??? 生产者与消费者框架,还是线程模型?
- Java线程:并发协作-生产者消费者模型
- 线程同步之经典生产者-消费者模型
- Java线程:并发协作-生产者消费者模型
- 用生产者消费者模型实现的线程安全环形队列
- 消费者&生产者模型的python代码
- 线程同步之经典生产者-消费者模型