Napajs demo-单个生产者/消费者
2017-11-02 08:51
441 查看
生产者/消费者问题
也叫缓存绑定问题(bounded- buffer),是一个经典的、多进程同步问题。单生产者和单消费者
即有两个进程:一组生产者进程和一组消费者进程共享一个初始为空、固定大小为n的缓存(缓冲区)。生产者的工作是制造一段数据,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待,如此反复; 同时,只有缓冲区不空时,消费者才能从中取出消息,一次消费一段数据(即将其从缓存中移出),否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。问题的核心是:
1.要保证不让生产者在缓存还是满的时候仍然要向内写数据;
2.不让消费者试图从空的缓存中取出数据。
生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,他们也是同步关系。
解决思路:
对于生产者,如果缓存是满的就去睡觉。消费者从缓存中取走数据后就叫醒生产者,让它再次将缓存填满。若消费者发现缓存是空的,就去睡觉了。下一轮中生产者将数据写入后就叫醒消费者。
不完善的解决方案会造成“死锁”,即两个进程都在“睡觉”等着对方来“唤醒”。
只有生产者和消费者两个进程,正好是这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步PV操作的位置。使用“进程间通信”,semaphore就可以解决唤醒的问题:
我们使用了两个信号标:full
和 empty 。信号量mutex作为互斥信号量,它用于控制互斥访问缓冲池,互斥信号量初值为 1;信号量 full 用于记录当前缓冲池中“满”缓冲区数,初值为0。信号量 empty 用于记录当前缓冲池中“空”缓冲区数,初值为n。新的数据添加到缓存中后,full 在增加,而 empty 则减少。如果生产者试图在 empty 为0时减少其值,生产者就会被“催眠”。下一轮中有数据被消费掉时,empty就会增加,生产者就会被“唤醒”。
伪代码:
semaphore mutex=1; //临界区互斥信号量 semaphore empty=n; //空闲缓冲区 semaphore full=0; //缓冲区初始化为空 producer ()//生产者进程 { while(1) { produce an item in nextp; //生产数据 P(empty); //获取空缓冲区单元 P(mutex); //进入临界区. add nextp to buffer; //将数据放入缓冲区 V(mutex); //离开临界区,释放互斥信号量 V(full); //满缓冲区数加1 } } consumer ()//消费者进程 { while(1) { P(full); //获取满缓冲区单元 P(mutex); // 进入临界区 remove an item from buffer; //从缓冲区中取出数据 V (mutex); //离开临界区,释放互斥信号量 V (empty) ; //空缓冲区数加1 consume the item; //消费数据 } }
该类问题要注意对缓冲区大小为n的处理,当缓冲区中有空时便可对empty变量执行P 操作,一旦取走一个产品便要执行V操作以释放空闲区。对empty和full变量的P操作必须放在对mutex的P操作之前。
以上内容来自简书
作者:穹蓝奥义
链接:http://www.jianshu.com/p/b16296e9ac85
來源:简书
JavaScript实现单生产者和单消费者问题
使用napajs的来实现单生产者和单消费者问题先使用npm安装napajs
npm install napajs
代码:(提醒:在定义需要broadcast和execute的方法时,使用function的定义方式)
let napa = require('napajs'); /** * producer 和 consumer 两个zone(也就是两个进程) */ let producer = napa.zone.create('producer', { workers: 1 }); let consumer = napa.zone.create('consumer', { workers: 1 }); let store = napa.store.create('store1'); store.set('mutex', true); store.set('empty', 2); store.set('full', 0); store.set('list', ''); function sleep(time) { return new Promise((resolve) => setTimeout(resolve, time)); } /** * 生产 */ function produce() { while (1) { while (store.get('empty') === 0) { sleep(1000); } store.set('empty', store.get('empty') - 1); while (!store.get('mutex')) { sleep(1000); } store.set('mutex', false); console.log('produce a product'); let list = ''; list = store.get('list') + '1'; store.set('list', list); console.log('add to buffer'); store.set('mutex', true); store.set('full', store.get('full') + 1); } } /** * 消费 */ function consume() { while (1) { while (store.get('full') === 0) { sleep(1000); } store.set('full', store.get('full') - 1); while (!store.get('mutex')) { sleep(1000); } store.set('mutex', false); let list = store.get('list'); list = list.substr(1); store.set('list', list); console.log('get a product from buffer'); store.set('mutex', true); store.set('empty', store.get('empty') + 1); console.log('consume a product'); } } let mainProcess = () => { let code = 'let napa = require("napajs");\n' + ' let store = napa.store.get(\'store1\');'; producer.broadcast(code) .then(() => { console.log('producer broadcast success'); }).catch(error => { console.log(error); }); consumer.broadcast(code) .then(() => { console.log('consumer broadcast success'); }).catch(error => { console.log(error); }); producer.broadcast(sleep.toString()); consumer.broadcast(sleep.toString()); producer.broadcast(produce.toString()); consumer.broadcast(consume.toString()); producer.execute('', 'produce') .then((result) => { console.log('producer execute,' + result.value); }) .catch((error) => { console.log(error); }); consumer.execute('', 'consume') .then((result) => { console.log('consumer execute,' + result.value); }) .catch((error) => { console.log(error); }); }; mainProcess();
输出
"F:\WebStorm\WebStorm 2017.2.2\bin\runnerw.exe" F:\nodejs\node.exe F:\WebStorm\practices\jsDoc-demo\producerConsumer.js produce a product add to buffer produce a product add to buffer get a product from buffer consume a product produce a product get a product from buffer add to buffer consume a product produce a product add to buffer get a product from buffer consume a product produce a product add to buffer get a product from buffer consume a product produce a product add to buffer get a product from buffer consume a product produce a product get a product from buffer add to buffer ....
相关文章推荐
- Napajs demo-多个生产者/消费者
- Poco::Thread 生产者消费者Demo
- kafka-3python生产者和消费者实用demo
- 生产者消费者模型(单个生产者和单个消费者)
- 多线程学习Demo注解(3)——生产者和消费者
- 在Windows环境中安装并使用kafka以及生产者消费者Demo
- 单个生产者和单个消费者模型(spsc)
- Java 写一个生产者和消费者的多线程Demo
- Disruptor多个消费者独立处理生产者消息的简单demo
- kafka java 生产者消费者demo
- 生产者消费者 多线程 单个缓冲区 Win32API实现
- RabbitMQ消息队列之二:消费者和生产者 Demo
- Rocketmq生产者和push消费者demo
- 生产者与消费者--demo1---bai
- 关于js中的单线程和异步事件同操作系统的生产者消费者模型的理解
- Java 多线程 (PART XVI)生产者消费者(I) 单个生产者单个消费者
- Go语言模拟一个生产者消费者的Demo
- 生产者与消费者---demo2---boke
- 图文并茂的生产者消费者应用实例demo
- activeMQ的创建生产者和消费者的demo(队列模式)