您的位置:首页 > 其它

生产者与消费者模型(线程)

2016-12-26 23:29 405 查看
#include <stdio.h>

#include <time.h>

#include <stdlib.h>

#include <pthread.h>

#include <semaphore.h>

#include <string.h>

#define QUEUE_SIZE  100

#define OK            0   

#define ERROR        -1    

typedef int DataType;

typedef struct

{
DataType data[QUEUE_SIZE];
int rear, front;

}SeqQueue;

struct msg

{
SeqQueue queue;
sem_t empty; // 控制生产者,当empty为1 证明缓冲区没有商品,此时生产者可以生产商品
sem_t full;  // 控制消费者,当full为1  证明缓冲区有商品, 此时可以消费

};

struct msg goods;

pthread_mutex_t mutex;

// 置空队列

int InitQueue(SeqQueue *q)

{
if (q == NULL)
{
return ERROR;
}

q->rear = 0;
q->front = 0;

return OK;

}

// 判断是否空队

int QueueEmpty(SeqQueue *q)

{
if (q == NULL)
{
return ERROR;
}

return q->rear == q->front;

}

// 判断是否队满

int QueueFull(SeqQueue *q)

{
if (q == NULL)
{
return ERROR;
}

return (q->rear+1)%QUEUE_SIZE == q->front;

}

// 入队

int EnQueue(SeqQueue *q, int data)

{
if (q == NULL)
{
return ERROR;
}

if (QueueFull(q))
{
return ERROR;
}

q->rear = (q->rear+1) % QUEUE_SIZE;
q->data[q->rear] = data;

return OK;

}

// 出队

int DeQueue(SeqQueue *q)

{
if (q == NULL)
{
return ERROR;
}

if (QueueEmpty(q))
{
return ERROR;
}

q->front = (q->front+1) % QUEUE_SIZE;

return q->data[q->front];

}

// 获取队头元素

int GetFront(SeqQueue *q)

{
if (q == NULL)
{
return ERROR;
}

if (QueueEmpty(q))
{
return ERROR;
}

int pos = (q->front+1) % QUEUE_SIZE;

return q->data[pos];

}

void delay()

{
int k1 = rand()%10000+1;

while (k1)
{
int k2 = rand()%10000+1;
while(k2)
{
k2--;
}
k1--;
}

}

// 生产一条消息

int produce_item()

{
return rand()%10+1;

}

// 将消息放入缓冲区

void insert_item(int item)

{
EnQueue(&goods.queue, item);

printf("produce a item : %d\n", item);

}

// 将消息取出缓冲区

int remove_item()

{
return DeQueue(&goods.queue);

}

// 处理消息

void consumer_item(int item)

{
printf("a item has consumed: %d\n", item);

}

// 生产者线程函数

void *produce(void *arg)

{
int item;   // 消息
while(1)
{
item = produce_item();       // 生产一条消息
sem_wait(&goods.empty);      // 获得表示空闲空间的信号量
pthread_mutex_lock(&mutex);  // 加锁

insert_item(item); 

pthread_mutex_unlock(&mutex);// 解锁

delay();
sem_post(&goods.full);
}

}

// 消费者线程函数

void *consume(void *arg)

{
int item;
while(1)
{
sem_wait(&goods.full);       // 获得表示消息个数的信号量
pthread_mutex_lock(&mutex);  // 加锁

item =remove_item();         // 取得消息

pthread_mutex_unlock(&mutex);// 解锁

sem_post(&goods.empty);
delay();
consumer_item(item);
}

}

int init()

{

    //初始化随机数
srand((unsigned int)time(NULL));

// 初始化队列
if (InitQueue(&goods.queue) != OK)
{
return -1;
}

// 初始化信号量sem_init(&goods.empty, 0, 100);
sem_init(&goods.full, 0, 0);

//初始化锁
pthread_mutex_init(&mutex, NULL);

}

void destroy()

{
// 销毁信号量
sem_destroy(&goods.empty);
sem_destroy(&goods.full);

pthread_mutex_destroy(&mutex);

}

int main()

{
pthread_t produceId[5], consumeId;
int ret;

// 初始化相关资源
if (init() == -1)
{
return -1;
}

// 创建生产者线程
int i;
for (i = 0; i < 5; i++)
{
ret = pthread_create(&produceId[i], NULL, produce, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}
}

// 创建消费者线程
ret = pthread_create(&consumeId, NULL, consume, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}

// 等待生产者线程结束
for (i = 0; i < 5; i++)
{
ret = pthread_detach(produceId[i]);
if (0 != ret)

{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}
}

// 等待消费者线程结束
ret = pthread_join(consumeId, NULL);
if (0 != ret)

{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}

// 释放销毁相关资源
destroy();
return 0;
}

运行结果:



心得体会:

创立这个生产者与消费者模型。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: