您的位置:首页 > 运维架构 > Linux

生产者、消费者模型

2017-06-14 13:52 351 查看

linux中当两个线程要访问同一块临界区域时,比如一个读进程,一个写进程,一个在临界区域写数据,另一个在临界区域读数据,被访问的这块临界区域通常叫缓冲区,而往这块缓冲区里写数据的叫生产者,在这块缓冲区里读数据的叫消费者。

要实现消费者与生产者的关系,要满足一个原则,就是“321“原则。

3代表的是有三个关系:3种关系:生产者与生产者的关系、消费者与消费者的关系、生产者与消费 者的关系

2代表的是两种角色:生产者、消费者

1就是一个交易场所:缓冲区

其中生产者与生产者存在互斥关系、消费者与消费者之间存在互斥关系、生产者与消费者之间存在同步与互斥关系。

关系如下图



为什么不让生产者直接调用消费者的某个函数,直接把数据传递过去?搞出这么⼀一个缓冲区呢?

1、解耦

假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产⽣生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

支持并发(concurrency)

生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后⾯面的帖⼦子会讲两种并发类型下的应⽤用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

支持忙闲不均

缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

思考一下,如果生产者往缓冲区里生产了一部分数据时,消费者就来直接读取了,那么消费者读到的数据和生产者生产的数据其实不是一个,也就是说生产者与消费者之间没有互斥的制约,所以就必须引入互斥机制,即互斥锁。

单链表里用生产者与消费者模型来实现

#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>

typedef struct Node
{
int data;
struct Node * next;
}Node,*Node_p,**Node_pp;

Node_p CreatNode(int data)
{
Node_p _n=(Node_p)malloc(sizeof(Node));
if(_n==NULL)
{
return NULL;
}
_n->data=data;
_n->next=NULL;
return _n;
}

void Init(Node_pp list)
{
*list=CreatNode(0);
}

void PushFront(Node_p list ,int data)
{
assert(list);
Node_p _n=CreatNode(data);
if(_n==NULL)
{
perror("Push");
return;
}

_n->next=list->next;
list->next=_n;
}

void del_Node(Node_p del)
{
assert(del);
free(del);
}

void PopFront(Node_p list,int *data)
{
if(!isEmpty(list))
{
Node_p del=list->next;
list->next=del->next;
*data=del->data;
del_Node(del);
}
else
{
printf("list Empty\n");
}
}

int isEmpty(Node_p list)
{
assert(list);
if(list->next==NULL)
return 1;
else
return 0;
}

void destroy(Node_p list)
{
int data;
assert(list);
while(!isEmpty(list))
{
PopFront(list,&data);
}
del_Node(list);
}

void ShowList(Node_p list)
{
assert(list);
Node_p cur=list->next;
while(cur->next)
{
printf("%d->",cur->data);
cur=cur->next;
}
printf("\n");
}

Node_p list=NULL;
pthread_mutex_t mylock= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mycond=PTHREAD_COND_INITIALIZER;

void * Consumer(void *arg)
{
int data=0;
while(1)
{
pthread_mutex_lock(&mylock);
while(isEmpty(list))
{
pthread_cond_wait(&mycond,&mylock);
}
PopFront(list,&data);
pthread_mutex_unlock(&mylock);
printf("consumer:%d\n",data);
}
return NULL;
}

void * Producer(void *arg)
{
int data=0;
while(1)
{
usleep(1000000);
data=rand()%1000;
pthread_mutex_lock(&mylock);
PushFront(list,data);
pthread_mutex_unlock(&mylock);
pthread_cond_signal(&mycond);
printf("Producer:%d\n",data);
}
return NULL;
}

int main()
{
Init(&list);
pthread_t tid1,tid2;
pthread_create(&tid1,NULL,Consumer,NULL);
pthread_create(&tid2,NULL,Producer,NULL);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
destroy(list);
pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息