Linux组件封装之五:生产者消费者问题
2014-10-08 01:40
295 查看
生产者,消费者问题是有关互斥锁(MutexLock)、条件变量(Condition)、线程(Thread)的经典案例;
描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。
生产着消费者问题的难点在于:
第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。
生产者的大概流程为:
消费者的大概流程为:
为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;
这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;
还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现。
Buffer的具体实现代码如下:
注意:
1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition;
2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销。
这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?
我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:
实现如下(注意之处放在cpp中);
这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目。
描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。
生产着消费者问题的难点在于:
为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源。
生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止。
相应的,消费者欲取走产品,如果此时缓冲区为空,即没有产品,那么消费者则需要等待,一直到有生产者投放产品为止。
第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。
生产者的大概流程为:
1、加锁; 2、若缓冲区已满,则进入等待状态;否则执行 3; 3、生产产品; 4、解锁; 5、通知消费者取走产品
消费者的大概流程为:
1、加锁; 2、若缓冲区已空,则进入等待状态;否则执行 3; 3、取走产品; 4、解锁; 5、通知生产者生产产品
为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;
#ifndef BUFFER_H_ #define BUFFER_H_ #include "NonCopyable.h" #include "MutexLock.h" #include "Condition.h" #include <queue> class Buffer:NonCopyable { public: Buffer(size_t size);//attention void push(int val);//投放产品 int pop();//取走产品 bool isEmpty()const; size_t size()const; private: mutable MutexLock mutex_;//注意声明次序,不能改变 Condition full_; Condition empty_; size_t size_;//缓冲区大小 std::queue<int> q_; }; #endif
这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;
还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现。
Buffer的具体实现代码如下:
#include "Buffer.h" #include <iostream> Buffer::Buffer(size_t size) :size_(size), full_(mutex_), //用mutex初始化Condition的一个对象 empty_(mutex_)//用mutex初始化Condition的另一个对象 {} void Buffer::push(int val) { { //attention 作用域问题 MutexGuard lock(mutex_); while(q_.size()>= size_) empty_.wait(); q_.push(val); } full_.signal(); } int Buffer::pop()//attention { int tmp= 0; { MutexGuard lock(mutex_); while(q_.empty()) full_.wait(); tmp = q_.front(); q_.pop(); } empty_.signal(); return tmp; } bool Buffer::isEmpty()const { //after MutexGuard lock(mutex_);//作用域仅限于花括号内 return q_.empty(); } size_t Buffer::size()const { MutexGuard lock(mutex_); return q_.size(); }
注意:
1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition;
2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销。
这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?
我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:
#ifndef WORKSHOP_H_ #define WORKSHOP_H_ #include "NonCopyable.h" #include "Buffer.h" #include <vector> class ProducerThread; class ConsumerThread; class Buffer; class WorkShop:NonCopyable { public: WorkShop(size_t bufferSize, size_t producerSize, size_t consumerSize); ~WorkShop(); void startWorking(); private: size_t bufferSize_; Buffer buffer_; size_t producerSize_; size_t consumerSize_; std::vector<ProducerThread*> producers_; std::vector<ConsumerThread*> consumers_; }; #endif
实现如下(注意之处放在cpp中);
#include "WorkShop.h" #include "ProducerThread.h" #include "ConsumerThread.h" //version 1 WorkShop::WorkShop(size_t buffersize, size_t producerSize, size_t consumerSize) :bufferSize_(buffersize), buffer_(bufferSize_), producerSize_(producerSize), consumerSize_(consumerSize), producers_(producerSize_, new ProducerThread(buffer_)), consumers_(consumerSize_, new ConsumerThread(buffer_)) { } //version 2 /* WorkShop::WorkShop(size_t buffersize, size_t producerSize, size_t consumerSize) :bufferSize_(buffersize), buffer_(bufferSize_), producerSize_(producerSize), consumerSize_(consumerSize), producers_(producerSize_,NULL),//vector 的初始化 consumers_(consumerSize_,NULL) { for(std::vector<ProducerThread*>::iterator it = producers_.begin(); it != producers_.end(); ++it) { *it = new ProducerThread(buffer_); } for(std::vector<ConsumerThread*>::iterator it = consumers_.begin(); it != consumers_.end(); ++it) { *it = new ConsumerThread(buffer_); } } */ WorkShop::~WorkShop() { for(std::vector<ProducerThread*>::iterator it = producers_.begin(); it != producers_.end(); ++it) { delete *it ; } for(std::vector<ConsumerThread*>::iterator it = consumers_.begin(); it != consumers_.end(); ++it) { delete *it ; } } void WorkShop::startWorking() { for(std::vector<ProducerThread*>::iterator it = producers_.begin(); it != producers_.end(); ++it) { //注意,此循环不能同时调用start,join->发生阻塞(只能产生一个 ProducerThread) (*it)->start() ; } for(std::vector<ConsumerThread*>::iterator it = consumers_.begin(); it != consumers_.end(); ++it) { (*it)->start() ; } for(std::vector<ProducerThread*>::iterator it = producers_.begin(); it != producers_.end(); ++it) { (*it)->join() ; } for(std::vector<ConsumerThread*>::iterator it = consumers_.begin(); it != consumers_.end(); ++it) { (*it)->join() ; } }
这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目。
相关文章推荐
- Linux组件封装(五)一个生产者消费者问题示例
- Linux组件封装(五)一个生产者消费者问题示例
- Linux组件封装(五)一个生产者消费者问题示例
- linux下实现生产者消费者问题
- 并发编程(二):分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题
- linux 多线程生产者和消费者问题
- Linux生产者消费者问题编程实例
- Linux C 实现生产者消费者问题
- 并发编程(二):分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题
- Linux生产者与消费者的问题实现
- linux生产者,消费者问题
- 【转】linux C 解决 生产者消费者问题
- linux下生产者与消费者问题代码,以及编译c代码时error:undefined reference to sem_wait 解决方法之一
- 生产者-消费者问题实现 (linux下C语言)
- linux 下 条件变量实现生产者消费者问题
- Linux下生产者消费者问题详细分析(操作系统期中考试论文---并发程序的同步和互斥)
- Linux下用fork()派生的子进程通过pipe管道通讯的实例详解("生产者-消费者"问题)
- Linux进程间通信与生产者消费者问题
- linux中的线程同步:生产者、消费者问题
- 生产者-消费者问题实现 (linux下C语言)----笛风读书笔记系列