您的位置:首页 > 其它

操作系统 生产者消费者问题

2014-11-22 20:23 183 查看
多线程并发  信号量同步 设定每个生产者生产的数量 每个消费者消费的数量

还有一些地方觉得有问题

/*
输入要求 numb nump numc 分别为缓冲区数量 生产者数量 消费者数量
nump个数 表示每个生产者需要生产的产品数
numc个数 表示每个消费者需要消费的产品数

4 2 2
3 3
3 3
*/

#include<windows.h>
#include<iostream>
#include<stdio.h>
#include<string>
#include<conio.h>

//定义一些常量;
//本程序允许的最大临界区数;
#define MAX_BUFFER_NUM 84
//秒到毫秒的乘法因子;
#define INTE_PER_SEC 1000
//本程序允许的生产和消费线程的总数;
#define MAX_THREAD_NUM 84
using namespace std;
//定义一个结构,记录在测试文件中指定的每一个线程的参数
struct ThreadInfo
{
int serial; //线程序列号
bool entity; //是P还是C
double delay; //线程延迟
int num; //消费或生产的数量
};

//全局变量的定义

CRITICAL_SECTION PC_Critical[MAX_BUFFER_NUM]; //用于管理缓冲区的互斥访问
int Buffer_Critical[MAX_BUFFER_NUM]; //缓冲区声明,用于存放产品
HANDLE h_Thread[MAX_THREAD_NUM]; //用于存储每个线程句柄的数组
ThreadInfo Thread_Info[MAX_THREAD_NUM]; //线程信息数组
HANDLE empty_semaphore; //一个信号量 缓冲区是否有空间
HANDLE h_mutex, h_mutex2; //一个互斥量
DWORD n_Buffer_or_Critical; //实际的缓冲区或者临界区的数目
HANDLE h_Semaphore[MAX_BUFFER_NUM]; //消费者开始消费的信号量

//生产消费及辅助函数的声明
void Produce(void *p);
void Consume(void *p);
bool IfInOtherRequest(int);
int FindProducePositon();
int FindBufferPosition(int);
void init();

int main() {
//声明所需变量;
//freopen("out.txt", "w", stdout);
DWORD wait_for_all;
int numB, numP, numC;

//初始化
scanf("%d%d%d", &numB, &numP, &numC);
n_Buffer_or_Critical = numB;
init();

for(int i = 0; i < numP; ++ i) {
int x;
scanf("%d", &x);
Thread_Info[i].delay = 1;
Thread_Info[i].entity = 1;
Thread_Info[i].num = x;
Thread_Info[i].serial = i+1;

}
for(int i = 0; i < numC; ++ i) {
int x;
scanf("%d", &x);
Thread_Info[i+numP].delay = 1;
Thread_Info[i+numP].entity = 0;
Thread_Info[i+numP].num = x;
Thread_Info[i+numP].serial = i + numP + 1;
}

//产生缓冲区的信号量
for(int j = 0; j < numB; ++ j) {
string lp = "semaphore_";
int temp =j;
while(temp) {
char c = (char)(temp%10);
lp += c;
temp /= 10;
}
h_Semaphore[j] = CreateSemaphore(NULL, 1, 1, lp.c_str()); //空 初始量 最大量 名字
}

//创建线程
for(int i = 0; i < numP+numC; ++ i) {
if(Thread_Info[i].entity)
h_Thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Produce),
&(Thread_Info[i]), 0, NULL);
else
h_Thread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)(Consume),
&(Thread_Info[i]), 0, NULL);
}

//主程序等待各个线程的动作结束;
wait_for_all = WaitForMultipleObjects(numP+numC, h_Thread, TRUE, -1);
puts("OVER");
return 0;
}

//初始化
void init() {
//初始化缓冲区
for(int i = 0; i < MAX_BUFFER_NUM; ++ i)
Buffer_Critical[i] = -1;
//初始化临界区
for(int i = 0; i < MAX_BUFFER_NUM; ++ i)
InitializeCriticalSection(&PC_Critical[i]);
//创建信号量
empty_semaphore = CreateSemaphore(NULL, n_Buffer_or_Critical, n_Buffer_or_Critical,
"semaphore_for_empty");
//创建互斥对象
h_mutex = CreateMutex(NULL, FALSE, "mutex_for_update");
h_mutex2 = CreateMutex(NULL, FALSE, "mutex_for_update2");
}

//找出当前可以进行产品生产的空缓冲区位置
int FindProducePosition() {
int EmptyPosition;
for (int i = 0; i < (int)n_Buffer_or_Critical; ++ i)
if(Buffer_Critical[i] == -1) {
EmptyPosition = i;
Buffer_Critical[i] = -2; //缓冲区标记为-2 表示正在使用中
return i;
}
return -1;
}

//找出当前所需生产者生产的产品的位置
int FindBufferPosition() {
for(int i = 0 ;i < (int)n_Buffer_or_Critical; ++ i)
if(Buffer_Critical[i] >= 0) {
Buffer_Critical[i] = -2; //缓冲区标记为-2 表示正在使用中
return i;
}
return -1;
}

//生产者进程
void Produce(void *p) {
//局部变量声明;
DWORD wait_for_semaphore, wait_for_mutex, m_delay;
int m_serial, m_num;
//获得本线程的信息;
m_num = ((ThreadInfo*)(p))->num;
m_serial = ((ThreadInfo*)(p))->serial;
m_delay = (DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC);

while(m_num) {
//开始请求生产
printf("生产者 %d 发送生产请求信号.\n",m_serial);
bool flag = 0;
//确认有空缓冲区可供生产,同时将空位置数empty减1,如果没有等待
//wait_for_semaphore = WaitForSingleObject(empty_semaphore, -1);
//互斥访问下一个可用于生产的空临界区,实现写写互斥
wait_for_mutex = WaitForSingleObject(h_mutex, -1);
int ProducePos = FindProducePosition();
if(ProducePos == -1) {
flag = 1;
printf("生产者 %d 被挂起\n", m_serial);
}
while(ProducePos == -1) {
Sleep(500);
ProducePos = FindProducePosition();
}
ReleaseMutex(h_mutex);
ReleaseSemaphore(h_Semaphore[ProducePos], -1, NULL);
if(flag)
printf("生产者 %d 被激活\n", m_serial);
wait_for_semaphore = WaitForSingleObject(h_Semaphore[ProducePos], -1);
//将临界区要写入的位置锁定 防止被别的线程读写
//EnterCriticalSection(&PC_Critical[ProducePos]);
//生产者在获得自己的空位置并做上标记后,生产操作可以并发执行
printf("生产者 %d 开始在缓冲区 %d 生产产品.\n",m_serial, ProducePos);
Sleep(m_delay); //生产时延
Buffer_Critical[ProducePos] = m_serial;
printf("生产者 %d 完成生产 缓冲区[ %d ]: %d\n", m_serial, ProducePos, Buffer_Critical[ProducePos]);
//使生产者写的缓冲区可以被多个消费者使用,实现读写同步;
//ReleaseMutex(h_mutex);

//将缓冲区要写入的位置释放
//LeaveCriticalSection(&PC_Critical[ProducePos]);
//将缓冲区要写入的位置信号量加1
ReleaseSemaphore(h_Semaphore[ProducePos], 1, NULL);
//ReleaseSemaphore(empty_semaphore, -1, NULL);
m_num --;
}
}

//消费者进程
void Consume(void * p) {
//局部变量声明;
DWORD wait_for_semaphore,m_delay,wait_for_mutex;
int m_serial, m_num;

//提取本线程的信息到本地
m_serial = ((ThreadInfo*)(p))->serial;
m_delay = (DWORD)(((ThreadInfo*)(p))->delay * INTE_PER_SEC);
m_num = ((ThreadInfo*)(p))->num;

while(m_num) {
//请求消费下一个产品
printf("消费者 %d 请求消费产品\n", m_serial);
bool flag = 0;
//如果缓冲区没有产品,则等待
wait_for_mutex = WaitForSingleObject(h_mutex2, -1);
int BufferPos = FindBufferPosition();
if(BufferPos == -1) {
flag = 1;
printf("消费者 %d 被挂起\n", m_serial);
}
while(BufferPos == -1) {
Sleep(500); //等待500毫秒
BufferPos = FindBufferPosition();
}
ReleaseMutex(h_mutex2);
if(flag)
printf("消费者 %d 被激活\n",m_serial);
//等待缓冲区可以被读
wait_for_semaphore = WaitForSingleObject(h_Semaphore[BufferPos], -1);
ReleaseSemaphore(h_Semaphore[BufferPos], -1, NULL);
//printf("消费者 %d 请求消费产品 %d\n", m_serial, BufferPos);
//锁定要消费位置的临界区,防止同时被两个进程读写
// EnterCriticalSection(&PC_Critical[BufferPos]);

printf("消费者 %d 开始消费产品\n",m_serial);
Sleep(m_delay); //生产时延
Buffer_Critical[BufferPos] = -1; //标记缓冲区为空;
printf("消费者 %d 消费完成 缓冲区[ %d ]: %d\n", m_serial, BufferPos, Buffer_Critical[BufferPos]);

//离开临界区
//LeaveCriticalSection(&PC_Critical[BufferPos]);
ReleaseSemaphore(empty_semaphore, 1, NULL);
ReleaseSemaphore(h_Semaphore[BufferPos], 1, NULL);
m_num --; //还需消费的数量减1
}
}

//5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  操作系统 线程 并发