您的位置:首页 > 其它

POSIX 使用互斥量和条件变量实现生产者/消费者问题

2015-05-06 12:37 579 查看
boost的mutex,condition_variable非常好用。但是在Linux上,boost实际上做的是对pthread_mutex_t 和pthread_cond_t的一系列的封装。因此通过对原生态的POSIX 的mutex,cond的生成者,消费者的实现,我们可以再次体会boost带给我们的便利。

1. 什么是互斥量

互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线
程将会被阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所以在该互斥锁上的阻塞线程都会变成可进行状态,第一个变成运行状态的线程可以
对互斥量加锁,其他线程在次被阻塞,等待下次运行状态。

pthread_mutex_t 就是POSIX对于mutex的实现。

函数名参数说明
pthread_mutex_initpthread_mutex_t * mutex,

constpthread_mutex_t *attr
初始化一个互斥量,静态方式可以直接使用PTHREAD_MUTEX_INITIALIZER进行赋值初始化
pthread_mutex_destroypthread_mutex_t *mutex释放对互斥变量分配的资源。注意pthread_mutex_init有可能malloc了资源
pthread_mutex_lockpthread_mutex_t *mutex如果互斥量已经上锁,调用线程阻塞直至互斥量解锁
pthread_mutex_trylockpthread_mutex_t *mutex加锁,如果失败不阻塞
pthread_mutex_unlockpthread_mutex_t *mutex解锁
使用init函数进行初始化:

[cpp] view plaincopy





#include <pthread.h>

pthread_mutex_t foo_mutex;

void foo()

{

pthread_mutex_init(&foo_mutex, NULL);

pthread_mutex_lock(&foo_mutex);

/* Do work. */

pthread_mutex_unlock(&foo_mutex);

pthread_mutex_destroy(&foo_mutex);

}

当然该初始化

[cpp] view plaincopy





pthread_mutex_init(&foo_mutex, NULL);

只能foo_mutex使用前初始化一次,最后destroy。初始化已经初始化的mutex将导致undefined behavior。

另外一种用法:

[cpp] view plaincopy





pthread_mutex_t foo_mutex = PTHREAD_MUTEX_INITIALIZER;

void foo()

{

pthread_mutex_lock(&foo_mutex);

/* Do work. */

pthread_mutex_unlock(&foo_mutex);

}


然了,这两种用法都有问题:如果在lock住后unlock之前出现exception,那么这个锁永远也不能unlock。这种情况下需要guard这
个资源。具体可参照boost::mutex::scoped_lock的实现,非常简单但是极大简化了mutex的安全使用。

2. 什么是条件变量

与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。

条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。

条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条
件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

条件变量的初始化和mutex的初始化差不多,也是有两种方式:

pthread_cond_tmy_condition=PTHREAD_COND_INITIALIZER;

也可以利用函数pthread_cond_init动态初始化。

下面中各个函数的简介。

函数名参数说明
pthread_cond_initpthread_cond_t *cond,
const pthread_condattr_t *attr
初始化
pthread_cond_destroypthread_cond_t *cond回收
pthread_cond_waitpthread_cond_t *cond,
pthread_mutex_t *mutex
等待,无超时
pthread_cond_timedwaitpthread_cond_t *cond,pthread_mutex_t *mutex,
const struct timespec *abstime
等待,有超时
pthread_cond_signalpthread_cond_t *cond一个在相同条件变量上阻塞的线程将被解锁。如果同时有多个线程阻塞,则由调度策略确定接收通知的线程
pthread_cond_broadcastpthread_cond_t *cond将通知阻塞在这个条件变量上的所有线程。一旦被唤醒,线程仍然会要求互斥锁。
一个简单使用条件变量进行线程同步的小例子:

[cpp] view plaincopy





#include <pthread.h>

#include <stdio.h>

#include <stdlib.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *thread1(void *);

void *thread2(void *);

int i=1;

int main(void)

{

pthread_t t_a;

pthread_t t_b;

pthread_create(&t_a,NULL,thread2,(void *)NULL);/*create thread t_a*/

pthread_create(&t_b,NULL,thread1,(void *)NULL); /*create thread t_b*/

pthread_join(t_b, NULL);/*wait for exit of t_b*/

pthread_join(t_a, NULL);

pthread_mutex_destroy(&mutex);

pthread_cond_destroy(&cond);

exit(0);

}

void *thread1(void *junk)

{

for(i=1;i<=9;i++)

{

pthread_mutex_lock(&mutex);

if(i%3==0)

pthread_cond_signal(&cond);

else

printf("thread1 running, i = %d\n",i);

pthread_mutex_unlock(&mutex);

sleep(1);

}

}

void *thread2(void *junk)

{

while(i<9)

{

pthread_mutex_lock(&mutex);

if(i%3!=0)

pthread_cond_wait(&cond,&mutex);/*..*/

printf("thread2 running, i = %d\n",i);

pthread_mutex_unlock(&mutex);

sleep(1);

}

}

输出:

[cpp] view plaincopy





thread1 running, i = 1

thread1 running, i = 2

thread2 running, i = 3

thread1 running, i = 4

thread1 running, i = 5

thread2 running, i = 6

thread1 running, i = 7

thread1 running, i = 8

thread2 running, i = 9

3. 生产者-消费者的实现

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的
线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消
费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

[cpp] view plaincopy





#include <stdio.h>

#include <stdlib.h>

#define MAX 5

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /*初始化互斥锁*/

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;/*初始化条件变量*/

typedef struct{

char buffer[MAX];

int how_many;

}BUFFER;

BUFFER share={"",0};

char ch='A';/*初始化ch*/

void *producer(void *);

void *consumer(void *);

int main(void)

{

pthread_t t_read;

pthread_t t_write;

pthread_create(&t_write,NULL,producer,(void *)NULL); /*创建进程t_a*/

pthread_create(&t_read,NULL,consumer,(void *)NULL); /*创建进程t_b*/

pthread_join(t_write,(void **)NULL);

pthread_join(t_read, NULL);

exit(0);

}

void *producer(void *junk)

{

int n=0;

printf("Producer: starting\n");

while(ch!='K')

{

pthread_mutex_lock(&mutex);/*锁住互斥量*/

if(share.how_many!=MAX)

{

share.buffer[share.how_many++]=ch++;/*把字母写入缓存*/

printf("Producer: put char[%c]\n",ch-1);/*打印写入字母*/

if(share.how_many==MAX)

{

printf("Producer: signaling full\n");

pthread_cond_signal(&cond);/*如果缓存中的字母到达了最大值就发送信号*/

}

}

pthread_mutex_unlock(&mutex);/*解锁互斥量*/

}

sleep(1);

printf("Producer:Exiting\n");

return NULL;

}

void *consumer(void *junk)

{

int i;

int n=0;

printf("Consumer: starting\n");

while(ch!='K')

{

pthread_mutex_lock(&mutex);/*锁住互斥量*/

printf("\nConsumer : Waiting\n");

while(share.how_many!=MAX)/*如果缓存区字母不等于最大值就等待*/

pthread_cond_wait(&cond,&mutex);

printf("Consumer: getting buffer:: ");

for(i=0;share.buffer[i]&&share.how_many;++i,share.how_many--)

putchar(share.buffer[i]); /*循环输出缓存区字母*/

putchar('\n');

pthread_mutex_unlock(&mutex);/*解锁互斥量*/

}

printf("Consumer: exiting\n");

return NULL;

}

输出:Producer: starting
Producer: put char[A]
Producer: put char[B]
Producer: put char[C]
Producer: put char[D]
Producer: put char[E]
Producer: signaling full
Consumer: starting

Consumer : Waiting
Consumer: getting buffer:: ABCDE

Consumer : Waiting
Producer: put char[F]
Producer: put char[G]
Producer: put char[H]
Producer: put char[I]
Producer: put char[J]
Producer: signaling full
Consumer: getting buffer:: FGHIJ
Consumer: exiting
Producer:Exiting

来源:/article/1579983.html

http://blog.csdn.net/anzhsoft2008/article/category/2123999
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: