生产者-消费者模型的Java实现
2016-07-08 20:57
399 查看
生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
一,首先定义公共资源类,其中的变量number是保存的公共数据。并且定义两个方法,增加number的值和减少number的值。由于多线程的原因,必须加上synchronized关键字,注意while判断的条件。
代码:
对于本例,要说明的是当发现不能满足生产或者消费条件的时候,调用对象的wait方法,wait方法的作用是释放当前线程的所获得的锁,并调用对象的notifyAll() 方法,通知(唤醒)该对象上其他等待线程,使得其继续执行。这样,整个生产者、消费者线程得以正确的协作执行。
notifyAll() 方法,起到的是一个通知作用,不释放锁,也不获取锁。只是告诉该对象上等待的线程“可以竞争执行了,都醒来去执行吧”。
Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。从功能上来说wait就是说线程在获取对象锁后,主动释放对象锁,同时本线程休眠。直到有其它线程调用对象的notify()唤醒该线程,才能继续获取对象锁,并继续执行。相应的notify()就是对对象锁的唤醒操作。但有一点需要注意的是notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。这样就提供了在线程间同步、唤醒的操作。Thread.sleep()与Object.wait()二者都可以暂停当前线程,释放CPU控制权,主要的区别在于Object.wait()在释放CPU同时,释放了对象锁的控制。
本例仅仅是生产者消费者模型中最简单的一种表示,本例中,如果消费者消费的仓储量达不到满足,而又没有生产者,则程序会一直处于等待状态,这当然是不对的。实际上可以将此例进行修改,修改为,根据消费驱动生产,同时生产兼顾仓库,如果仓不满就生产,并对每次最大消费量做个限制,这样就不存在此问题了,当然这样的例子更复杂,更难以说明这样一个简单模型。
又一个简单实例
一,首先定义公共资源类,其中的变量number是保存的公共数据。并且定义两个方法,增加number的值和减少number的值。由于多线程的原因,必须加上synchronized关键字,注意while判断的条件。
/**
* 公共资源类
*/
public class PublicResource {
private int number = 0;
/**
* 增加公共资源
*/
public synchronized void increace() {
while (number != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(number);
notify();
}
/**
* 减少公共资源
*/
public synchronized void decreace() {
while (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(number);
notify();
}
}
二,分别定义生产者线程和消费者线程,并模拟多次生产和消费,即增加和减少公共资源的number值
/**
* 生产者线程,负责生产公共资源
*/
public class ProducerThread implements Runnable {
private PublicResource resource;
public ProducerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.increace();
}
}
}
/**
* 消费者线程,负责消费公共资源
*/
public class ConsumerThread implements Runnable {
private PublicResource resource;
public ConsumerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.decreace();
}
}
}
三,模拟多个生产者和消费者操作公共资源的情形,结果须保证是在允许的范围内。
public class ProducerConsumerTest {
public static void main(String[] args) {
PublicResource resource = new PublicResource();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
}
}
一,首先定义公共资源类,其中的变量number是保存的公共数据。并且定义两个方法,增加number的值和减少number的值。由于多线程的原因,必须加上synchronized关键字,注意while判断的条件。
代码:
public class Test { public static void main(String[] args) { Godown godown = new Godown(30); Consumer c1 = new Consumer(50, godown); Consumer c2 = new Consumer(20, godown); Consumer c3 = new Consumer(30, godown); Producer p1 = new Producer(10, godown); Producer p2 = new Producer(10, godown); Producer p3 = new Producer(10, godown); Producer p4 = new Producer(10, godown); Producer p5 = new Producer(10, godown); Producer p6 = new Producer(10, godown); Producer p7 = new Producer(80, godown); c1.start(); c2.start(); c3.start(); p1.start(); p2.start(); p3.start(); p4.start(); p5.start(); p6.start(); p7.start(); } } /** * 仓库 */ class Godown { public static final int max_size = 100; //最大库存量 public int curnum; //当前库存量 Godown() { } Godown(int curnum) { this.curnum = curnum; } /** * 生产指定数量的产品 * * @param neednum */ public synchronized void produce(int neednum) { //测试是否需要生产 while (neednum + curnum > max_size) { System.out.println("要生产的产品数量" + neednum + "超过剩余库存量" + (max_size - curnum) + ",暂时不能执行生产任务!"); try { //当前的生产线程等待 wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //满足生产条件,则进行生产,这里简单的更改当前库存量 curnum += neednum; System.out.println("已经生产了" + neednum + "个产品,现仓储量为" + curnum); //唤醒在此对象监视器上等待的所有线程 notifyAll(); } /** * 消费指定数量的产品 * * @param neednum */ public synchronized void consume(int neednum) { //测试是否可消费 while (curnum < neednum) { try { //当前的生产线程等待 wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //满足消费条件,则进行消费,这里简单的更改当前库存量 curnum -= neednum; System.out.println("已经消费了" + neednum + "个产品,现仓储量为" + curnum); //唤醒在此对象监视器上等待的所有线程 notifyAll(); } } /** * 生产者 */ class Producer extends Thread { private int neednum; //生产产品的数量 private Godown godown; //仓库 Producer(int neednum, Godown godown) { this.neednum = neednum; this.godown = godown; } public void run() { //生产指定数量的产品 godown.produce(neednum); } } /** * 消费者 */ class Consumer extends Thread { private int neednum; //生产产品的数量 private Godown godown; //仓库 Consumer(int neednum, Godown godown) { this.neednum = neednum; this.godown = godown; } public void run() { //消费指定数量的产品 godown.consume(neednum); } }
对于本例,要说明的是当发现不能满足生产或者消费条件的时候,调用对象的wait方法,wait方法的作用是释放当前线程的所获得的锁,并调用对象的notifyAll() 方法,通知(唤醒)该对象上其他等待线程,使得其继续执行。这样,整个生产者、消费者线程得以正确的协作执行。
notifyAll() 方法,起到的是一个通知作用,不释放锁,也不获取锁。只是告诉该对象上等待的线程“可以竞争执行了,都醒来去执行吧”。
Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。从功能上来说wait就是说线程在获取对象锁后,主动释放对象锁,同时本线程休眠。直到有其它线程调用对象的notify()唤醒该线程,才能继续获取对象锁,并继续执行。相应的notify()就是对对象锁的唤醒操作。但有一点需要注意的是notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。这样就提供了在线程间同步、唤醒的操作。Thread.sleep()与Object.wait()二者都可以暂停当前线程,释放CPU控制权,主要的区别在于Object.wait()在释放CPU同时,释放了对象锁的控制。
本例仅仅是生产者消费者模型中最简单的一种表示,本例中,如果消费者消费的仓储量达不到满足,而又没有生产者,则程序会一直处于等待状态,这当然是不对的。实际上可以将此例进行修改,修改为,根据消费驱动生产,同时生产兼顾仓库,如果仓不满就生产,并对每次最大消费量做个限制,这样就不存在此问题了,当然这样的例子更复杂,更难以说明这样一个简单模型。
又一个简单实例
一,首先定义公共资源类,其中的变量number是保存的公共数据。并且定义两个方法,增加number的值和减少number的值。由于多线程的原因,必须加上synchronized关键字,注意while判断的条件。
/**
* 公共资源类
*/
public class PublicResource {
private int number = 0;
/**
* 增加公共资源
*/
public synchronized void increace() {
while (number != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(number);
notify();
}
/**
* 减少公共资源
*/
public synchronized void decreace() {
while (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(number);
notify();
}
}
二,分别定义生产者线程和消费者线程,并模拟多次生产和消费,即增加和减少公共资源的number值
/**
* 生产者线程,负责生产公共资源
*/
public class ProducerThread implements Runnable {
private PublicResource resource;
public ProducerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.increace();
}
}
}
/**
* 消费者线程,负责消费公共资源
*/
public class ConsumerThread implements Runnable {
private PublicResource resource;
public ConsumerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.decreace();
}
}
}
三,模拟多个生产者和消费者操作公共资源的情形,结果须保证是在允许的范围内。
public class ProducerConsumerTest {
public static void main(String[] args) {
PublicResource resource = new PublicResource();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProducerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
}
}
相关文章推荐
- java的ArrayList的值
- 利用数组算阶乘
- Spring之Spring AOP
- Spring-4:bean的自动装配及一些问题
- Java 错误:找不到或无法加载主类
- 选择排序与插入排序--java
- leetcode:Minimum Window Substring题解(Java)
- Java核心技术-读书笔记 概览
- Eclipse配置Hadoop环境并运行WordCount样例
- 如何装饰Eclipse
- java基础教程10:事件处理机制
- Java实现文件远程上传和下载
- Java基础之多线程断点下载
- Java反射学习一
- Java反射学习二
- Java反射学习三
- Java反射学习四
- Spring-3:bean的属性配置细节
- java中volatile关键字的含义
- 记录:git删除误提交的Eclipse项目文件与配置文件