您的位置:首页 > 其它

条件变量 condition variable

2016-01-17 20:06 127 查看

条件变量 condition variable

怎么用

man pthread_cond_init

man pthread_cond_wait

注意:

必须和mutex一起用,必须先锁住mutex,再wait,否则的行为是未定义的。

不是信号安全的。因此不要在信号的handler里用

用同一个mutex,否则行为未定义

如何实现无限缓冲区

假设只有一个消费者,一个生产者

// 生产者
lock(&mutex)
queue.push()
if (queue.size() == 1) {
pthread_cond_signal(&cond)
}
unlock(&mutex)

// 消费者
lock(&mutex)
while (queue.size() <= 0) {
pthread_cond_wait(&cond, &mutex);
}
item = queue.pop()
unlock()


假设有多个生产者,多个消费者,则pthread_cond_signal的时候,只能唤醒一个消费者,并发程度不够,必须要把所有消费者都用上才行。方法是:

1. 不判断,直接pthread_cond_signal.会有点overhead,不过如果没有线程在等待的话,这个函数也没干啥。

2. 判断size()==1时,pthread_cond_broadcast.问题是,虽然广播了,全唤醒了,但只有1个线程抢到了,其他还是继续sleep。

3. 自己维护正在sleep的消费者的个数,来决定调用几次pthread_cond_signal。增加了维护负担。

如何实现有限缓冲区

跟无限缓冲区相比,这回,两个线程都要做signal和wait。理论上,用一个条件变量仍然可以实现:

// 生产者
lock(&mutex)
while (queue.size() >= N){
pthread_cond_wait(&cond, &mutex)
}
queue.push()
if (queue.size() == 1) {
pthread_cond_signal(&cond)
}
unlock(&mutex)

// 消费者
lock(&mutex)
while (queue.size() <= 0){
pthread_cond_wait(&cond, &mutex)
}
item = queue.pop()
if (queue.size() == N - 1) {
pthread_cond_signal(&cond)
}
unlock(&mutex)


因为只有一个生产者和消费者,所以生产者在pthread_cond_signal的时候,只可能有2种情况:

1.没有线程在等待

2.只有消费者进程在等待

这两种情况下,我们去pthread_cond_signal,都是正确的。

但如果有多个生产者和多个消费者,那么生产者在pthread_cond_signal的时候,可能有2种情况:

1.没有线程在等待

2.只有消费者线程在等待

3.只有其他生产者线程在等待

4.消费者和生产者都有在等待

3和4情况下,就可能把生产者线程唤醒,而没有把消费线程唤醒。因此,这个时候,应该把pthread_cond_signal换成pthread_cond_broadcast才对

或者,我们可以用2个条件变量,分别对应生产者和消费者的等待条件:

// 生产者
lock(&mutex)
while (queue.size() >= N){
pthread_cond_wait(&cond_1, &mutex)
}
queue.push()
pthread_cond_signal(&cond_2)
unlock(&mutex)

// 消费者
lock(&mutex)
while (queue.size() <= 0){
pthread_cond_wait(&cond_2, &mutex)
}
item = queue.pop()
pthread_cond_signal(&cond_1)
unlock(&mutex)


这样,性能会好些。毕竟,broadcast的惊群会带来很多无用的功夫。另外,由于在“无限缓冲区”讨论过的原因,这里每次都执行了pthread_cond_signal.这是一种性能可以接受、维护成本低、逻辑正确的方案。

不用条件变量的替代方法,及其缺陷

用循环自旋,这样很浪费CPU

自旋+sleep的话,sleep如果很短,依然很浪费CPU,sleep太长,则其实已经通知到了,却还在sleep,影响实时性。就算这样,由于sleep每次都是系统调用,开销依然比条件变量大

pthread_cond_wat做了什么

把当前线程挂到condition variable的等待队列中,再释放mutex。这两个操作是原子的。

“原子”是指,这个过程中如果别的线程pthread_cond_signal了一下,那么等效于wait结束后它再signal。

signal的时候应该先unlock还是先signal

1.如果先unlock,那么:

signal线程释放锁,锁可能被其他signal线程抢到,这个没问题。wait线程理论上没唤醒,是不会抢到锁的。如果发生假唤醒,那么会再次check,逻辑上也没有问题。

先signal,那么wait线程被唤醒,在wait函数内,抢不到锁,就不会返回,他还是会将自己放回到cond的等待队列。后来还是要等signal线程释放锁之后才行。有个优化,是wait函数在这种情况下会把自己从cond的等待队列中取出,挂到mutex的等待队列上去。这样,再次signal的时候,实际上就谁也没唤醒。只不过在释放mutex的时候它们会来一起抢。这叫“等待转移wait morphing”

pthread_cond_wait的while循环check和假唤醒是怎么回事?

正确用法是用循环:

while(!condition()) {
pthread_wait(&cond, &mutex);
}


不能用if:

if(!condition()) {
pthread_wait(&cond, &mutex);
}


用循环有3个原因:

1. spurious wakeup:理论上似乎只有pthread_cond_signal和pthread_cond_broadcast能唤醒wait的线程,但实际的实现中,有可能有其他情况会将其唤醒,因此需要再check一次。

如果要消除这种假唤醒,条件变量的实现可能要加一些逻辑,造成性能损失,权衡之后,设计者认为还是由调用者加个while简单些。因为线程可能会接到信号,会造成这种现象:

If a signal is delivered to a thread waiting for a condition variable,

upon return from the signal handler the thread resumes waiting for the

condition variable as if it was not interrupted, or it shall return

zero due to spurious wakeup.

看这里的讨论:

https://groups.google.com/forum/#!msg/comp.programming.threads/Gpe38hgZlsQ/1_1SRoCZgrsJ

2.broadcast的时候,会唤醒多个wait线程,那么可能只有一个线程就会把数据消费完,那么剩下的线程应该再次check一下,否则逻辑就错了。

用这种broadcast,事实上用一个条件变量就可以实现“有限缓冲区”,只不过每次会把所有线程都唤醒一遍,然后各个线程各自check自己的不同条件,效率略低一些。

在这种设计中,broadcast的语义就是“状态已经变了,你们各自检查下吧”

另外,pthread_cond_signal理论上并不保证只唤醒一个线程。它保证的是,如果有线程在等待,会唤醒至少一个线程。

3.万一程序bug呢。。比如等待 !queue.empty()的时候去消费,另一个线程push进了一个数,其实push没成功,但误以为成功了,于是调用了signal。

什么情况下会出现假唤醒,跟futex和中断是什么关系?

正确的答案是这个:

http://stackoverflow.com/questions/1050592/do-spurious-wakeups-actually-happen/1051816#1051816

一般来讲,慢速系统调用如何被信号打断,那么信号返回后,系统调用会返回-1,errno=EINTR。

什么是慢速系统调用

有可能无限阻塞的系统调用。

慢速系统调用正在block的时候被信号打断的话,会去执行用户态的handler,然后系统调用本身会返回-1,同时errno置为EINTR。需要自己用while循环,重新调用:

while (1) {
int ret = syscall();
if (ret < 0 && errno == EINTR)
continue;
else
break;
}


如果设置了“”call to sigaction with the SA_RESTART flag set,那么系统调用就会自动执行一遍。

参考: http://www.win.tue.nl/~aeb/linux/lk/lk-4.html#ss4.5

futex

因为pthread_cond_wait底层用futex实现,如果futex期间出现一个信号,那么futex会返回-1,errno=EINTR。系统调用这个时候的典型做法是直接重新调用。但如果这个时候futex重启之前,出现了一次pthread_cond_signal,那么线程就错过了。假如把等待的变量放到pthread_cond_wait里面,就没这个问题了,重新调用futex之前check一下,不过那样的话效率可能稍差一些。于是这个检查被放在了外面。

测试

不过,在Linux上试了一下,发现futex直接就重启了似乎:

[chenming-xy@build17 ~]$ strace -p 12215 -f -s 100
Process 12215 attached with 2 threads - interrupt to quit
[pid 12216] futex(0x6012a4, FUTEX_WAIT_PRIVATE, 1, NULL <unfinished ...>
[pid 12215] futex(0x7f0ef37e09d0, FUTEX_WAIT, 12216, NULL

<unfinished ...>
[pid 12216] <... futex resumed> )       = ? ERESTARTSYS (To be restarted)
[pid 12216] --- SIGTERM (Terminated) @ 0 (0) ---
[pid 12216] fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 0), ...}) = 0
[pid 12216] mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f0ef37f5000
[pid 12216] write(1, "Catch a signal 15\n", 18) = 18
[pid 12216] rt_sigreturn(0x7f0ef37df540) = -1 EINTR (Interrupted system call)
[pid 12216] futex(0x6012a4, FUTEX_WAIT_PRIVATE, 1, NULL


代码:

#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <stdio.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int count = 0;

static void term_hander(int s) {
printf("Catch a signal %d\n", s);
}

void* consumer(void *data) {
// 1.signal
struct sigaction act, oact;
act.sa_handler = term_hander;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
sigaction(SIGTERM, &act, &oact);
pthread_mutex_lock(&mutex);
while(count <= 0) {
pthread_cond_wait(&cond, &mutex);
printf("accident wake up: count = %d\n", count);
}
printf("consumer %d items\n", count);
count = 0;
pthread_mutex_unlock(&mutex);
return NULL;
}

void* producer(void *data) {
sleep(100);
pthread_mutex_lock(&mutex);
count++;
pthread_cond_signal(&cond);
printf("produce %d items\n", count);
pthread_mutex_unlock(&mutex);
return NULL;
}

int main() {

pthread_t t1;
pthread_t t2;
int ret;

//    pthread_create(&t1, NULL, producer, NULL);
pthread_create(&t2, NULL, consumer, NULL);

ret = pthread_join(t2, NULL);
printf("consumer exit with code: %d\n", ret);
//   ret = pthread_join(t1, NULL);
//  printf("producer exit with code: %d\n", ret);
return 0;
}


多进程支持

设置属性即可,然后放在共享内存里。但还是别用了。

条件变量是否公平?

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: