您的位置:首页 > 其它

生产者&消费者模型-线程间协调

2017-02-16 11:36 295 查看

本文目录

本文目录

开篇明志

Lock-Condition协作

生产者消费者模型队列模式
1 Buffer缓冲区

2 ProducerTask 生产线程

3 ConsumerTask 消费线程

运行效果

0. 开篇明志

通过生产者、消费者模型, 演示线程的协调。

1. Lock-Condition协作

给小汽车凃蜡, 在凃蜡之前不能抛光, 意思就是: 凃蜡、抛光是按顺序执行的, 只能先凃蜡再抛光, 再继续循环上面的这个过程。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;

class Car{
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;

public void waxed(){
lock.lock();
try{
waxOn = true;//已经凃蜡
condition.signalAll();
}finally{
lock.unlock();
}
}

public void buffed(){
lock.lock();
try{
waxOn = false;// 抛光完成
condition.signalAll();
}finally{
lock.unlock();
}
}

//等待凃蜡
public void waitForWaxing() throws InterruptedException{
lock.lock();
try{
while(waxOn == false)//如果没有凃蜡 则等待
condition.await();
}finally{
lock.unlock();
}
}
//等待抛光
public void waitForBuffing() throws InterruptedException{
lock.lock();
try{
while(waxOn == true)
condition.await();
}finally{
lock.unlock();
}
}

}

class WaxOn implements Runnable{
private Car car;
public WaxOn(Car car){
this.car = car;
}
public void run(){
try{
while(!Thread.interrupted()){
System.out.println("Wax on!");
Thread.sleep(200);
car.waxed();
car.waitForBuffing();
}
}catch(InterruptedException e){
System.out.println("WaxOn Interrupted!");
}
}
}

class WaxOff implements Runnable{
private Car car;
public WaxOff(Car car){
this.car = car;
}

public void run(){
try{
while(!Thread.interrupted()){
car.waitForWaxing();
System.out.println("WaxOff! ");
Thread.sleep(200);
car.buffed();
}
}catch(InterruptedException e){
System.out.println("WaxOff Interrupted!");
}
}
}

public class Wax {
public static void main(String[] args){
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOn(car));
exec.execute(new WaxOff(car));
try{
Thread.sleep(3000);
}catch(InterruptedException e){

}
exec.shutdownNow();
}
}


输出结果:

Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOff!
Wax on!
WaxOn Interrupted!
WaxOff Interrupted!


2. 生产者消费者模型队列模式

2.1 Buffer缓冲区

假设使用缓冲区存储整数。缓存区的大小是受限的。缓存区提供write(int)方法将一个int值添加到缓冲区中, 还提供放大read()从缓冲区中读取和删除一个Int值。

为了同步这个操作, 使用具有两个条件的锁: notEmpty(即缓冲区非空) 和 notFull(缓冲区未满)。

当任务向缓冲区添加一个int时, 如果缓冲区是满的, 那么任务将会等待notFull条件。 当任务区是空的, 那么任务将等待 notEmpty 条件。



import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {
private static final int CAPACITY = 1;

private LinkedList<Integer> queue = new LinkedList<>();
//创建lock
private static Lock lock = new ReentrantLock();
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();
//向队列中写入数据
public void write(int value){
lock.lock();//获得锁
try {
while(queue.size() == CAPACITY){
System.out.println("Wait for notFull condition.");
notFull.await();
}
queue.offer(value);
notEmpty.signal(); //发送  notEmpty condition
}catch (InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

public int read(){
int value = 0;
lock.lock(); //获得锁
try{
while(queue.isEmpty()){
System.out.println("Wait for notEmpty condition.");
notEmpty.await();
}
value = queue.remove();
notFull.signal(); // 发送 notFull condition
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock(); //释放锁
return value;
}

}

}


2.2 ProducerTask 生产线程

public class ProducerTask implements Runnable {
public Buffer buffer;
public ProducerTask(Buffer buffer){
this.buffer = buffer;
}
@Override
public void run(){
try {
int i =1;
while(true){
System.out.println("向buffer中增加一个 数 ");
buffer.write(i);
//线程睡眠
Thread.sleep((int)(Math.random() * 10000));
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}


2.3 ConsumerTask 消费线程

public class ConsumerTask implements Runnable {
public Buffer buffer;
public ConsumerTask(Buffer buffer){
this.buffer = buffer;
}
@Override
public void run(){
try{
while(true){
System.out.println("Consumer reads" + buffer.read());
Thread.sleep((int)(Math.random() * 10000));
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}


运行效果

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
Buffer buffer = new Buffer();

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ConsumerTask(buffer));
executor.execute(new ProducerTask(buffer));
executor.shutdown();
}
}


Wait for notEmpty condition.
向buffer中增加一个 数
Consumer reads1
向buffer中增加一个 数
Consumer reads1
Wait for notEmpty condition.
向buffer中增加一个 数
Consumer reads1
Wait for notEmpty condition.
向buffer中增加一个 数
Consumer reads1
向buffer中增加一个 数
Consumer reads1
向buffer中增加一个 数
Consumer reads1
Wait for notEmpty condition.
向buffer中增加一个 数
Consumer reads1
Wait for notEmpty condition.
向buffer中增加一个 数
Consumer reads1
Wait for notEmpty condition.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: