生产者消费者模式(1)
2015-07-18 21:26
369 查看
生产者消费者问题(Producer-consumer problem)是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
什么是生产者消费者模式
如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
所以说,这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
生产者消费者模式的好处
一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:
1)简化开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁。
2)生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样
3)生产者和消费者可以以不同的速度执行
4)分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码
从硬件设计角度来看,其实就是硬件设计的FIFO缓冲。
来看个实际应用:
游戏开发中,每一个客户端连接就相当于一个独立的线程,每个客户端的操作相对于其它客户端都是异步的。比如增加英雄的经验,在逻辑处理中可能会有很多判断,但是不管怎么样,到最后都是要更新数据库的,如果所有的客户端都同时更新数据库,人数多的话,数据连接池很可能被占用完,这样导致一些更新就获取不了连接,就没办法更新数据。而生产者和消费考模式很好的解决了这个问题。逻辑处理相当于生产者,把要更新的结果数据放入到队列中,更新数据线程从队列中出并执行更新。而且我也可以每一个data加一个标识符,把游戏服务器中的所有需要更新的操作都放入到这个队列中,这时候相当于有多个生产者,一个消费者线程默默更新,也可以多个消费者线程一起更新。
下面是使用阻塞队列实现生产者消费者模式的例子:
生存着消费者模式提供了一个很好的思维模式,灵活运用它,可以得到很多变化,比如消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:
这里,生产者1将消息存放在阻塞队列1里,消费者1从队列里读消息,然后通过消息ID进行hash得到N个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者2无法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。
其实这里消费者1可以看成一个管理者和中转者,负责把消息分发给消费者2、3、4。
以下是代码;
参考:
https://zh.wikipedia.org/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E9%97%AE%E9%A2%98
http://www.youxijishu.com/blogs/9.html
http://www.infoq.com/cn/articles/producers-and-consumers-mode
/article/3823666.html
https://software.intel.com/zh-cn/blogs/2014/02/28/java
http://www.uml.org.cn/zjjs/200904161.asp
什么是生产者消费者模式
如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
所以说,这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
生产者消费者模式的好处
一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:
1)简化开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁。
2)生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样
3)生产者和消费者可以以不同的速度执行
4)分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码
从硬件设计角度来看,其实就是硬件设计的FIFO缓冲。
来看个实际应用:
游戏开发中,每一个客户端连接就相当于一个独立的线程,每个客户端的操作相对于其它客户端都是异步的。比如增加英雄的经验,在逻辑处理中可能会有很多判断,但是不管怎么样,到最后都是要更新数据库的,如果所有的客户端都同时更新数据库,人数多的话,数据连接池很可能被占用完,这样导致一些更新就获取不了连接,就没办法更新数据。而生产者和消费考模式很好的解决了这个问题。逻辑处理相当于生产者,把要更新的结果数据放入到队列中,更新数据线程从队列中出并执行更新。而且我也可以每一个data加一个标识符,把游戏服务器中的所有需要更新的操作都放入到这个队列中,这时候相当于有多个生产者,一个消费者线程默默更新,也可以多个消费者线程一起更新。
下面是使用阻塞队列实现生产者消费者模式的例子:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; public class ProducerConsumerPattern { public static void main(String args[]){ //Creating shared object BlockingQueue sharedQueue = new LinkedBlockingQueue(); //Creating Producer and Consumer Thread Thread prodThread = new Thread(new Producer(sharedQueue)); Thread consThread = new Thread(new Consumer(sharedQueue)); //Starting producer and Consumer thread prodThread.start(); consThread.start(); } } //Producer Class in java class Producer implements Runnable { private final BlockingQueue sharedQueue; public Producer(BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=0; i<10; i++){ try { System.out.println("Produced: " + i); sharedQueue.put(i); } catch (InterruptedException ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } } //Consumer Class in Java class Consumer implements Runnable{ private final BlockingQueue sharedQueue; public Consumer (BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { while(true){ try { System.out.println("Consumed: "+ sharedQueue.take()); } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } } Output: Produced: 0 Produced: 1 Consumed: 0 Produced: 2 Consumed: 1 Produced: 3 Consumed: 2 Produced: 4 Consumed: 3 Produced: 5 Consumed: 4 Produced: 6 Consumed: 5 Produced: 7 Consumed: 6 Produced: 8 Consumed: 7 Produced: 9 Consumed: 8 Consumed: 9
生存着消费者模式提供了一个很好的思维模式,灵活运用它,可以得到很多变化,比如消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:
这里,生产者1将消息存放在阻塞队列1里,消费者1从队列里读消息,然后通过消息ID进行hash得到N个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者2无法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。
其实这里消费者1可以看成一个管理者和中转者,负责把消息分发给消费者2、3、4。
以下是代码;
public class MsgQueueManager implements IMsgQueue{ private static final Logger LOGGER = LoggerFactory.getLogger(MsgQueueManager.class); //消息总队列 public final BlockingQueue<Message> messageQueue; private MsgQueueManager() { messageQueue = new LinkedTransferQueue<Message>(); } public void put(Message msg) { try { messageQueue.put(msg); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public Message take() { try { return messageQueue.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return null; } } //分发消息,负责把消息从大队列塞到小队列里 static class DispatchMessageTask implements Runnable { @Override public void run() { BlockingQueue<Message> subQueue; for (;;) { //如果没有数据,则阻塞在这里 Message msg = MsgQueueFactory.getMessageQueue().take(); //如果为空,则表示没有Session机器连接上来, 需要等待,直到有Session机器连接上来 while ((subQueue = getInstance().getSubQueue()) == null) { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } //把消息放到小队列里 try { subQueue.put(msg); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } //使用Hash算法均衡获取一个子队列。 public BlockingQueue<Message> getSubQueue() { int errorCount = 0; for (;;) { if (subMsgQueues.isEmpty()) { return null; } int index = (int) (System.nanoTime() % subMsgQueues.size()); try { return subMsgQueues.get(index); } catch (Exception e) { //出现错误表示,在获取队列大小之后,队列进行了一次删除操作 LOGGER.error("获取子队列出现错误", e); if ((++errorCount) < 3) { continue; } } } } //使用的时候我们只需要往总队列里发消息,往消息队列里添加一条消息 IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue(); Packet msg = Packet.createPacket(Packet64FrameType.TYPE_DATA, "{}".getBytes(), (short) 1); messageQueue.put(msg);
参考:
https://zh.wikipedia.org/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E9%97%AE%E9%A2%98
http://www.youxijishu.com/blogs/9.html
http://www.infoq.com/cn/articles/producers-and-consumers-mode
/article/3823666.html
https://software.intel.com/zh-cn/blogs/2014/02/28/java
http://www.uml.org.cn/zjjs/200904161.asp
相关文章推荐
- 【Spark Core】TaskScheduler源码与任务提交原理浅析2
- 10行Java代码实现最近被使用(LRU)缓存
- OpenGL_ES 创建一个平面 绘制纹理 响应输入事件
- POJ1088滑雪(入门级DP)
- bzoj1257[CQOI]余数之和
- ZOJ 2104 Let the Balloon Rise
- poj3984-迷宫问题(广搜)
- Android学习笔记之dispatchTouchEvent和OnInterceptTouchEvent和OnTouchEvent三个方法之间的联系...
- Eddy's research I
- 【Spark Core】TaskScheduler源码与任务提交原理浅析1
- 每个程序员都需要了解的一个SQL技巧
- SWIFT中获取配置文件路径的方法
- 迭代器模式
- git使用的基础及常用命令
- ACdream 1727 Sequence
- 添加view类图中的二级菜单
- IntelliJ公司版破解方法
- LeetCode-Add and Search Word - Data structure design(C++)
- Java HashMap 的实现原理详解
- odoo联调补充