谈谈我对Linux下“生产者/消费者线程模型”的理解
2018-03-06 21:06
302 查看
生产者消费者线程模型常常用于网络编程,即一个线程监听事件发生(生产者,例如产生“收到数据”事件),其他线程来处理事件(消费者,例如处理收到的数据)比较笨的办法是:
消费者线程不停地loop等待数据,当生产者线程发现收到数据时,找一个线程(先不讨论找线程的策略),把“收到数据”这一事件告诉消费者线程。消费者线程会在下一个loop对这个事件进行处理,处理完毕后,继续loop,直到下一个事件到来。但这么做的缺点显而易见,消费者线程不停地空跑,sleep时间太长,会降低系统的瞬间相应速度;sleep时间太短又会无意义地消耗CPU资源。所以理想中的方法,最好是能有一个事件触发机制,即:消费者线程阻塞等待时间发生,事件一旦触发,立即运行之后的代码,省去了上面方案中等待一个loop的时间,也省去了可能对cpu造成的消耗。于是,比较好的办法是:
消费者线程阻塞等待事件发生,当生产者线程发现收到数据时,通知某一个消费者线程(同样先不讨论找线程的策略),该消费者线程立即从阻塞中回复,继续执行。值得庆幸的是,linux提供了API来实现这样的目的:
【★】当事件发生时,生产者线程用pthread_cond_signal/pthread_cond_broadcast来激活(两个函数区别在于pthread_cond_signal激活一个线程,pthread_cond_broadcast激活全部线程)。注意:这里,出现了第一个很容易搞错的问题,即:pthread_cond_signal其实并不一定只激活某一个线程,具体原因在manual中有描述:
在多CPU的情况下(多核也是多个CPU),想避免唤醒一个以上的线程是无法做到的。也就是说,即使调用pthread_cond_signal,仍然可能使多个线程的pthread_cond_wait/pthread_cond_timewait返回。而通常情况下的生产者消费者模型中,每一个事件只需要一个消费者线程处理就够了,那么怎么保证一次pthread_cond_signal只唤醒任意一个线程呢,这个随后讨论。首先,基于上述API的描述,那么直接一点想到的可能是下面的做法:Consumer Thread:
因为线程调度的顺序是不可控的,假设某次signal通知消费者线程,有事件发生,在消费者线程执行处理该事件代码时,生产者线程又发送了另一个signal,也就是说,如果pthread_cond_signal在pthread_cond_wait之前执行呢?显然,pthread_cond_wait将错过这次signal的激活。
那么,简单地修改可以解决下面的问题,即:增加一个待处理事件列表,根据列表是否为空,来判断是否还有没处理的事件,即不完全依赖signal触发。于是代码变成了下面的样子:
Consumer Thread:
虽然看起来一切是美好的,但又不得不考虑这样一个问题:
如果在上述代码ConsumerThread的may be race condition part处产生race condition呢?假设在ConsumerThread执行完unlock后,ProducerThread执行了signal呢?所以,这里就引出了pthread_cond_wait/pthread_cond_timewait为什么需要一个mutex参数的问题。
为了解决上面这个可能出现的race condition,pthread_cond_wait/pthread_cond_timewait在实现时,先进入等待状态,才释放这个mutex,在被激活返回的时候再重新lock,这样就不会存在may be race condition part的空间,也就不会出现漏掉事件的情况。好,修改一下,代码变成了这样:
Consumer Thread:
在多个ConsumerThread的情况下,既然pthread_cond_signal无法保证只使一个线程的pthread_cond_wait/pthread_cond_timewait返回,那怎么保证只有线程去真正的处理事件呢?
终于要引出最终的版本:即陈硕在《linux多线程服务端编程》中提到的,这种模型只有一种正确的实现(只有这一种正确的方法,所以想用错都难),代码如下:Consumer Thread:
消费者线程不停地loop等待数据,当生产者线程发现收到数据时,找一个线程(先不讨论找线程的策略),把“收到数据”这一事件告诉消费者线程。消费者线程会在下一个loop对这个事件进行处理,处理完毕后,继续loop,直到下一个事件到来。但这么做的缺点显而易见,消费者线程不停地空跑,sleep时间太长,会降低系统的瞬间相应速度;sleep时间太短又会无意义地消耗CPU资源。所以理想中的方法,最好是能有一个事件触发机制,即:消费者线程阻塞等待时间发生,事件一旦触发,立即运行之后的代码,省去了上面方案中等待一个loop的时间,也省去了可能对cpu造成的消耗。于是,比较好的办法是:
消费者线程阻塞等待事件发生,当生产者线程发现收到数据时,通知某一个消费者线程(同样先不讨论找线程的策略),该消费者线程立即从阻塞中回复,继续执行。值得庆幸的是,linux提供了API来实现这样的目的:
int pthread_cond_timedwait( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict a 103cc bstime); int pthread_cond_wait( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); //========================= and ================================= int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);【★】pthread_cond_wait/pthread_cond_timewait用来等待(两个函数区别在于pthread_cond_timewait会有超时时间,超时会返回,而pthread_cond_wait则会一直阻塞)。
【★】当事件发生时,生产者线程用pthread_cond_signal/pthread_cond_broadcast来激活(两个函数区别在于pthread_cond_signal激活一个线程,pthread_cond_broadcast激活全部线程)。注意:这里,出现了第一个很容易搞错的问题,即:pthread_cond_signal其实并不一定只激活某一个线程,具体原因在manual中有描述:
在多CPU的情况下(多核也是多个CPU),想避免唤醒一个以上的线程是无法做到的。也就是说,即使调用pthread_cond_signal,仍然可能使多个线程的pthread_cond_wait/pthread_cond_timewait返回。而通常情况下的生产者消费者模型中,每一个事件只需要一个消费者线程处理就够了,那么怎么保证一次pthread_cond_signal只唤醒任意一个线程呢,这个随后讨论。首先,基于上述API的描述,那么直接一点想到的可能是下面的做法:Consumer Thread:
void ConsumeThread(void* param) { while(true) { pthread_cond_wait(cond); // 等待条件变量cond激活 // 消费事件的逻辑 } }Producer Thread:
void ProducerThread(void* param) { while(true) { // 产生事件的逻辑,如epoll_wait等 pthread_cond_signal(cond); // 激活等待在cond上的某个线程,让它来处理发生的事件 } }但是,上面的代码有一个问题,pthread_cond_wait却需要一个mutex参数,这个是什么原因呢?这个需要一步步来解释。首先考虑下面的情况:
因为线程调度的顺序是不可控的,假设某次signal通知消费者线程,有事件发生,在消费者线程执行处理该事件代码时,生产者线程又发送了另一个signal,也就是说,如果pthread_cond_signal在pthread_cond_wait之前执行呢?显然,pthread_cond_wait将错过这次signal的激活。
那么,简单地修改可以解决下面的问题,即:增加一个待处理事件列表,根据列表是否为空,来判断是否还有没处理的事件,即不完全依赖signal触发。于是代码变成了下面的样子:
Consumer Thread:
void ConsumeThread(void* param) { while(true) { pthread_mutex_lock(&mutex); if(待处理事件列表.empty() == true) { pthread_mutex_unlock(&mutex); /* may be race condition part */ pthread_cond_wait(&cond); // 等待条件变量cond激活 pthread_mutex_lock(&mutex); } // 从“待处理事件列表”弹出一个事件; pthread_mutex_unlock(&mutex); // 消费事件的逻辑 } }Producer Thread:
void ProducerThread(void* param) { while(true) { // 产生事件的逻辑,如epoll_wait等 pthread_mutex_lock(&mutex); // 待处理事件列表.insert(新事件); pthread_cond_signal(cond); // 激活等待在cond上的某个线程,让它来处理发生的事件 pthread_mutex_unlock(&mutex); } }这样做,看起来就没什么问题了,我们增加了一个待处理事件列表,在生产者产生事件时,插入到这个列表中,这样即使消费者线程正在干别的,等别的事情干完了,一判断:if(待处理事件列表.empty() == false),就又会接着进入消费的逻辑。从而使事件不会被丢掉。直到真正处理完毕。
虽然看起来一切是美好的,但又不得不考虑这样一个问题:
如果在上述代码ConsumerThread的may be race condition part处产生race condition呢?假设在ConsumerThread执行完unlock后,ProducerThread执行了signal呢?所以,这里就引出了pthread_cond_wait/pthread_cond_timewait为什么需要一个mutex参数的问题。
为了解决上面这个可能出现的race condition,pthread_cond_wait/pthread_cond_timewait在实现时,先进入等待状态,才释放这个mutex,在被激活返回的时候再重新lock,这样就不会存在may be race condition part的空间,也就不会出现漏掉事件的情况。好,修改一下,代码变成了这样:
Consumer Thread:
void ConsumeThread(void* param) { while(true) { pthread_mutex_lock(&mutex); if(待处理事件列表.empty() == true) { pthread_cond_wait(&cond, &mutex); // 等待条件变量cond激活 } // 从“待处理事件列表”弹出一个事件; pthread_mutex_unlock(&mutex); // 消费事件的逻辑 } }Producer Thread:
void ProducerThread(void* param) { while(true) { // 产生事件的逻辑,如epoll_wait等 pthread_mutex_lock(&mutex); // 待处理事件列表.insert(新事件); pthread_cond_signal(cond); // 激活等待在cond上的某个线程,让它来处理发生的事件 pthread_mutex_unlock(&mutex); } }这样一来,总算是没什么race condition问题了,但是,还有一个问题没有解决,也就是一开始提到的:
在多个ConsumerThread的情况下,既然pthread_cond_signal无法保证只使一个线程的pthread_cond_wait/pthread_cond_timewait返回,那怎么保证只有线程去真正的处理事件呢?
终于要引出最终的版本:即陈硕在《linux多线程服务端编程》中提到的,这种模型只有一种正确的实现(只有这一种正确的方法,所以想用错都难),代码如下:Consumer Thread:
void ConsumeThread(void* param) { while(true) { pthread_mutex_lock(&mutex); // 【注意】这里的if替换成了while while(待处理事件列表.empty() == true) { pthread_cond_wait(&cond, &mutex); // 等待条件变量cond激活 } // 从“待处理事件列表”弹出一个事件; pthread_mutex_unlock(&mutex); // 消费事件的逻辑 } }Producer Thread:
void ProducerThread(void* param) { while(true) { // 产生事件的逻辑,如epoll_wait等 pthread_mutex_lock(&mutex); // 待处理事件列表.insert(新事件); pthread_cond_signal(cond); // 激活等待在cond上的某个线程,让它来处理发生的事件 pthread_mutex_unlock(&mutex); } }为什么把if替换成while可以解决问题?设想一下,当一个ConsumerThread被唤醒后,这个线程会马上获得mutex锁(回顾一下上面说过的,pthread_cond_wait/pthread_cond_timewait在返回之前会重新对mutex加锁),然后执行while循环的判断,直到把事件弹出,才会释放mutex,这样,等这个线程释放mutex,另一个Consumer再去执行while循环的判断时,已经发现事件被弹出了,没有要处理的了(即使有,也是另外一个事件,不会发生多个ConsumerThread都去拿同一个事件的竞争),然后继续进入等待。以上行为,符合我们对“生产者/消费者模型”的预期。转自:http://blog.csdn.net/xocoder/article/details/50961604
相关文章推荐
- 谈谈我对Linux下“生产者/消费者线程模型”的理解
- 从java多线程实现“生产者-消费者”模型来谈谈操作系统中线程状态的转换及线程同步的总结
- 关于js中的单线程和异步事件同操作系统的生产者消费者模型的理解
- Linux线程--生产者消费者模型
- LINUX 线程锁的设置不当案例(生产者消费者模型小BUG)
- Linux C:利用两个线程实现生产者消费者模型
- 【Linux】线程总结:线程同步 -互斥锁,条件变量,信号量实现多生产者多消费者模型
- 从Java多线程实现“生产者-消费者”模型来谈谈操作系统中线程状态的转换
- Linux——线程锁实现的生产者、消费者模型
- linux生产者消费者模型源代码
- 【Linux】生产者消费者模型介绍
- linux进程并发模型生产者和消费者模式编程
- 生产者消费者模型--2个线程
- 多线程实现生产者消费者模型,以及线程和进程的回顾
- 利用两个线程实现生产者消费者模型
- 线程通信之生产者消费者模型
- java: 线程间通信经典模型“生产者-消费者”模型的实现
- 生产者消费者模型(Linux系统下的两种实现方法)
- Linux下的生产者消费者模型模型
- python基础-进线程下的queue、及其生产者消费者模型(2种方式)