生产者与消费者模型(线程)
2016-12-26 23:29
405 查看
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#define QUEUE_SIZE 100
#define OK 0
#define ERROR -1
typedef int DataType;
typedef struct
{
DataType data[QUEUE_SIZE];
int rear, front;
}SeqQueue;
struct msg
{
SeqQueue queue;
sem_t empty; // 控制生产者,当empty为1 证明缓冲区没有商品,此时生产者可以生产商品
sem_t full; // 控制消费者,当full为1 证明缓冲区有商品, 此时可以消费
};
struct msg goods;
pthread_mutex_t mutex;
// 置空队列
int InitQueue(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
q->rear = 0;
q->front = 0;
return OK;
}
// 判断是否空队
int QueueEmpty(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
return q->rear == q->front;
}
// 判断是否队满
int QueueFull(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
return (q->rear+1)%QUEUE_SIZE == q->front;
}
// 入队
int EnQueue(SeqQueue *q, int data)
{
if (q == NULL)
{
return ERROR;
}
if (QueueFull(q))
{
return ERROR;
}
q->rear = (q->rear+1) % QUEUE_SIZE;
q->data[q->rear] = data;
return OK;
}
// 出队
int DeQueue(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
if (QueueEmpty(q))
{
return ERROR;
}
q->front = (q->front+1) % QUEUE_SIZE;
return q->data[q->front];
}
// 获取队头元素
int GetFront(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
if (QueueEmpty(q))
{
return ERROR;
}
int pos = (q->front+1) % QUEUE_SIZE;
return q->data[pos];
}
void delay()
{
int k1 = rand()%10000+1;
while (k1)
{
int k2 = rand()%10000+1;
while(k2)
{
k2--;
}
k1--;
}
}
// 生产一条消息
int produce_item()
{
return rand()%10+1;
}
// 将消息放入缓冲区
void insert_item(int item)
{
EnQueue(&goods.queue, item);
printf("produce a item : %d\n", item);
}
// 将消息取出缓冲区
int remove_item()
{
return DeQueue(&goods.queue);
}
// 处理消息
void consumer_item(int item)
{
printf("a item has consumed: %d\n", item);
}
// 生产者线程函数
void *produce(void *arg)
{
int item; // 消息
while(1)
{
item = produce_item(); // 生产一条消息
sem_wait(&goods.empty); // 获得表示空闲空间的信号量
pthread_mutex_lock(&mutex); // 加锁
insert_item(item);
pthread_mutex_unlock(&mutex);// 解锁
delay();
sem_post(&goods.full);
}
}
// 消费者线程函数
void *consume(void *arg)
{
int item;
while(1)
{
sem_wait(&goods.full); // 获得表示消息个数的信号量
pthread_mutex_lock(&mutex); // 加锁
item =remove_item(); // 取得消息
pthread_mutex_unlock(&mutex);// 解锁
sem_post(&goods.empty);
delay();
consumer_item(item);
}
}
int init()
{
//初始化随机数
srand((unsigned int)time(NULL));
// 初始化队列
if (InitQueue(&goods.queue) != OK)
{
return -1;
}
// 初始化信号量sem_init(&goods.empty, 0, 100);
sem_init(&goods.full, 0, 0);
//初始化锁
pthread_mutex_init(&mutex, NULL);
}
void destroy()
{
// 销毁信号量
sem_destroy(&goods.empty);
sem_destroy(&goods.full);
pthread_mutex_destroy(&mutex);
}
int main()
{
pthread_t produceId[5], consumeId;
int ret;
// 初始化相关资源
if (init() == -1)
{
return -1;
}
// 创建生产者线程
int i;
for (i = 0; i < 5; i++)
{
ret = pthread_create(&produceId[i], NULL, produce, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}
}
// 创建消费者线程
ret = pthread_create(&consumeId, NULL, consume, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}
// 等待生产者线程结束
for (i = 0; i < 5; i++)
{
ret = pthread_detach(produceId[i]);
if (0 != ret)
{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}
}
// 等待消费者线程结束
ret = pthread_join(consumeId, NULL);
if (0 != ret)
{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}
// 释放销毁相关资源
destroy();
return 0;
}
运行结果:
心得体会:
创立这个生产者与消费者模型。
#include <time.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#define QUEUE_SIZE 100
#define OK 0
#define ERROR -1
typedef int DataType;
typedef struct
{
DataType data[QUEUE_SIZE];
int rear, front;
}SeqQueue;
struct msg
{
SeqQueue queue;
sem_t empty; // 控制生产者,当empty为1 证明缓冲区没有商品,此时生产者可以生产商品
sem_t full; // 控制消费者,当full为1 证明缓冲区有商品, 此时可以消费
};
struct msg goods;
pthread_mutex_t mutex;
// 置空队列
int InitQueue(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
q->rear = 0;
q->front = 0;
return OK;
}
// 判断是否空队
int QueueEmpty(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
return q->rear == q->front;
}
// 判断是否队满
int QueueFull(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
return (q->rear+1)%QUEUE_SIZE == q->front;
}
// 入队
int EnQueue(SeqQueue *q, int data)
{
if (q == NULL)
{
return ERROR;
}
if (QueueFull(q))
{
return ERROR;
}
q->rear = (q->rear+1) % QUEUE_SIZE;
q->data[q->rear] = data;
return OK;
}
// 出队
int DeQueue(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
if (QueueEmpty(q))
{
return ERROR;
}
q->front = (q->front+1) % QUEUE_SIZE;
return q->data[q->front];
}
// 获取队头元素
int GetFront(SeqQueue *q)
{
if (q == NULL)
{
return ERROR;
}
if (QueueEmpty(q))
{
return ERROR;
}
int pos = (q->front+1) % QUEUE_SIZE;
return q->data[pos];
}
void delay()
{
int k1 = rand()%10000+1;
while (k1)
{
int k2 = rand()%10000+1;
while(k2)
{
k2--;
}
k1--;
}
}
// 生产一条消息
int produce_item()
{
return rand()%10+1;
}
// 将消息放入缓冲区
void insert_item(int item)
{
EnQueue(&goods.queue, item);
printf("produce a item : %d\n", item);
}
// 将消息取出缓冲区
int remove_item()
{
return DeQueue(&goods.queue);
}
// 处理消息
void consumer_item(int item)
{
printf("a item has consumed: %d\n", item);
}
// 生产者线程函数
void *produce(void *arg)
{
int item; // 消息
while(1)
{
item = produce_item(); // 生产一条消息
sem_wait(&goods.empty); // 获得表示空闲空间的信号量
pthread_mutex_lock(&mutex); // 加锁
insert_item(item);
pthread_mutex_unlock(&mutex);// 解锁
delay();
sem_post(&goods.full);
}
}
// 消费者线程函数
void *consume(void *arg)
{
int item;
while(1)
{
sem_wait(&goods.full); // 获得表示消息个数的信号量
pthread_mutex_lock(&mutex); // 加锁
item =remove_item(); // 取得消息
pthread_mutex_unlock(&mutex);// 解锁
sem_post(&goods.empty);
delay();
consumer_item(item);
}
}
int init()
{
//初始化随机数
srand((unsigned int)time(NULL));
// 初始化队列
if (InitQueue(&goods.queue) != OK)
{
return -1;
}
// 初始化信号量sem_init(&goods.empty, 0, 100);
sem_init(&goods.full, 0, 0);
//初始化锁
pthread_mutex_init(&mutex, NULL);
}
void destroy()
{
// 销毁信号量
sem_destroy(&goods.empty);
sem_destroy(&goods.full);
pthread_mutex_destroy(&mutex);
}
int main()
{
pthread_t produceId[5], consumeId;
int ret;
// 初始化相关资源
if (init() == -1)
{
return -1;
}
// 创建生产者线程
int i;
for (i = 0; i < 5; i++)
{
ret = pthread_create(&produceId[i], NULL, produce, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}
}
// 创建消费者线程
ret = pthread_create(&consumeId, NULL, consume, NULL);
if (ret != 0)
{
printf ("create produce thread fail: %s", strerror(ret));
return -1;
}
// 等待生产者线程结束
for (i = 0; i < 5; i++)
{
ret = pthread_detach(produceId[i]);
if (0 != ret)
{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}
}
// 等待消费者线程结束
ret = pthread_join(consumeId, NULL);
if (0 != ret)
{
printf("wait thread terminates fail:%s", strerror(ret));
return -1;
}
// 释放销毁相关资源
destroy();
return 0;
}
运行结果:
心得体会:
创立这个生产者与消费者模型。
相关文章推荐
- 生产者&消费者模型-线程间协调
- Java线程:并发协作-生产者消费者模型 转自:http://lavasoft.blog.51cto.com/62575/221932
- 谈谈我对Linux下“生产者/消费者线程模型”的理解
- 关于js中的单线程和异步事件同操作系统的生产者消费者模型的理解
- Java线程:并发协作-生产者消费者模型
- java线程深度解析(五)——并发模型(生产者-消费者)
- Java线程:并发协作-生产者消费者模型
- Java线程:并发协作-生产者消费者模型 转自:http://lavasoft.blog.51cto.com/62575/221932
- 生产者--消费者模型(线程锁方式)
- JAVA 线程通信以及多个生产者消费者模型
- 线程之生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- 从生产者-消费者模型了解线程、同步、锁(java)
- 生产者消费者模型中线程如何正常退出
- Java线程:并发协作-生产者消费者模型
- Java线程同步:生产者-消费者 模型(代码示例)
- Java线程:并发协作-生产者消费者模型
- Java线程(九)-生产者消费者模型
- Java线程:并发协作-生产者消费者模型
- 基于线程实现的生产者消费者模型(Object.wait(),Object.notify()方法)