用信号量解决生产者消费者问题
2013-02-22 16:51
429 查看
问题:有一缓冲区,和若干数量的生产者、消费者。生产者和消费者分别到缓冲区生产、消费数据。要求编程实现生产者和消费者的并发执行。
分析:
生产者不能向已经装载有产品的缓冲区单元添加产品,消费者也不能在空的缓冲区单元取得产品。
首先,定义两个信号量,分别表示当前可取到产品的缓冲区单元数(设为信号量A)、当前空置的缓冲区单元数(设为B)。生产者每生产一个产品,就用信号量A去通知消费者,产品可取;而消费者每消费一个产品,就用信号量B去通知生产者有空置缓冲区。
然后,定义若干关键段(Critical Section),解决生产者群、消费者群内部的同步问题。
最后,我们规定一个操作方向——生产者与消费者都按照这个方向在缓冲区上循环。并且,生产者群与消费者群需要各自维护一个索引信息,指向可操作的缓冲区单元。
如图所示:
在实现中,需要注意的一些问题:
1. 如何结束生产者线程:可简单的让每个生产者只生产一定数量的产品,达到上限即自动结束;
2.当所有生产者退出,且当前没有产品时,消费者如何退出:可使用等待超时;
3. 当前可生产信号量的初始值应为缓冲区大小,最大值也应设置为缓冲区大小;
4. 当前可消费信号量的初始值应设置为0,最大值为缓冲区大小;
5. 可用线程ID来标识各生产者与消费者;
6. 可用线程号与产品序号来标识产品;
代码如下:
运行结果是否符合要求的粗略判定:
1. 连续生产、消费次数均不大于缓冲区大小;
2. 任一产品均为先生产、后消费;
3. 生产的位置顺序是否正确;
4.
消费的位置顺序是否正确;
5. 生产产品的序列、消费产品的序列二者是否一致;
运行结果1:
运行结果2:
=========================End===========================
分析:
生产者不能向已经装载有产品的缓冲区单元添加产品,消费者也不能在空的缓冲区单元取得产品。
首先,定义两个信号量,分别表示当前可取到产品的缓冲区单元数(设为信号量A)、当前空置的缓冲区单元数(设为B)。生产者每生产一个产品,就用信号量A去通知消费者,产品可取;而消费者每消费一个产品,就用信号量B去通知生产者有空置缓冲区。
然后,定义若干关键段(Critical Section),解决生产者群、消费者群内部的同步问题。
最后,我们规定一个操作方向——生产者与消费者都按照这个方向在缓冲区上循环。并且,生产者群与消费者群需要各自维护一个索引信息,指向可操作的缓冲区单元。
如图所示:
在实现中,需要注意的一些问题:
1. 如何结束生产者线程:可简单的让每个生产者只生产一定数量的产品,达到上限即自动结束;
2.当所有生产者退出,且当前没有产品时,消费者如何退出:可使用等待超时;
3. 当前可生产信号量的初始值应为缓冲区大小,最大值也应设置为缓冲区大小;
4. 当前可消费信号量的初始值应设置为0,最大值为缓冲区大小;
5. 可用线程ID来标识各生产者与消费者;
6. 可用线程号与产品序号来标识产品;
代码如下:
/************** * TestMT.cpp *************/ #include "stdafx.h" #include <process.h> #include <windows.h> const int MAX_PRODUCER_CAPACITY = 5; //每个生产者能生产的最大产品数. const int BUFFER_SIZE = 4; //缓冲区大小. const DWORD CONSUMER_TIME_OUT = 10000; //消费者等待超时时间. int g_nBuffer[BUFFER_SIZE]; //缓冲区. CRITICAL_SECTION g_csBuffer; //对应于缓冲区的关键段. int g_nProduceLocation; //当前生产位置. CRITICAL_SECTION g_csProduceLocation; //对应于当前生产位置的关键段. int g_nConsumeLocation; //当前消费位置. CRITICAL_SECTION g_csConsumeLocation; //对应于当前消费位置的关键段. HANDLE g_hEmptyBufferSemaphore; //仓库空置信号量. HANDLE g_hFullBufferSemaphore; //仓库占用信号量. unsigned _stdcall ProducerFunc(PVOID pParam) { int nCurrentProdNum = 0; //已生产产品数. int nTreadId = (int)GetCurrentThreadId(); //线程ID. int nProdId = 0; //产品ID. while(nCurrentProdNum < MAX_PRODUCER_CAPACITY) { WaitForSingleObject(g_hEmptyBufferSemaphore, INFINITE); EnterCriticalSection(&g_csProduceLocation); EnterCriticalSection(&g_csBuffer); // 产品ID以"线程ID * 1000 + 产品序号"的方式标识. nProdId = nTreadId * 1000 + (nCurrentProdNum + 1); //生产. g_nBuffer[g_nProduceLocation] = nProdId; printf("【%d】在位置%d生产数据:%d.\n", nTreadId, g_nProduceLocation + 1, nProdId); nCurrentProdNum++; g_nProduceLocation = (g_nProduceLocation + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_csBuffer); LeaveCriticalSection(&g_csProduceLocation); //告知消费者产品可取. ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL); } return 0; } unsigned _stdcall ConsumerFunc(PVOID pParam) { while(true) { //等待可消费信号量. if(WAIT_TIMEOUT == WaitForSingleObject(g_hFullBufferSemaphore, CONSUMER_TIME_OUT)) { break; } EnterCriticalSection(&g_csConsumeLocation); EnterCriticalSection(&g_csBuffer); //消费. printf(" 【%d】在位置%d消费数据:%d. \n", GetCurrentThreadId(), g_nConsumeLocation + 1, g_nBuffer[g_nConsumeLocation]); //设置后续消费位置. g_nConsumeLocation = (g_nConsumeLocation + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_csBuffer); LeaveCriticalSection(&g_csConsumeLocation); //告知生产者可生产. ReleaseSemaphore(g_hEmptyBufferSemaphore, 1, NULL); } return 0; } int _tmain(int argc, _TCHAR* argv[]) { const int PRODUCER_NUM = 2; //生产者数量. const int CONSUMER_NUM = 3; //消费者数量. HANDLE hProducerThreads[PRODUCER_NUM]; //生产者群. HANDLE hConsumerThreads[CONSUMER_NUM]; //消费者群. //初始化关键段. InitializeCriticalSection(&g_csBuffer); InitializeCriticalSection(&g_csProduceLocation); InitializeCriticalSection(&g_csConsumeLocation); //仓库空置信号量,初始值为缓冲区大小. g_hEmptyBufferSemaphore = CreateSemaphore(NULL, BUFFER_SIZE, BUFFER_SIZE, NULL); //仓库占用信号量,初始值为0. g_hFullBufferSemaphore = CreateSemaphore(NULL, 0, BUFFER_SIZE, NULL); //当前消费位置,初始值为0. g_nConsumeLocation = 0; //当前生产位置,初始值为0. g_nProduceLocation = 0; //创建消费者. for(int i=0; i<CONSUMER_NUM; i++) { hConsumerThreads[i] = (HANDLE)_beginthreadex(NULL, 0, ConsumerFunc, NULL, 0, NULL); } //创建生产者. for(int i=0; i<PRODUCER_NUM; i++) { hProducerThreads[i] = (HANDLE)_beginthreadex(NULL, 0, ProducerFunc, NULL, 0, NULL); } //等待生产者和消费者线程结束. WaitForMultipleObjects(PRODUCER_NUM, hProducerThreads, TRUE, INFINITE); WaitForMultipleObjects(CONSUMER_NUM, hConsumerThreads, TRUE, INFINITE); //清理线程. for(int i=0; i<PRODUCER_NUM; i++) { CloseHandle(hProducerThreads[i]); } for(int i=0; i<CONSUMER_NUM; i++) { CloseHandle(hConsumerThreads[i]); } //清理信号量. CloseHandle(g_hEmptyBufferSemaphore); CloseHandle(g_hFullBufferSemaphore); //删除关键段. DeleteCriticalSection(&g_csBuffer); DeleteCriticalSection(&g_csProduceLocation); DeleteCriticalSection(&g_csConsumeLocation); return 0; }
运行结果是否符合要求的粗略判定:
1. 连续生产、消费次数均不大于缓冲区大小;
2. 任一产品均为先生产、后消费;
3. 生产的位置顺序是否正确;
4.
消费的位置顺序是否正确;
5. 生产产品的序列、消费产品的序列二者是否一致;
运行结果1:
运行结果2:
=========================End===========================
相关文章推荐
- 用信号量解决生产者消费者问题
- 用信号量做进程同步解决生产者和消费者遇到的奇怪问题
- Operating System-进程/线程内部通信-信号量、PV操作的实现和应用(解决哲学家进餐和生产者消费者问题)
- System V共享内存与信号量综合应用之生产者与消费者问题解决
- 信号量解决生产者-消费者问题
- Linux下用环形buf以及POSIX版本信号量解决生产者消费者问题
- 1,使用信号量解决生产者-消费者问题
- 用条件变量和信号量解决生产者和消费者问题
- Linux多线程实践(5) --Posix信号量与互斥量(解决生产者消费者问题)
- Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题
- 通过信号量机制解决生产者-消费者问题的模拟程序
- 【坑】记录型信号量/AND信号量/管程解决生产者-消费者问题
- Operating System-进程/线程内部通信-信号量、PV操作的实现和应用(解决哲学家进餐和生产者消费者问题)
- 信号量解决生产者,消费者问题
- Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题
- java信号量PV操作 解决生产者-消费者问题
- Linux多线程实践(五 )Posix信号量和互斥锁解决生产者消费者问题
- 信号量解决生产者-消费者问题
- 记录型信号量解决消费者-生产者问题
- Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题