您的位置:首页 > 运维架构 > Linux

Linux组件封装之五:生产者消费者问题

2014-10-08 01:40 295 查看
生产者,消费者问题是有关互斥锁(MutexLock)、条件变量(Condition)、线程(Thread)的经典案例;

描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。

生产着消费者问题的难点在于:

为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源。


生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止。


相应的,消费者欲取走产品,如果此时缓冲区为空,即没有产品,那么消费者则需要等待,一直到有生产者投放产品为止。


第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。

生产者的大概流程为:

1、加锁;
2、若缓冲区已满,则进入等待状态;否则执行 3;
3、生产产品;
4、解锁;
5、通知消费者取走产品


消费者的大概流程为:

1、加锁;
2、若缓冲区已空,则进入等待状态;否则执行 3;
3、取走产品;
4、解锁;
5、通知生产者生产产品


为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;

#ifndef BUFFER_H_
#define BUFFER_H_

#include "NonCopyable.h"
#include "MutexLock.h"
#include "Condition.h"
#include <queue>

class Buffer:NonCopyable
{
public:
Buffer(size_t size);//attention

void push(int val);//投放产品
int pop();//取走产品

bool isEmpty()const;
size_t size()const;
private:
mutable MutexLock mutex_;//注意声明次序,不能改变
Condition full_;
Condition empty_;

size_t size_;//缓冲区大小
std::queue<int> q_;
};

#endif


这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;

还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现

Buffer的具体实现代码如下:

#include "Buffer.h"
#include <iostream>
Buffer::Buffer(size_t size)
:size_(size),
full_(mutex_), //用mutex初始化Condition的一个对象
empty_(mutex_)//用mutex初始化Condition的另一个对象
{}

void Buffer::push(int val)
{
{ //attention 作用域问题
MutexGuard lock(mutex_);
while(q_.size()>= size_)
empty_.wait();
q_.push(val);
}
full_.signal();
}

int Buffer::pop()//attention
{
int tmp= 0;
{
MutexGuard lock(mutex_);
while(q_.empty())
full_.wait();
tmp = q_.front();
q_.pop();
}
empty_.signal();
return tmp;
}

bool Buffer::isEmpty()const
{
//after
MutexGuard lock(mutex_);//作用域仅限于花括号内
return q_.empty();
}

size_t Buffer::size()const
{
MutexGuard lock(mutex_);
return q_.size();
}


注意:
1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition

2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销

这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?

我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:

#ifndef WORKSHOP_H_
#define WORKSHOP_H_

#include "NonCopyable.h"
#include "Buffer.h"
#include <vector>

class ProducerThread;
class ConsumerThread;
class Buffer;
class WorkShop:NonCopyable
{
public:
WorkShop(size_t bufferSize,
size_t producerSize,
size_t consumerSize);

~WorkShop();
void startWorking();

private:
size_t bufferSize_;
Buffer buffer_;

size_t producerSize_;
size_t consumerSize_;
std::vector<ProducerThread*> producers_;
std::vector<ConsumerThread*> consumers_;
};

#endif


实现如下(注意之处放在cpp中);

#include "WorkShop.h"
#include "ProducerThread.h"
#include "ConsumerThread.h"

//version 1
WorkShop::WorkShop(size_t buffersize,
size_t producerSize,
size_t consumerSize)
:bufferSize_(buffersize),
buffer_(bufferSize_),
producerSize_(producerSize),
consumerSize_(consumerSize),
producers_(producerSize_, new ProducerThread(buffer_)),
consumers_(consumerSize_, new ConsumerThread(buffer_))
{

}

//version 2
/*
WorkShop::WorkShop(size_t buffersize,
size_t producerSize,
size_t consumerSize)
:bufferSize_(buffersize),
buffer_(bufferSize_),
producerSize_(producerSize),
consumerSize_(consumerSize),
producers_(producerSize_,NULL),//vector 的初始化
consumers_(consumerSize_,NULL)
{
for(std::vector<ProducerThread*>::iterator it = producers_.begin();
it != producers_.end();
++it)
{
*it = new ProducerThread(buffer_);
}

for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
it != consumers_.end();
++it)
{
*it = new ConsumerThread(buffer_);
}
}
*/

WorkShop::~WorkShop()
{
for(std::vector<ProducerThread*>::iterator it = producers_.begin();
it != producers_.end();
++it)
{
delete *it ;
}

for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
it != consumers_.end();
++it)
{
delete *it ;
}
}

void WorkShop::startWorking()
{
for(std::vector<ProducerThread*>::iterator it = producers_.begin();
it != producers_.end();
++it)
{
//注意,此循环不能同时调用start,join->发生阻塞(只能产生一个 ProducerThread)
(*it)->start() ;
}
for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
it != consumers_.end();
++it)
{
(*it)->start() ;
}

for(std::vector<ProducerThread*>::iterator it = producers_.begin();
it != producers_.end();
++it)
{
(*it)->join() ;
}
for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
it != consumers_.end();
++it)
{
(*it)->join() ;
}
}


这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: