操作系统思考 第十章 条件变量
2016-07-16 15:18
387 查看
第十章 条件变量
作者:Allen B. Downey原文:Chapter 10 Condition variables
译者:飞龙
协议:CC BY-NC-SA 4.0
像上一章所展示的那样,许多简单的同步问题都可以用互斥体解决。这一章中我会介绍一个更大的挑战,著名的“生产者-消费者”问题,以及一个用于解决它的新工具,条件变量。
10.1 工作队列
在一些多线程的程序中,线程被组织用于执行不同的任务。通常它们使用队列来相互通信,其中一些线程叫做“生产者”,向队列中放入数据,另一些线程叫做“消费者”,从队列取出数据。例如,在GUI应用中,可能有一个运行GUI的线程响应用户事件,而其它线程负责处理用户的请求。这里,GUI线程可能将数据放入队列中,而“后台”线程从队列中取出请求并执行。
为了支持这种组织,我们需要一个“线程安全”的队列实现,也就是说每个线程都可以同时访问队列。我们至少需要处理一个特殊情况,队列是空的,以及如果队列的大小有限制,队列是满的。
我会从一个非线程安全的简单队列开始,之后我们会观察其中的错误并修复它。这个示例的代码在本书仓库的
queue目录中。
queue.c文件包含了一个环形缓冲区的基本实现。你可以在环形缓冲区的维基百科查询更多信息。
下面是结构体的定义:
typedef struct { int *array; int length; int next_in; int next_out; } Queue;
array是包含队列元素的数组。在这个例子中,元素都是整数,但是通常它们都是一些结构体,包含用户事件、工作项目以及其它。
length是数组的长度,
next_in是数组的下标,用于索引下个元素应该添加到哪里;与之相似,
next_out是应该被移除的下个元素的下标。
make_queue为这个结构体分配空间,并且初始化所有字段:
Queue *make_queue(int length) { Queue *queue = (Queue *) malloc(sizeof(Queue)); queue->length = length; queue->array = (int *) malloc(length * sizeof(int)); queue->next_in = 0; queue->next_out = 0; return queue; }
next_out的初始值需要一些解释。由于队列一开始为空,没有可移除的下一个元素,所以
next_out是无效的。
next_out==next_in是个特殊情况,它表示队列为空,所以我们可以编写:
int queue_empty(Queue *queue) { return (queue->next_in == queue->next_out); }
现在我们可以使用
queue_push向队列里面添加元素:
void queue_push(Queue *queue, int item) { if (queue_full(queue)) { perror_exit("queue is full"); } queue->array[queue->next_in] = item; queue->next_in = queue_incr(queue, queue->next_in); }
如果队列满了,
queue_push打印出错误信息并退出,我之后会解释
queue_full。
如果队列没有满,
queue_push插入新元素,之后使用
queue_incr增加
next_in:
int queue_incr(Queue *queue, int i) { return (i+1) % queue->length; }
当索引
i到达队列末尾时,它会转换为0。于是这样就很微妙了。如果我们持续向队列添加元素,最后
next_in会赶上
next_out。但是如果
next_in == next_out我们会错误地认为队列是空的。
为了避免这种情况,我们定义另一种特殊情况来表示队列是满的:
int queue_full(Queue *queue) { return (queue_incr(queue, queue->next_in) == queue->next_out); }
如果
next_in增加后与
next_out重合,那么我们如果添加新的元素,就会使队列看起来是空的。所以我们在“末尾”留出一个元素(要记住队列的末尾可能位于任何地方,不一定是数组末尾)。
现在我们可以编写
queue_pop,它移除并返回队列的下一个元素:
int queue_pop(Queue *queue) { if (queue_empty(queue)) { perror_exit("queue is empty"); } int item = queue->array[queue->next_out]; queue->next_out = queue_incr(queue, queue->next_out); return item; }
如果你尝试从空队列中弹出元素,
queue_pop会打印错误信息并退出。
10.2 生产者和消费者
现在让我们创建一些访问这个队列的线程。下面是生产者的代码:void *producer_entry(void *arg) { int i; Shared *shared = (Shared *) arg; for (i=0; i<QUEUE_LENGTH-1; i++) { printf("adding item %d\n", i); queue_push(shared->queue, i); } pthread_exit(NULL); }
下面是消费者的代码:
void *consumer_entry(void *arg) { int i; int item; Shared *shared = (Shared *) arg; for (i=0; i<QUEUE_LENGTH-1; i++) { item = queue_pop(shared->queue); printf("consuming item %d\n", item); } pthread_exit(NULL); }
下面是用于启动线程并等待它们的主线程代码:
int i; pthread_t child[NUM_CHILDREN]; Shared *shared = make_shared(); child[0] = make_thread(producer_entry, shared); child[1] = make_thread(consumer_entry, shared); for (i=0; i<NUM_CHILDREN; i++) { join_thread(child[i]); }
最后,下面是包含队列的共享结构:
typedef struct { Queue *queue; } Shared; Shared *make_shared() { Shared *shared = check_malloc(sizeof(Shared)); shared->queue = make_queue(QUEUE_LENGTH); return shared; }
到目前为止我们所写的代码是一个好的开始,但是有如下几种问题:
队列的访问不是线程安全的。不同的线程能同时访问
array、
next_in和
next_out,并且会使队列处于损坏的、“不一致”的状态。
如果消费者首先被调度,它会发现队列为空,打印错误信息并退出。我们应该阻塞住消费者,直到队列非空。与之相似,我们应该在队列满了的情况下阻塞住生产者。
在下一节中,我们会使用互斥体解决这一个问题。之后的章节中我们会使用条件变量解决第二个问题。
10.3 互斥体
我们可以使用互斥体使队列线程安全。这个版本的代码在queue_mutex.c中。
首先我们向队列结构中添加一个互斥体指针:
typedef struct { int *array; int length; int next_in; int next_out; Mutex *mutex; //-- this line is new } Queue;
之后在
make_queue中初始化互斥体:
Queue *make_queue(int length) { Queue *queue = (Queue *) malloc(sizeof(Queue)); queue->length = length; queue->array = (int *) malloc(length * sizeof(int)); queue->next_in = 0; queue->next_out = 0; queue->mutex = make_mutex(); //-- new return queue; }
接下来向
queue_push添加同步代码:
void queue_push(Queue *queue, int item) { mutex_lock(queue->mutex); //-- new if (queue_full(queue)) { mutex_unlock(queue->mutex); //-- new perror_exit("queue is full"); } queue->array[queue->next_in] = item; queue->next_in = queue_incr(queue, queue->next_in); mutex_unlock(queue->mutex); //-- new }
在检查队列是否已满之前,我们需要锁住互斥体。如果队列是满的,我们需要在退出之前解锁互斥体。否则线程应该保持互斥体锁住,使其它线程不能前进。
queue_pop的同步代码与之相似:
int queue_pop(Queue *queue) { mutex_lock(queue->mutex); if (queue_empty(queue)) { mutex_unlock(queue->mutex); perror_exit("queue is empty"); } int item = queue->array[queue->next_out]; queue->next_out = queue_incr(queue, queue->next_out); mutex_unlock(queue->mutex); return item; }
要注意其它队列函数,
queue_full、
queue_empty和
queue_incr都不需要锁住互斥体。任何调用这些函数的线程都需要首先锁住互斥体。这些要求是这些函数的接口文档的一部分。
使用这些额外的代码,队列就线程安全了。如果你运行它,你不会看到任何的同步错误。但是似乎消费者会在某个时间上退出,因为队列是空的。或者生产者会由于队列是满足而退出。
下一步就是添加条件变量。
10.4 条件变量
条件变量是条件相关的数据结构。它允许线程在某些条件变为真之前被阻塞。例如,thread_push可能希望检查队列是否已满,如果是这样,就在队列未满之前阻塞。所以我们感兴趣的“条件”就是“队列未满”。
与之相似,
thread_pop希望等待“队列非空”的条件。
下面是我们向代码添加这些功能的方式。首先我们向队列结构中添加两个条件变量:
typedef struct { int *array; int length; int next_in; int next_out; Mutex *mutex; Cond *nonempty; //-- new Cond *nonfull; //-- new } Queue;
之后在
make_queue中初始化它们:
Queue *make_queue(int length) { Queue *queue = (Queue *) malloc(sizeof(Queue)); queue->length = length; queue->array = (int *) malloc(length * sizeof(int)); queue->next_in = 0; queue->next_out = 0; queue->mutex = make_mutex(); queue->nonempty = make_cond(); //-- new queue->nonfull = make_cond(); //-- new return queue; }
现在在
queue_pop中,如果我们发现队列为空,我们不要退出,而是使用条件变量来阻塞:
int queue_pop(Queue *queue) { mutex_lock(queue->mutex); while (queue_empty(queue)) { cond_wait(queue->nonempty, queue->mutex); //-- new } int item = queue->array[queue->next_out]; queue->next_out = queue_incr(queue, queue->next_out); mutex_unlock(queue->mutex); cond_signal(queue->nonfull); //-- new return item; }
cond_wait有点复杂,所以让我们慢慢来。第一个参数是条件变量。这里我们需要等待的条件是“队列非空”。第二个变量是保护队列的互斥体。在你调用
cond_wait之前,你需要先锁住互斥体,否则它不会生效。
当锁住互斥体的线程调用
cond_wait时,它首先解锁互斥体,之后阻塞。这非常重要。如果
cond_wait不在阻塞之前解锁互斥体,其它线程就不能访问队列,不能添加任何物品,队列会永远为空。
所以当消费者阻塞在
nonempty的时候,生产者也可以运行。让我们来观察生产者运行
queue_push时会发生什么:
void queue_push(Queue *queue, int item) { mutex_lock(queue->mutex); while (queue_full(queue)) { cond_wait(queue->nonfull, queue->mutex); //-- new } queue->array[queue->next_in] = item; queue->next_in = queue_incr(queue, queue->next_in); mutex_unlock(queue->mutex); cond_signal(queue->nonempty); //-- new }
让我们假设队列现在未满,于是生产者并不会调用
cond_wait也不会阻塞。它会向队列添加新的元素并解锁互斥体。但是在退出之前,它做了额外的一件事:它向
nonempty条件变量发送信号。
向条件变量发送更新好表示条件为真,或者至少它可能为真。如果没有任何线程在等待条件变量,信号就不起作用。
如果有线程在等待条件变量,它们全部会从
cond_wait解除阻塞并且恢复执行。但是在被唤醒的进程从
cond_wait返回之前,它需要等待并再次锁住互斥体。
现在我们回到
queue_pop来观察当线程从
cond_wait返回时会发生什么。它会循环到
while语句的开头,并再次检查条件。我会在之后解释其原因,但是现在让我们假设条件为真,也就是说队列非空。
当线程从
while循环退出之后,我们知道了两件事情:(1)条件为真,所以队列中至少有一个物品,(2)互斥体是锁住的,所以访问队列是安全的。
在移除物品之后,
queue_pop解锁了互斥体,发送了队列未满的信号,之后退出。
在下一节我会向你展示我的
Cond的工作原因,但是首先我想回答两个常见问题:
为什么
cond_wait在
while循环中,而不是
if语句中?也就是说,为什么在从
cond_wait返回之后要再次检查条件?
需要再次检查条件的首要原因就是信号拦截的可能性。假设线程A在等待`nonempty`,线程B向队列添加元素,之后向`nonempty`发送信号。线程A被唤醒并且尝试锁住互斥体,但是在轮到它之前,邪恶的线程C插进来了,锁住了互斥体,从队列中弹出物品并且解锁了互斥体。现在队列再次为空,但是线程A没有被阻塞。线程A会锁住互斥体并且从`cond_wait`返回。如果线程A不再次检查条件,它会尝试从空队列中弹出元素,可能会产生错误。 > 译者注:有些条件变量的实现可以每次只唤醒一个线程,比如Java对象的`notify`方法。这种情况就可以使用`if`。
当人们了解条件变量时,另一个问题是“条件变量怎么知道它关联了哪个条件?”
这一问题可以理解,因为在`Cond`结构和有关条件之间没有明显的关联。在它的使用方式中,关联是隐性的。 下面是一种理解它的办法:当你调用`cond_wait`时,`Cond`所关联的条件为假;当你调用`cond_signal`时它为真。当然,可能有一些条件第一种情况下为真,第二种情况下为假。正确的情况只在程序员的脑子中,所以它应该在文档中有详细的解释。
10.5 条件变量的实现
我在上一节中使用的条件变量是pthread_cond_t类型的包装,它定义在POSIX线程API中。这非常类似于
Mutex,它是
pthread_mutex_t的包装。两个包装都定义在
utils.c和
utils.h中。
下面是类型定义:
typedef pthread_cond_t Cond;
make_cond分配空间,初始化条件变量,之后返回指针:
Cond *make_cond() { Cond *cond = check_malloc(sizeof(Cond)); int n = pthread_cond_init(cond, NULL); if (n != 0) perror_exit("make_cond failed"); return cond; }
下面是
cond_wait和
cond_signal的包装:
全选<button href="javascript:void(0);" _xhe_href="javascript:void(0);" class="copyCode btn btn-xs" data-clipboard-text="" void="" cond_wait(cond="" *cond,="" mutex="" *mutex)"="" data-toggle="tooltip" data-placement="bottom" title="" style="color: rgb(255, 255, 255); font-style: inherit; font-variant: inherit; font-stretch: inherit; font-size: 12px; line-height: 1.5; font-family: inherit; margin: 0px 0px 0px 5px; overflow: visible; cursor: pointer; vertical-align: middle; border: 1px solid transparent; white-space: nowrap; padding-right: 5px; padding-left: 5px; border-radius: 3px; -webkit-user-select: none; box-shadow: rgba(0, 0, 0, 0.0980392) 0px 1px 2px; background-image: none; background-color: rgba(0, 0, 0, 0.74902);">复制放进笔记
void cond_wait(Cond *cond, Mutex *mutex) { int n = pthread_cond_wait(cond, mutex); if (n != 0) perror_exit("cond_wait failed"); } void cond_signal(Cond *cond) { int n = pthread_cond_signal(cond); if (n != 0) perror_exit("cond_signal failed"); }
到这里就应该没有什么意外的东西了。
相关文章推荐
- tomcat 熟知与运用
- Git配置 多账户SSH KEY
- Java基于UDP协议实现简单的聊天室程序
- 性能之procrank命令
- maven 私服安装文档
- 完全理解Android中的RemoteViews
- Mybatis深入了解(四)----输入输出映射
- 自我实现字符串函数
- opengl视图变化 非常清楚的一例子
- 奶牛排队_纪中1746_模拟
- PHP之10个整数,从小到大排序,输出排序结果
- C# 匿名方法
- 切勿用普通for循环遍历LinkedList
- Codeforces Round #358 (Div. 2) C. Alyona and the Tree (二叉树+DFS)
- LintCode:栅栏染色
- PL/SQL Developer 如何显示行号
- Java NIO Selector
- java命令行编译引入jar包的java源文件
- Codeforces Round #362 (Div. 2) C Lorenzo Von Matterhorn
- PHP之判断是不是素数