您的位置:首页 > 理论基础 > 数据结构算法

POSIX 线程详解(3)

2007-11-19 10:08 309 查看
POSIX 线程详解(3)
                                      

 

[align=left]使用条件变量提高效率[/align]
 

[align=left]Daniel Robbins
总裁兼 CEO,Gentoo Technologies, Inc.
2000 年 9 月[/align]
 

[align=left]本文是 POSIX 线程三部曲系列的最后一部分,Daniel 将详细讨论如何使用条件变量。条件变量是 POSIX 线程结构,可以让您在遇到某些条件时“唤醒”线程。可以将它们看作是一种线程安全的信号发送。Daniel 使用目前您所学到的知识实现了一个多线程工作组应用程序,本文将围绕着这一示例而进行讨论。[/align]
[align=left]条件变量详解
上一篇文章结束时,我描述了一个比较特殊的难题:如果线程正在等待某个特定条件发生,它应该如何处理这种情况?它可以重复对互斥对象锁定和解锁,每次都会检查共享数据结构,以查找某个值。但这是在浪费时间和资源,而且这种繁忙查询的效率非常低。解决这个问题的最佳方法是使用 pthread_cond_wait() 调用来等待特殊条件发生。[/align]
 

[align=left]了解 pthread_cond_wait() 的作用非常重要 -- 它是 POSIX 线程信号发送系统的核心,也是最难以理解的部分。[/align]
 

[align=left]首先,让我们考虑以下情况:线程为查看已链接列表而锁定了互斥对象,然而该列表恰巧是空的。这一特定线程什么也干不了 -- 其设计意图是从列表中除去节点,但是现在却没有节点。因此,它只能:[/align]
 

[align=left]锁定互斥对象时,线程将调用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 调用相当复杂,因此我们每次只执行它的一个操作。[/align]
 

[align=left]pthread_cond_wait() 所做的第一件事就是同时对互斥对象解锁(于是其它线程可以修改已链接列表),并等待条件 mycond 发生(这样当 pthread_cond_wait() 接收到另一个线程的“信号”时,它将苏醒)。现在互斥对象已被解锁,其它线程可以访问和修改已链接列表,可能还会添加项。[/align]
 

[align=left]此时,pthread_cond_wait() 调用还未返回。对互斥对象解锁会立即发生,但等待条件 mycond 通常是一个阻塞操作,这意味着线程将睡眠,在它苏醒之前不会消耗 CPU 周期。这正是我们期待发生的情况。线程将一直睡眠,直到特定条件发生,在这期间不会发生任何浪费 CPU 时间的繁忙查询。从线程的角度来看,它只是在等待 pthread_cond_wait() 调用返回。[/align]
 

[align=left]现在继续说明,假设另一个线程(称作“2 号线程”)锁定了 mymutex 并对已链接列表添加了一项。在对互斥对象解锁之后,2 号线程会立即调用函数 pthread_cond_broadcast(&mycond)。此操作之后,2 号线程将使所有等待 mycond 条件变量的线程立即苏醒。这意味着第一个线程(仍处于 pthread_cond_wait() 调用中)现在将苏醒。[/align]
 

[align=left]现在,看一下第一个线程发生了什么。您可能会认为在 2 号线程调用 pthread_cond_broadcast(&mymutex) 之后,1 号线程的 pthread_cond_wait() 会立即返回。不是那样!实际上,pthread_cond_wait() 将执行最后一个操作:重新锁定 mymutex。一旦 pthread_cond_wait() 锁定了互斥对象,那么它将返回并允许 1 号线程继续执行。那时,它可以马上检查列表,查看它所感兴趣的更改。[/align]
 

[align=left]停止并回顾!
那个过程非常复杂,因此让我们先来回顾一下。第一个线程首先调用:[/align]
 

[align=left]     pthread_mutex_lock(&mymutex);[/align]
[align=left]然后,它检查了列表。没有找到感兴趣的东西,于是它调用:[/align]
 

[align=left]    pthread_cond_wait(&mycond, &mymutex);[/align]
 
[align=left]然后,pthread_cond_wait() 调用在返回前执行许多操作:[/align]
 

[align=left]   pthread_mutex_unlock(&mymutex);[/align]
 
[align=left]它对 mymutex 解锁,然后进入睡眠状态,等待 mycond 以接收 POSIX 线程“信号”。一旦接收到“信号”(加引号是因为我们并不是在讨论传统的 UNIX 信号,而是来自 pthread_cond_signal() 或 pthread_cond_broadcast() 调用的信号),它就会苏醒。但 pthread_cond_wait() 没有立即返回 -- 它还要做一件事:重新锁定 mutex:[/align]
 

[align=left]    pthread_mutex_lock(&mymutex);[/align]
 
[align=left]pthread_cond_wait() 知道我们在查找 mymutex “背后”的变化,因此它继续操作,为我们锁定互斥对象,然后才返回。[/align]
[align=left]pthread_cond_wait() 小测验
现在已回顾了 pthread_cond_wait() 调用,您应该了解了它的工作方式。应该能够叙述 pthread_cond_wait() 依次执行的所有操作。尝试一下。如果理解了 pthread_cond_wait(),其余部分就相当容易,因此请重新阅读以上部分,直到记住为止。好,读完之后,能否告诉我在调用 pthread_cond_wait() 之前,互斥对象必须处于什么状态?pthread_cond_wait() 调用返回之后,互斥对象处于什么状态?这两个问题的答案都是“锁定”。既然已经完全理解了 pthread_cond_wait() 调用,现在来继续研究更简单的东西 -- 初始化和真正的发送信号和广播进程。到那时,我们将会对包含了多线程工作队列的 C 代码了如指掌。
[/align]
[align=left]初始化和清除
条件变量是一个需要初始化的真实数据结构。以下就初始化的方法。首先,定义或分配一个条件变量,如下所示:[/align]
 

[align=left]    pthread_cond_t mycond;[/align]
 
[align=left]然后,调用以下函数进行初始化:[/align]
 

[align=left]    pthread_cond_init(&mycond,NULL);[/align]
 
[align=left]瞧,初始化完成了!在释放或废弃条件变量之前,需要毁坏它,如下所示:[/align]
 

[align=left]    pthread_cond_destroy(&mycond);[/align]
 
[align=left]很简单吧。接着讨论 pthread_cond_wait() 调用。[/align]
 

[align=left]等待
一旦初始化了互斥对象和条件变量,就可以等待某个条件,如下所示:[/align]
 

[align=left]    pthread_cond_wait(&mycond, &mymutex);[/align]
 
[align=left]请注意,代码在逻辑上应该包含 mycond 和 mymutex。一个特定条件只能有一个互斥对象,而且条件变量应该表示互斥数据“内部”的一种特殊的条件更改。一个互斥对象可以用许多条件变量(例如,cond_empty、cond_full、cond_cleanup),但每个条件变量只能有一个互斥对象。[/align]
 

[align=left]发送信号和广播
对于发送信号和广播,需要注意一点。如果线程更改某些共享数据,而且它想要唤醒所有正在等待的线程,则应使用 pthread_cond_broadcast 调用,如下所示:[/align]
 

[align=left]    pthread_cond_broadcast(&mycond);[/align]
 
[align=left]在某些情况下,活动线程只需要唤醒第一个正在睡眠的线程。假设您只对队列添加了一个工作作业。那么只需要唤醒一个工作程序线程(再唤醒其它线程是不礼貌的!):[/align]
 

[align=left]    pthread_cond_signal(&mycond);[/align]
 
[align=left]此函数只唤醒一个线程。如果 POSIX 线程标准允许指定一个整数,可以让您唤醒一定数量的正在睡眠的线程,那就更完美了。但是很可惜,我没有被邀请参加会议。[/align]
 

[align=left]工作组
我将演示如何创建多线程工作组。在这个方案中,我们创建了许多工作程序线程。每个线程都会检查 wq(“工作队列”),查看是否有需要完成的工作。如果有需要完成的工作,那么线程将从队列中除去一个节点,执行这些特定工作,然后等待新的工作到达。[/align]
 

[align=left]与此同时,主线程负责创建这些工作程序线程、将工作添加到队列,然后在它退出时收集所有工作程序线程。您将会遇到许多 C 代码,好好准备吧![/align]
[align=left]队列
需要队列是出于两个原因。首先,需要队列来保存工作作业。还需要可用于跟踪已终止线程的数据结构。还记得前几篇文章(请参阅本文结尾处的参考资料)中,我曾提到过需要使用带有特定进程标识的 pthread_join 吗?使用“清除队列”(称作 "cq")可以解决无法等待任何已终止线程的问题(稍后将详细讨论这个问题)。以下是标准队列代码。将此代码保存到文件 queue.h 和 queue.c:[/align]
 

[align=left]queue.h[/align]
 

[align=left]/* queue.h[/align]
 
 

[align=left]** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.[/align]
 

[align=left]** Author: Daniel Robbins[/align]
 

[align=left]** Date: 16 Jun 2000[/align]
 

[align=left]*/[/align]
 

 

[align=left]typedef struct node {[/align]
 

[align=left] struct node *next;[/align]
 

[align=left]} node;[/align]
 

 

[align=left]typedef struct queue {[/align]
 

[align=left] node *head, *tail; [/align]
 

[align=left]} queue;[/align]
 

 

[align=left]void queue_init(queue *myroot);[/align]
 

[align=left]void queue_put(queue *myroot, node *mynode);[/align]
 

[align=left]node *queue_get(queue *myroot);[/align]
 

 
 

 
 

 
[align=left]queue.c[/align]
 

 

[align=left]/* queue.c[/align]
 

[align=left]** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.[/align]
 

[align=left]** Author: Daniel Robbins[/align]
 

[align=left]** Date: 16 Jun 2000[/align]
 

[align=left]**[/align]
 

[align=left]** This set of queue functions was originally thread-aware. I[/align]
 

[align=left]** redesigned the code to make this set of queue routines[/align]
 

[align=left]** thread-ignorant (just a generic, boring yet very fast set of queue[/align]
 

[align=left]** routines). Why the change? Because it makes more sense to have[/align]
 

[align=left]** the thread support as an optional add-on. Consider a situation[/align]
 

[align=left]** where you want to add 5 nodes to the queue. With the[/align]
 

[align=left]** thread-enabled version, each call to queue_put() would[/align]
 

[align=left]** automatically lock and unlock the queue mutex 5 times -- that's a[/align]
 

[align=left]** lot of unnecessary overhead. However, by moving the thread stuff[/align]
 

[align=left]** out of the queue routines, the caller can lock the mutex once at[/align]
 

[align=left]** the beginning, then insert 5 items, and then unlock at the end.[/align]
 

[align=left]** Moving the lock/unlock code out of the queue functions allows for[/align]
 

[align=left]** optimizations that aren't possible otherwise. It also makes this[/align]
 

[align=left]** code useful for non-threaded applications.[/align]
 

[align=left]**[/align]
 

[align=left]** We can easily thread-enable this data structure by using the[/align]
 

[align=left]** data_control type defined in control.c and control.h. */[/align]
 

 

[align=left]#include [/align]
 

[align=left]#include "queue.h"[/align]
 

 

[align=left]void queue_init(queue *myroot) {[/align]
 

[align=left] myroot->head=NULL;[/align]
 

[align=left] myroot->tail=NULL;[/align]
 

[align=left]}[/align]
 

 

[align=left]void queue_put(queue *myroot,node *mynode) {[/align]
 

[align=left] mynode->next=NULL;[/align]
 

[align=left] if (myroot->tail!=NULL)[/align]
 

[align=left]    myroot->tail->next=mynode;[/align]
 

[align=left] myroot->tail=mynode;[/align]
 

[align=left] if (myroot->head==NULL)[/align]
 

[align=left]    myroot->head=mynode;[/align]
 

[align=left]}[/align]
 

 

[align=left]node *queue_get(queue *myroot) {[/align]
 

[align=left] //get from root[/align]
 

[align=left] node *mynode;[/align]
 

[align=left] mynode=myroot->head;[/align]
 

[align=left] if (myroot->head!=NULL)[/align]
 

[align=left]    myroot->head=myroot->head->next;[/align]
 

[align=left] return mynode;[/align]
 

[align=left]} [/align]
[align=left]data_control 代码
我编写的并不是线程安全的队列例程,事实上我创建了一个“数据包装”或“控制”结构,它可以是任何线程支持的数据结构。看一下 control.h:[/align]
[align=left]control.h[/align]
 

[align=left]#include [/align]
 
 

 

[align=left]typedef struct data_control {[/align]
 

[align=left] pthread_mutex_t mutex;[/align]
 

[align=left] pthread_cond_t cond;[/align]
 

[align=left] int active;[/align]
 

[align=left]} data_control;[/align]
[align=left]现在您看到了 data_control 结构定义,以下是它的视觉表示:[/align]
 

[align=left]所使用的 data_control 结构
[/align]
 



[align=left]
图像中的锁代表互斥对象,它允许对数据结构进行互斥访问。黄色的星代表条件变量,它可以睡眠,直到所讨论的数据结构改变为止。on/off 开关表示整数 "active",它告诉线程此数据是否是活动的。在代码中,我使用整数 active 作为标志,告诉工作队列何时应该关闭。以下是 control.c:[/align]
 

[align=left]control.c[/align]
 

[align=left]/* control.c[/align]
 
 

[align=left]** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.[/align]
 

[align=left]** Author: Daniel Robbins[/align]
 

[align=left]** Date: 16 Jun 2000[/align]
 

[align=left]**[/align]
 

[align=left]** These routines provide an easy way to make any type of[/align]
 

[align=left]** data-structure thread-aware. Simply associate a data_control[/align]
 

[align=left]** structure with the data structure (by creating a new struct, for[/align]
 

[align=left]** example). Then, simply lock and unlock the mutex, or[/align]
 

[align=left]** wait/signal/broadcast on the condition variable in the data_control[/align]
 

[align=left]** structure as needed.[/align]
 

[align=left]**[/align]
 

[align=left]** data_control structs contain an int called "active". This int is[/align]
 

[align=left]** intended to be used for a specific kind of multithreaded design,[/align]
 

[align=left]** where each thread checks the state of "active" every time it locks[/align]
 

[align=left]** the mutex. If active is 0, the thread knows that instead of doing[/align]
 

[align=left]** its normal routine, it should stop itself. If active is 1, it[/align]
 

[align=left]** should continue as normal. So, by setting active to 0, a[/align]
 

[align=left]** controlling thread can easily inform a thread work crew to shut[/align]
 

[align=left]** down instead of processing new jobs. Use the control_activate()[/align]
 

[align=left]** and control_deactivate() functions, which will also broadcast on[/align]
 

[align=left]** the data_control struct's condition variable, so that all threads[/align]
 

[align=left]** stuck in pthread_cond_wait() will wake up, have an opportunity to[/align]
 

[align=left]** notice the change, and then terminate.[/align]
 

[align=left]*/[/align]
 

 

[align=left]#include "control.h"[/align]
 

 

[align=left]int control_init(data_control *mycontrol) {[/align]
 

[align=left] int mystatus;[/align]
 

[align=left] if (pthread_mutex_init(&(mycontrol->mutex),NULL))[/align]
 

[align=left]    return 1;[/align]
 

[align=left] if (pthread_cond_init(&(mycontrol->cond),NULL))[/align]
 

[align=left]    return 1;[/align]
 

[align=left] mycontrol->active=0;[/align]
 

[align=left] return 0;[/align]
 

[align=left]}[/align]
 

 

[align=left]int control_destroy(data_control *mycontrol) {[/align]
 

[align=left] int mystatus;[/align]
 

[align=left]  if (pthread_cond_destroy(&(mycontrol->cond)))[/align]
 

[align=left]    return 1;[/align]
 

[align=left] mycontrol->active=0;[/align]
 

[align=left] return 0;[/align]
 

[align=left]}[/align]
 

[align=left]int control_activate(data_control *mycontrol) {[/align]
 

[align=left] int mystatus;[/align]
 

[align=left] if (pthread_mutex_lock(&(mycontrol->mutex)))[/align]
 

[align=left]    return 0;[/align]
 

[align=left] mycontrol->active=1;[/align]
 

[align=left] pthread_mutex_unlock(&(mycontrol->mutex));[/align]
 

[align=left] pthread_cond_broadcast(&(mycontrol->cond));[/align]
 

[align=left] return 1;[/align]
 

[align=left]}[/align]
 

 

[align=left]int control_deactivate(data_control *mycontrol) {[/align]
 

[align=left] int mystatus;[/align]
 

[align=left] if (pthread_mutex_lock(&(mycontrol->mutex)))[/align]
 

[align=left]    return 0;[/align]
 

[align=left] mycontrol->active=0;[/align]
 

[align=left] pthread_mutex_unlock(&(mycontrol->mutex));[/align]
 

[align=left] pthread_cond_broadcast(&(mycontrol->cond));[/align]
 

[align=left] return 1;[/align]
 

[align=left]} [/align]
[align=left]调试时间
在开始调试之前,还需要一个文件。以下是 dbug.h:[/align]
 

[align=left]dbug.h[/align]
 

[align=left]#define dabort() /[/align]
  { printf("Aborting at line %d in source file %s/n",__LINE__,__FILE__); abort(); }
[align=left]此代码用于处理工作组代码中的不可纠正错误。[/align]
 

[align=left]工作组代码
说到工作组代码,以下就是:[/align]
[align=left]workcrew.c[/align]
 

[align=left]#include <stdio.h>[/align]
 

 

[align=left]#include <stdlib.h>[/align]
 

[align=left]#include "control.h"[/align]
 

[align=left]#include "queue.h"[/align]
 

[align=left]#include "dbug.h"[/align]
 

[align=left]/* the work_queue holds tasks for the various threads to complete. */[/align]
struct work_queue {
  data_control control;
 

[align=left] queue work;[/align]
 

[align=left]} wq;[/align]
 

[align=left] [/align]
 

[align=left]/* I added a job number to the work node. Normally, the work node[/align]
 

[align=left]   would contain additional data that needed to be processed. */[/align]
 

[align=left]typedef struct work_node {[/align]
 

 

[align=left] struct node *next;[/align]
 

[align=left] int jobnum;[/align]
 

[align=left]} wnode;[/align]
 

 

[align=left]/* the cleanup queue holds stopped threads. Before a thread[/align]
 

[align=left]   terminates, it adds itself to this list. Since the main thread is[/align]
 

[align=left]   waiting for changes in this list, it will then wake up and clean up[/align]
 

[align=left]   the newly terminated thread. */[/align]
 

 

[align=left]struct cleanup_queue {[/align]
 

[align=left] data_control control;[/align]
 

[align=left] queue cleanup;[/align]
 

[align=left]} cq;[/align]
 

 

[align=left]/* I added a thread number (for debugging/instructional purposes) and[/align]
 

[align=left]  a thread id to the cleanup node. The cleanup node gets passed to[/align]
 

[align=left]   the new thread on startup, and just before the thread stops, it[/align]
 

[align=left]   attaches the cleanup node to the cleanup queue. The main thread[/align]
 

[align=left]   monitors the cleanup queue and is the one that performs the[/align]
 

[align=left]   necessary cleanup. */[/align]
 

 

[align=left]typedef struct cleanup_node {[/align]
 

[align=left] struct node *next;[/align]
 

[align=left] int threadnum;[/align]
 

[align=left] pthread_t tid;[/align]
 

[align=left]} cnode;[/align]
 

 

[align=left]void *threadfunc(void *myarg) {[/align]
 

 

[align=left] wnode *mywork;[/align]
 

[align=left] cnode *mynode;[/align]
 

 

[align=left] mynode=(cnode *) myarg;[/align]
 

 

[align=left] pthread_mutex_lock(&wq.control.mutex);[/align]
 

 

[align=left] while (wq.control.active) {[/align]
 

[align=left]    while (wq.work.head==NULL && wq.control.active) {[/align]
 

[align=left]      pthread_cond_wait(&wq.control.cond, &wq.control.mutex);[/align]
 

[align=left]    }[/align]
 

[align=left]    if (!wq.control.active) [/align]
 

[align=left]      break;[/align]
 

[align=left]    //we got something![/align]
 

[align=left]    mywork=(wnode *) queue_get(&wq.work);[/align]
 

[align=left]    pthread_mutex_unlock(&wq.control.mutex);[/align]
 

[align=left]    //perform processing...[/align]
 

[align=left]    printf("Thread number %d processing job %d/n",mynode->threadnum,mywork->jobnum);[/align]
 

[align=left]    free(mywork);[/align]
 

[align=left]    pthread_mutex_lock(&wq.control.mutex);[/align]
 

[align=left] }[/align]
 

 

[align=left] pthread_mutex_unlock(&wq.control.mutex);[/align]
 

 

[align=left] pthread_mutex_lock(&cq.control.mutex);[/align]
 

[align=left] queue_put(&cq.cleanup,(node *) mynode);[/align]
 

[align=left] pthread_mutex_unlock(&cq.control.mutex);[/align]
 

[align=left] pthread_cond_signal(&cq.control.cond);[/align]
 

[align=left] printf("thread %d shutting down.../n",mynode->threadnum);[/align]
 

[align=left]  return NULL;[/align]
 

[align=left]}[/align]
 

 

[align=left]#define NUM_WORKERS 4[/align]
 

 

[align=left]int numthreads;[/align]
 

 

[align=left]void join_threads(void) {[/align]
 

[align=left] cnode *curnode;[/align]
 

 

[align=left] printf("joining threads.../n");[/align]
 

 

[align=left] while (numthreads) {[/align]
 

[align=left]    pthread_mutex_lock(&cq.control.mutex);[/align]
 

 

[align=left]    /* below, we sleep until there really is a new cleanup node. This[/align]
 

[align=left]       takes care of any false wakeups... even if we break out of[/align]
 

[align=left]       pthread_cond_wait(), we don't make any assumptions that the[/align]
 

[align=left]       condition we were waiting for is true. */[/align]
 

 

[align=left]    while (cq.cleanup.head==NULL) {[/align]
 

[align=left]      pthread_cond_wait(&cq.control.cond,&cq.control.mutex);[/align]
 

[align=left]    }[/align]
 

 

[align=left]    /* at this point, we hold the mutex and there is an item in the[/align]
 

[align=left]       list that we need to process. First, we remove the node from[/align]
 

[align=left]       the queue. Then, we call pthread_join() on the tid stored in[/align]
 

[align=left]       the node. When pthread_join() returns, we have cleaned up[/align]
 

[align=left]       after a thread. Only then do we free() the node, decrement the[/align]
 

[align=left]       number of additional threads we need to wait for and repeat the[/align]
 

[align=left]       entire process, if necessary */[/align]
 

 

[align=left]      curnode = (cnode *) queue_get(&cq.cleanup);[/align]
 

[align=left]      pthread_mutex_unlock(&cq.control.mutex);[/align]
 

[align=left]      pthread_join(curnode->tid,NULL);[/align]
 

[align=left]      printf("joined with thread %d/n",curnode->threadnum);[/align]
 

[align=left]      free(curnode);[/align]
 

[align=left]      numthreads--;[/align]
 

[align=left] }[/align]
 

[align=left]}[/align]
 

 

 

[align=left]int create_threads(void) {[/align]
 

[align=left] int x;[/align]
 

[align=left] cnode *curnode;[/align]
 

 

[align=left] for (x=0; x[/align]
[align=left]    curnode=malloc(sizeof(cnode));[/align]
 

[align=left]    if (!curnode)[/align]
 

[align=left]      return 1;[/align]
 

[align=left]    curnode->threadnum=x;[/align]
 

[align=left]    if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))[/align]
 

[align=left]      return 1;[/align]
 

[align=left]    printf("created thread %d/n",x);[/align]
 

[align=left]    numthreads++;[/align]
 

[align=left] }[/align]
 

[align=left] return 0;[/align]
 

[align=left]}[/align]
 

 

[align=left]void initialize_structs(void) {[/align]
 

[align=left] numthreads=0;[/align]
 

[align=left] if (control_init(&wq.control))[/align]
 

[align=left]    dabort();[/align]
 

[align=left] queue_init(&wq.work);[/align]
 

[align=left] if (control_init(&cq.control)) {[/align]
 

[align=left]    control_destroy(&wq.control);[/align]
 

[align=left]    dabort();[/align]
 

[align=left] }[/align]
 

[align=left] queue_init(&wq.work);[/align]
 

[align=left] control_activate(&wq.control);[/align]
 

[align=left]}[/align]
 

 

[align=left]void cleanup_structs(void) {[/align]
 

[align=left] control_destroy(&cq.control);[/align]
 

[align=left] control_destroy(&wq.control);[/align]
 

[align=left]}[/align]
 

[align=left]int main(void) {[/align]
 

 

[align=left] int x;[/align]
 

 

[align=left] wnode *mywork;[/align]
 

[align=left] initialize_structs();[/align]
 

 

[align=left] /* CREATION */[/align]
 

 

 

[align=left] if (create_threads()) {[/align]
 

[align=left]    printf("Error starting threads... cleaning up./n");[/align]
 

[align=left]    join_threads();[/align]
 

[align=left]    dabort();[/align]
 

[align=left] }[/align]
 

 

[align=left] pthread_mutex_lock(&wq.control.mutex);[/align]
 

[align=left] for (x=0; x<16000; x++) {[/align]
 

[align=left]    mywork=malloc(sizeof(wnode));[/align]
 

[align=left]    if (!mywork) {[/align]
 

[align=left]      printf("ouch! can't malloc!/n");[/align]
 

[align=left]      break;[/align]
 

[align=left]    }[/align]
 

[align=left]    mywork->jobnum=x;[/align]
 

[align=left]    queue_put(&wq.work,(node *) mywork);[/align]
 

[align=left] }[/align]
 

[align=left] pthread_mutex_unlock(&wq.control.mutex);[/align]
 

[align=left] pthread_cond_broadcast(&wq.control.cond);[/align]
 

 

[align=left] printf("sleeping.../n");[/align]
 

[align=left] sleep(2);[/align]
 

[align=left] printf("deactivating work queue.../n");[/align]
 

[align=left] control_deactivate(&wq.control);[/align]
 

[align=left] /* CLEANUP */[/align]
 

 

[align=left] join_threads();[/align]
 

[align=left] cleanup_structs();[/align]
 

[align=left]}[/align]
[align=left]代码初排
现在来快速初排代码。定义的第一个结构称作 "wq",它包含了 data_control 和队列头。data_control 结构用于仲裁对整个队列的访问,包括队列中的节点。下一步工作是定义实际的工作节点。要使代码符合本文中的示例,此处所包含的都是作业号。[/align]
 

[align=left]接着,创建清除队列。注释说明了它的工作方式。好,现在让我们跳过 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 调用,直接跳到 main()。所做的第一件事就是初始化结构 -- 这包括初始化 data_controls 和队列,以及激活工作队列。[/align]
[align=left]有关清除的注意事项
现在初始化线程。如果看一下 create_threads() 调用,似乎一切正常 -- 除了一件事。请注意,我们正在分配清除节点,以及初始化它的线程号和 TID 组件。我们还将清除节点作为初始自变量传递给每一个新的工作程序线程。为什么这样做?[/align]
 

[align=left]因为当某个工作程序线程退出时,它会将其清除节点连接到清除队列,然后终止。那时,主线程会在清除队列中检测到这个节点(利用条件变量),并将这个节点移出队列。因为 TID(线程标识)存储在清除节点中,所以主线程可以确切知道哪个线程已终止了。然后,主线程将调用 pthread_join(tid),并联接适当的工作程序线程。如果没有做记录,那么主线程就需要按任意顺序联接工作程序线程,可能是按它们的创建顺序。由于线程不一定按此顺序终止,那么主线程可能会在已经联接了十个线程时,等待联接另一个线程。您能理解这种设计决策是如何使关闭代码加速的吗(尤其在使用几百个工作程序线程的情况下)?[/align]
[align=left]创建工作
我们已启动了工作程序线程(它们已经完成了执行 threadfunc(),稍后将讨论此函数),现在主线程开始将工作节点插入工作队列。首先,它锁定 wq 的控制互斥对象,然后分配 16000 个工作包,将它们逐个插入队列。完成之后,将调用 pthread_cond_broadcast(),于是所有正在睡眠的线程会被唤醒,并开始执行工作。此时,主线程将睡眠两秒钟,然后释放工作队列,并通知工作程序线程终止活动。接着,主线程会调用 join_threads() 函数来清除所有工作程序线程。[/align]
[align=left]threadfunc()
现在来讨论 threadfunc(),这是所有工作程序线程都要执行的代码。当工作程序线程启动时,它会立即锁定工作队列互斥对象,获取一个工作节点(如果有的话),然后对它进行处理。如果没有工作,则调用 pthread_cond_wait()。您会注意到这个调用在一个非常紧凑的 while() 循环中,这是非常重要的。当从 pthread_cond_wait() 调用中苏醒时,决不能认为条件肯定发生了 -- 它可能发生了,也可能没有发生。如果发生了这种情况,即错误地唤醒了线程,而列表是空的,那么 while 循环将再次调用 pthread_cond_wait()。[/align]
 

[align=left]如果有一个工作节点,那么我们只打印它的作业号,释放它并退出。然而,实际代码会执行一些更实质性的操作。在 while() 循环结尾,我们锁定了互斥对象,以便检查 active 变量,以及在循环顶部检查新的工作节点。如果执行完此代码,就会发现如果 wq.control.active 是 0,while 循环就会终止,并会执行 threadfunc() 结尾处的清除代码。[/align]
 

[align=left]工作程序线程的清除代码部件非常有趣。首先,由于 pthread_cond_wait() 返回了锁定的互斥对象,它会对 work_queue 解锁。然后,它锁定清除队列,添加清除代码(包含了 TID,主线程将使用此 TID 来调用 pthread_join()),然后再对清除队列解锁。此后,它发信号给所有 cq 等待者 (pthread_cond_signal(&cq.control.cond)),于是主线程就知道有一个待处理的新节点。我们不使用 pthread_cond_broadcast(),因为没有这个必要 -- 只有一个线程(主线程)在等待清除队列中的新节点。当它调用 join_threads() 时,工作程序线程将打印关闭消息,然后终止,等待主线程发出的 pthread_join() 调用。[/align]
[align=left]join_threads()
如果要查看关于如何使用条件变量的简单示例,请参考 join_threads() 函数。如果还有工作程序线程,join_threads() 会一直执行,等待清除队列中新的清除节点。如果有新节点,我们会将此节点移出队列、对清除队列解锁(从而使工作程序可以添加清除节点)、联接新的工作程序线程(使用存储在清除节点中的 TID)、释放清除节点、减少“现有”线程的数量,然后继续。[/align]
[align=left]结束语
现在已经到了“POSIX 线程详解”系列的尾声,希望您已经准备好开始将多线程代码添加到您自己的应用程序中。有关详细信息,请参阅参考资料部分,这部分内容还包含了本文中使用的所有源码的 tar 文件。下一个系列中再见![/align]
 

[align=left]参考资料[/align]
 

本文中使用的源码的 tar 文件

友好的 Linux pthread 在线帮助 ("man -k pthread") 是极好的参考资料。

如果要彻底了解 POSIX 线程,我推荐此书:Programming with POSIX Threads,David R. Butenhof (Addison-Wesley, 1997)。据证实,此书是现有最好的讨论 POSIX 线程的书籍。

W. Richard Stevens 撰写的 UNIX Network Programming - Networking APIs: Sockets and XTI,(Prentice Hall, 1997) 一书还涵盖了 POSIX 线程。这是一本经典著作,但它讨论线程不如上述的 Programming with POSIX Threads 那样详细。

请参考 Daniel 在 developerWorks 上发表的 POSIX 线程系列中的前几篇文章:
 

POSIX 线程详解介绍了 POSIX 线程,并演示了如何在代码中使用线程。

POSIX 线程详解,第 2 部分演示了如何使用被称为互斥对象的灵巧小玩意,来保护线程代码中共享数据结构的完整性。

请参阅 Sean Walton 撰写的有关 Linux 线程的文档,KB7rfa

请学习亚里桑那大学的 Mark Hays 编写的 POSIX 线程教程

请在 Pthreads-Tcl 介绍中查看对 Tcl 的更改,此更改使 Tcl 能够与 POSIX 线程一起使用。

请学习另一个教程 POSIX 线程入门, 此教程由位于阿姆赫斯特镇的马萨诸塞大学计算机科学系的 Tom Wagner 和 Don Towsley 编写。

FSU PThreads 是实现 POSIX 线程 (基于 SunOS 4.1.x, Solaris 2.x, SCO UNIX, FreeBSD, Linux, 和 DOS)的 C 类库。

请访问 LINUX POSIX 和 DCE 线程主页。

请参阅 LinuxThreads 资料库

Proolix 是一种简单的遵从 POSIX 标准的基于 i8086+ 的操作系统。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息