您的位置:首页 > 其它

用信号量解决生产者消费者问题

2013-02-22 16:51 429 查看
问题:有一缓冲区,和若干数量的生产者、消费者。生产者和消费者分别到缓冲区生产、消费数据。要求编程实现生产者和消费者的并发执行。

分析:

生产者不能向已经装载有产品的缓冲区单元添加产品,消费者也不能在空的缓冲区单元取得产品。

首先,定义两个信号量,分别表示当前可取到产品的缓冲区单元数(设为信号量A)、当前空置的缓冲区单元数(设为B)。生产者每生产一个产品,就用信号量A去通知消费者,产品可取;而消费者每消费一个产品,就用信号量B去通知生产者有空置缓冲区。

然后,定义若干关键段(Critical Section),解决生产者群、消费者群内部的同步问题。

最后,我们规定一个操作方向——生产者与消费者都按照这个方向在缓冲区上循环。并且,生产者群与消费者群需要各自维护一个索引信息,指向可操作的缓冲区单元。

如图所示:



在实现中,需要注意的一些问题:

1. 如何结束生产者线程:可简单的让每个生产者只生产一定数量的产品,达到上限即自动结束;

2.当所有生产者退出,且当前没有产品时,消费者如何退出:可使用等待超时;

3. 当前可生产信号量的初始值应为缓冲区大小,最大值也应设置为缓冲区大小;

4. 当前可消费信号量的初始值应设置为0,最大值为缓冲区大小;

5. 可用线程ID来标识各生产者与消费者;

6. 可用线程号与产品序号来标识产品;

代码如下:

/**************
* TestMT.cpp
*************/

#include "stdafx.h"
#include <process.h>
#include <windows.h>

const int MAX_PRODUCER_CAPACITY = 5;	//每个生产者能生产的最大产品数.
const int BUFFER_SIZE = 4;				//缓冲区大小.
const DWORD CONSUMER_TIME_OUT = 10000;	//消费者等待超时时间.

int g_nBuffer[BUFFER_SIZE];				//缓冲区.
CRITICAL_SECTION g_csBuffer;			//对应于缓冲区的关键段.

int g_nProduceLocation;					//当前生产位置.
CRITICAL_SECTION g_csProduceLocation;	//对应于当前生产位置的关键段.

int g_nConsumeLocation;					//当前消费位置.
CRITICAL_SECTION g_csConsumeLocation;	//对应于当前消费位置的关键段.

HANDLE g_hEmptyBufferSemaphore;			//仓库空置信号量.
HANDLE g_hFullBufferSemaphore;			//仓库占用信号量.

unsigned _stdcall ProducerFunc(PVOID pParam)
{
int nCurrentProdNum = 0;	//已生产产品数.
int nTreadId = (int)GetCurrentThreadId();	//线程ID.
int nProdId = 0;	//产品ID.

while(nCurrentProdNum < MAX_PRODUCER_CAPACITY)
{
WaitForSingleObject(g_hEmptyBufferSemaphore, INFINITE);

EnterCriticalSection(&g_csProduceLocation);
EnterCriticalSection(&g_csBuffer);

// 产品ID以"线程ID * 1000 + 产品序号"的方式标识.
nProdId = nTreadId * 1000 + (nCurrentProdNum + 1);

//生产.
g_nBuffer[g_nProduceLocation] = nProdId;
printf("【%d】在位置%d生产数据:%d.\n", nTreadId, g_nProduceLocation + 1, nProdId);
nCurrentProdNum++;
g_nProduceLocation = (g_nProduceLocation + 1) % BUFFER_SIZE;

LeaveCriticalSection(&g_csBuffer);
LeaveCriticalSection(&g_csProduceLocation);

//告知消费者产品可取.
ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL);
}

return 0;
}

unsigned _stdcall ConsumerFunc(PVOID pParam)
{
while(true)
{
//等待可消费信号量.
if(WAIT_TIMEOUT == WaitForSingleObject(g_hFullBufferSemaphore, CONSUMER_TIME_OUT))
{
break;
}

EnterCriticalSection(&g_csConsumeLocation);
EnterCriticalSection(&g_csBuffer);

//消费.
printf("				【%d】在位置%d消费数据:%d. \n",
GetCurrentThreadId(), g_nConsumeLocation + 1, g_nBuffer[g_nConsumeLocation]);

//设置后续消费位置.
g_nConsumeLocation = (g_nConsumeLocation + 1) % BUFFER_SIZE;

LeaveCriticalSection(&g_csBuffer);
LeaveCriticalSection(&g_csConsumeLocation);

//告知生产者可生产.
ReleaseSemaphore(g_hEmptyBufferSemaphore, 1, NULL);
}

return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
const int PRODUCER_NUM = 2;				//生产者数量.
const int CONSUMER_NUM = 3;				//消费者数量.
HANDLE hProducerThreads[PRODUCER_NUM];	//生产者群.
HANDLE hConsumerThreads[CONSUMER_NUM];	//消费者群.

//初始化关键段.
InitializeCriticalSection(&g_csBuffer);
InitializeCriticalSection(&g_csProduceLocation);
InitializeCriticalSection(&g_csConsumeLocation);

//仓库空置信号量,初始值为缓冲区大小.
g_hEmptyBufferSemaphore = CreateSemaphore(NULL, BUFFER_SIZE, BUFFER_SIZE, NULL);
//仓库占用信号量,初始值为0.
g_hFullBufferSemaphore = CreateSemaphore(NULL, 0, BUFFER_SIZE, NULL);

//当前消费位置,初始值为0.
g_nConsumeLocation = 0;
//当前生产位置,初始值为0.
g_nProduceLocation = 0;

//创建消费者.
for(int i=0; i<CONSUMER_NUM; i++)
{
hConsumerThreads[i] = (HANDLE)_beginthreadex(NULL, 0, ConsumerFunc, NULL, 0, NULL);
}
//创建生产者.
for(int i=0; i<PRODUCER_NUM; i++)
{
hProducerThreads[i] = (HANDLE)_beginthreadex(NULL, 0, ProducerFunc, NULL, 0, NULL);
}

//等待生产者和消费者线程结束.
WaitForMultipleObjects(PRODUCER_NUM, hProducerThreads, TRUE, INFINITE);
WaitForMultipleObjects(CONSUMER_NUM, hConsumerThreads, TRUE, INFINITE);

//清理线程.
for(int i=0; i<PRODUCER_NUM; i++)
{
CloseHandle(hProducerThreads[i]);
}
for(int i=0; i<CONSUMER_NUM; i++)
{
CloseHandle(hConsumerThreads[i]);
}

//清理信号量.
CloseHandle(g_hEmptyBufferSemaphore);
CloseHandle(g_hFullBufferSemaphore);

//删除关键段.
DeleteCriticalSection(&g_csBuffer);
DeleteCriticalSection(&g_csProduceLocation);
DeleteCriticalSection(&g_csConsumeLocation);

return 0;
}


运行结果是否符合要求的粗略判定:

1. 连续生产、消费次数均不大于缓冲区大小;

2. 任一产品均为先生产、后消费;

3. 生产的位置顺序是否正确;

4.
消费的位置顺序是否正确;

5. 生产产品的序列、消费产品的序列二者是否一致;

运行结果1:



运行结果2:



=========================End===========================
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: