java并发之生产者消费者模型
2016-01-10 14:21
453 查看
生产者和消费者模型是操作系统中经典的同步问题。该问题最早由Dijkstra提出,用以演示它提出的信号量机制。
经典的生产者和消费者模型的描述是:有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将它所生产的产品放入一个缓冲区中;消费者进程可从一个缓冲区中取走产品去消费。尽管所有的生产者进程和消费者进程都是以异步方式运行的,但它们之间必须保持同步,即不允许消费者进程到一个空缓冲区去取产品,也不允许生产者进程向一个已装满产品且尚未被取走的缓冲区投放产品。
![](http://img.blog.csdn.net/20160110134343236)
首先我们复习一下操作系统中同步机制中应遵循的准则:
空闲让进:当无进程处于临界区时,应允许一个请求进入临界区的进程进入临界区;
忙则等待:当已有进程进入临界区时,其他试图进入临界区的进程必须等待;
有限等待:对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区,以免陷入“死等”状态;
让权等待:当进程不能进入自己的临界区时,应立即释放处理机,以免进程陷入“忙等”;
在生产者和消费者模型中要保证一下几点:
1.生产者在往缓存队列中放产品时,消费者不能取产品。
2.消费者从缓存队列中取产品时,生产者不能放产品。
3.同一时刻只有一个生产者可以往缓存队列中放产品。
4.同一时刻只有一个消费者可以从缓存队列中取产品。
5.缓存队列满时生产者不能往缓存队列中放产品。
6.缓存队列为空时消费者不能从缓存队列中取产品。
本例子中的缓存队列模仿java jdk中的ArrayBlockingQueue,这是一个阻塞队列,缓存池满时会自动将生产者线程挂起,缓存池空时会自动将消费者线程挂起。
缓存池
测试模型
主类
运行结果:
经典的生产者和消费者模型的描述是:有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将它所生产的产品放入一个缓冲区中;消费者进程可从一个缓冲区中取走产品去消费。尽管所有的生产者进程和消费者进程都是以异步方式运行的,但它们之间必须保持同步,即不允许消费者进程到一个空缓冲区去取产品,也不允许生产者进程向一个已装满产品且尚未被取走的缓冲区投放产品。
首先我们复习一下操作系统中同步机制中应遵循的准则:
空闲让进:当无进程处于临界区时,应允许一个请求进入临界区的进程进入临界区;
忙则等待:当已有进程进入临界区时,其他试图进入临界区的进程必须等待;
有限等待:对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区,以免陷入“死等”状态;
让权等待:当进程不能进入自己的临界区时,应立即释放处理机,以免进程陷入“忙等”;
在生产者和消费者模型中要保证一下几点:
1.生产者在往缓存队列中放产品时,消费者不能取产品。
2.消费者从缓存队列中取产品时,生产者不能放产品。
3.同一时刻只有一个生产者可以往缓存队列中放产品。
4.同一时刻只有一个消费者可以从缓存队列中取产品。
5.缓存队列满时生产者不能往缓存队列中放产品。
6.缓存队列为空时消费者不能从缓存队列中取产品。
本例子中的缓存队列模仿java jdk中的ArrayBlockingQueue,这是一个阻塞队列,缓存池满时会自动将生产者线程挂起,缓存池空时会自动将消费者线程挂起。
缓存池
public class Pool<E> { /**队列最长长度*/ private int MaxSize = 1000; /**队列默认长度*/ private static final int defaultSize = 100; /**资源池*/ private Object[] objs ; /**队头*/ private int front; /**队尾*/ private int rear; /**元素的个数*/ private int nItems; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; private int useSize = 0; public Pool() { this(defaultSize); useSize = defaultSize; } public Pool(int size) { if(size < 0) throw new IndexOutOfBoundsException(); size = size > MaxSize ? MaxSize : size; useSize = size; objs = new Object[size]; front = 0; rear = -1; nItems = 0; lock = new ReentrantLock(true); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**进队*/ private void queue(E e) { if(rear == useSize - 1) rear = -1; objs[++rear] = e; nItems++; notEmpty.signal(); } /**出队*/ private E dequeue() { E e = (E)objs[front++]; if(front == useSize) front = 0; nItems--; notFull.signal(); return e; } /**进队 资源池满会将入队线程挂起*/ public void offer(E e) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { while(nItems == objs.length) notFull.await(); queue(e); System.out.println("学生进队,当前池中有 " + nItems + " 名同学" ); } finally { lock.unlock(); } } /**出队 资源池空会将出队线程挂起*/ public E poll() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { while(nItems == 0) notEmpty.await(); E e = dequeue(); System.out.println("学生出队,当前池中有 " + nItems + " 名同学" ); return e; } finally { lock.unlock(); } } /**是否满*/ public boolean isFull() { final ReentrantLock lock = this.lock; lock.lock(); try { return nItems == MaxSize ? true : false; } finally { lock.unlock(); } } /**判断是否为空*/ public boolean isEmpty() { final ReentrantLock lock = this.lock; lock.lock(); try { return nItems == 0 ? true : false; } finally { lock.unlock(); } } /**返回队列中元素个数*/ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return this.nItems; } finally { lock.unlock(); } } }
测试模型
public class Student { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
主类
public class PM { private Pool<Student> pools = new Pool<Student>(1000); public static void main(String[] args) { PM pm = new PM(); ExecutorService executor = Executors.newFixedThreadPool(6); executor.execute(pm.new consume()); executor.execute(pm.new consume()); executor.execute(pm.new consume()); executor.execute(pm.new produce()); executor.execute(pm.new produce()); executor.execute(pm.new produce()); } class produce implements Runnable { @Override public void run() { while(true) { try { pools.offer(new Student()); } catch (InterruptedException e) { e.printStackTrace(); } } } } class consume implements Runnable { @Override public void run() { while(true) { try { pools.poll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
运行结果:
相关文章推荐
- spring2.5.6的‘annotation-config’ and its parser class are only available on JDK 1.5 and higher错误解决
- java并发库中的LockSupport介绍及使用
- 1、JAVA的简单介绍、入门,开发环境搭建、开发Java程序
- 简化Spring的XML配置(二)——使用注解装配bean
- 解决在Spring整合Hibernate配置tx事务管理器出现错误的问题
- eclipse maven plugin 插件 安装 和 配置
- Struts2漏洞修复到2.3.15.1版本步骤
- java7 try-with-resource
- Spring MVC 4.+ 使用 Ehcache 超简单配置!!!
- 从零开始写javaweb框架笔记10-搭建轻量级JAVAWEB框架-确定目标
- Google的Java编程风格指南(Java编码规范)
- 从零开始写javaweb框架笔记9-细节完善与代码优化-完善控制器层
- Struts 2.1 两天快速入门(第一天上午)转
- Java_Ant详解
- Java_Ant详解
- Spring-webmvc关于json的一些问题
- ibatis+spring+struts 环境配置
- myEclipse 中常用快捷键
- myeclipse如何显示代码行数
- java 中操作日期类型