您的位置:首页 > 运维架构 > Linux

linux内核部件分析(十一)——waitqueue与线程的阻塞

2011-10-31 02:15 483 查看
当你必须一个复杂的系统,协调系统的方方面面,灵活地支持各种机制和策略,即使很简单的问题也会变得很复杂。linux绝对就是这样一个复杂的系统。所以我们要理解它,尽量从原理的角度去理解事务的处理流程,尽量避免各种细枝末节的干扰,尽量规避那些足以压垮自己的庞然大物。(尽管细致末节和庞然大物很可能就是linux闪光的地方,但我们还是小心为上。)

原理

现在我们来考虑linux中线程的阻塞。它的原理很简单。我们有一个要阻塞的线程A和要唤醒它的线程B(当然也可以是中断处理例程ISR),有一个两者共知的等待队列Q(也许这个等待队列属于一个信号量什么的)。首先是线程A阻塞,要加入等待队列Q,需要先申请一个队列节点N,节点N中包含指向线程A的线程控制块(TCB)的指针,然后A就可以将自己的线程状态设为阻塞,并调用schedule()将自己踢出CPU的就绪队列。过了一定时间,线程B想要唤醒等待队列Q中的线程,它只需要获得线程A的TCB指针,将线程A状态设为就绪即可。等线程A恢复运行,将节点N退出等待队列Q,完成整个从阻塞到恢复的流程。

原理讲起来总是没意思的,下面我们还是看代码吧。我们规避复杂的任务状态转换和调度的内容,即使对等待队列的分析,也是按照从基础到扩展的顺序。代码出现在三个地方:include/linux/wait.h , kernel/wait.c, kernel/sched.c。不用说wait.h是头文件,wait.c是实现的地方,而sched.c则体现了waitqueue的一种应用(实现completion)。为了更好地分析completion,我们还需要include/linux/completion.h。

waitqueue实现

我们仍然先看数据结构。

struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;

typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);

int default_wake_function(wait_queue_t *wait, unsigned mode, int flags, void *key);

struct __wait_queue {
unsigned int flags;
#define WQ_FLAG_EXCLUSIVE	0x01
void *private;
wait_queue_func_t func;
struct list_head task_list;
};

typedef struct __wait_queue wait_queue_t;




其中,wait_queue_head_t 就是等待队列头,wait_queue_t 就是队列节点。

wait_queue_head_t 包括一个自旋锁lock,还有一个双向循环队列task_list,这在我们的预料之内。

wait_queue_t 则包括较多,我们先来剧透一下。

flags变量只可能是0或者WQ_FLAG_EXCLUSIVE。flags标志只影响等待队列唤醒线程时的操作,置为WQ_FLAG_EXCLUSIVE则每次只允许唤醒一个线程,为0则无限制。

private指针,其实就是指向TCB的指针。

func是一个函数指针,指向用于唤醒队列中线程的函数。虽然提供了默认的唤醒函数default_wake_function,但也允许灵活的设置队列的唤醒函数。

task_list是一个双向循环链表节点,用于链入等待队列的链表。

依照旧例,waitqueue在数据结构之后为我们提供了丰富的初始化函数。因为太多了,我们只好分段列出。

#define __WAITQUEUE_INITIALIZER(name, tsk) {				\
.private	= tsk,						\
.func		= default_wake_function,			\
.task_list	= { NULL, NULL } }

#define DECLARE_WAITQUEUE(name, tsk)					\
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

#define __WAIT_QUEUE_HEAD_INITIALIZER(name) {				\
.lock		= __SPIN_LOCK_UNLOCKED(name.lock),		\
.task_list	= { &(name).task_list, &(name).task_list } }

#define DECLARE_WAIT_QUEUE_HEAD(name) \
wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
这是用宏定义,在声明变量时进行的初始化。

void __init_waitqueue_head(wait_queue_head_t *q, struct lock_class_key *key)
{
spin_lock_init(&q->lock);
lockdep_set_class(&q->lock, key);
INIT_LIST_HEAD(&q->task_list);
}

#define init_waitqueue_head(q)				\
do {						\
static struct lock_class_key __key;	\
\
__init_waitqueue_head((q), &__key);	\
} while (0)

#ifdef CONFIG_LOCKDEP
# define __WAIT_QUEUE_HEAD_INIT_ONSTACK(name) \
({ init_waitqueue_head(&name); name; })
# define DECLARE_WAIT_QUEUE_HEAD_ONSTACK(name) \
wait_queue_head_t name = __WAIT_QUEUE_HEAD_INIT_ONSTACK(name)
#else
# define DECLARE_WAIT_QUEUE_HEAD_ONSTACK(name) DECLARE_WAIT_QUEUE_HEAD(name)
#endif
这一段代码其实不要也可以,但因为是简单的细节,所以我们也覆盖到了。

init_wait_queue_head()对等待队列头进行初始化。

另外定义了宏DECLARE_WAIT_QUEUE_HEAD_ONSTACK。根据配置是否使用CONFIG_LOCKDEP,决定其实现。

spinlock很复杂,配置了CONFIG_LOCKDEP就会定义一个局部静态变量__key对spinlock使用的正确性进行检查。检查的过程很复杂,但既然是检查,就是可以

砍掉的。因为使用了局部静态变量,所以只能检查定义在栈上的变量,所以是DECLARE_WAIT_QUEUE_HEAD_ONSTACK。很多使用spinlock的地方都可以看到

这种检查。

static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
q->flags = 0;
q->private = p;
q->func = default_wake_function;
}

static inline void init_waitqueue_func_entry(wait_queue_t *q,
wait_queue_func_t func)
{
q->flags = 0;
q->private = NULL;
q->func = func;
}

static inline int waitqueue_active(wait_queue_head_t *q)
{
return !list_empty(&q->task_list);
}
init_waitqueue_entry()和init_waitqueue_func_entry()是用于初始化waitqueue的函数。

waitqueue_active()查看队列中是否有等待线程。

static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
list_add(&new->task_list, &head->task_list);
}

/*
* Used for wake-one threads:
*/
static inline void __add_wait_queue_tail(wait_queue_head_t *head,
wait_queue_t *new)
{
list_add_tail(&new->task_list, &head->task_list);
}

static inline void __remove_wait_queue(wait_queue_head_t *head,
wait_queue_t *old)
{
list_del(&old->task_list);
}
__add_wait_queue()将节点加入等待队列头部。

__add_wait_queue_tail()将节点加入等待队列尾部。

__remove_wait_queue()将节点从等待队列中删除。

这三个都是简单地用链表操作实现。

void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;

wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}

void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;

wait->flags |= WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue_tail(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}

void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;

spin_lock_irqsave(&q->lock, flags);
__remove_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}


add_wait_queue()将节点加入等待队列头部。

add_wait_queue_exclusive()将节点加入等待队列尾部。

remove_wait_queue()将节点从等待队列中删除。
这里三个函数和前面三个函数最大的区别就是这里加了禁止中断的自旋锁。从此也可以看出linux代码的一个特色。以双下划线前缀的函数往往是供内部调用的,即使外界使用也要清楚此函数的功能,比如前面的__add_wait_queue()等三个函数,就只能在以加带关中断的自旋锁时才能调用,目的是省去重复的加锁。而add_wait_queue()等函数则更为稳重一些。

或许你觉得不可思议,但waitqueue就是这么简单。下面我们来看看是怎样用它来实现completion的。

waitqueue的使用——实现completion

completion是一种建立在waitqueue之上的信号量机制,它的接口简单,功能更简单,是waitqueue之上最好的封装例子。

struct completion {
unsigned int done;
wait_queue_head_t wait;
};
completion的结构很简单,用done来进行计数,用wait保存等待队列。

#define COMPLETION_INITIALIZER(work) \
{ 0, __WAIT_QUEUE_HEAD_INITIALIZER((work).wait) }

#define COMPLETION_INITIALIZER_ONSTACK(work) \
({ init_completion(&work); work; })

#define DECLARE_COMPLETION(work) \
struct completion work = COMPLETION_INITIALIZER(work)

#ifdef CONFIG_LOCKDEP
# define DECLARE_COMPLETION_ONSTACK(work) \
struct completion work = COMPLETION_INITIALIZER_ONSTACK(work)
#else
# define DECLARE_COMPLETION_ONSTACK(work) DECLARE_COMPLETION(work)
#endif

static inline void init_completion(struct completion *x)
{
x->done = 0;
init_waitqueue_head(&x->wait);
}
/* reinitialize completion */
#define INIT_COMPLETION(x) ((x).done = 0)

以上是completion结构的初始宏定义和初始化函数。我们又在其中看到了CONFIG_LOCKDEP,已经熟悉了。

/**
* wait_for_completion: - waits for completion of a task
* @x:  holds the state of this particular completion
*
* This waits to be signaled for completion of a specific task. It is NOT
* interruptible and there is no timeout.
*
* See also similar routines (i.e. wait_for_completion_timeout()) with timeout
* and interrupt capability. Also see complete().
*/
void __sched wait_for_completion(struct completion *x)
{
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}

static long __sched
wait_for_common(struct completion *x, long timeout, int state)
{
might_sleep();

spin_lock_irq(&x->wait.lock);
timeout = do_wait_for_common(x, timeout, state);
spin_unlock_irq(&x->wait.lock);
return timeout;
}

static inline long __sched
do_wait_for_common(struct completion *x, long timeout, int state)
{
if (!x->done) {
DECLARE_WAITQUEUE(wait, current);

wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue_tail(&x->wait, &wait);
do {
if (signal_pending_state(state, current)) {
timeout = -ERESTARTSYS;
break;
}
__set_current_state(state);
spin_unlock_irq(&x->wait.lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&x->wait.lock);
} while (!x->done && timeout);
__remove_wait_queue(&x->wait, &wait);
if (!x->done)
return timeout;
}
x->done--;
return timeout ?: 1;
}



wait_for_completion()将线程阻塞在completion上。关键过程在计数值为0时调用do_wait_for_common阻塞。
do_wait_for_common()首先用DECLARE_WAITQUEUE()定义一个初始化好的wait_queue_t,并调用__add_wait_queuetail()将节点加入等待队列尾部。然后调用signal_pending_state()检查线程信号与等待状态的情况,如果允许信号响应并且有信号阻塞在线程上,自然不必再阻塞了,直接返回-ERESTARTSYS。否则调用__set_current_state()设置线程状态(线程阻塞的状态分TASK_INTERRUPTIBLE和TASK_UNINTERRUPTIBLE,前者允许信号中断,后者则不允许),并调用schedule_timeout()将当前线程从就绪队列换出。注意completion会在被唤醒时检查计数是否可被占用,有时唤醒了却无法占用时只得再次阻塞。最后获得计数后调用__remove_wait_queue()将局部变量节点从等待队列中删除。

do_wait_for_common()最后一行的c语句不符合标准,这也是gcc扩展的一部分。timeout为0时返回1,否则返回timeout值。
schedule_timeout()的功能是使当前线程至少睡眠timeout个jiffies时间片,timeout值为MAX_SCHEDULE_TIMEOUT时无限睡眠。返回值为0,如因响应信号而提前恢复,则返回剩余的timeout计数。

unsigned long __sched
wait_for_completion_timeout(struct completion *x, unsigned long timeout)
{
return wait_for_common(x, timeout, TASK_UNINTERRUPTIBLE);
}

int __sched wait_for_completion_interruptible(struct completion *x)
{
long t = wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_INTERRUPTIBLE);
if (t == -ERESTARTSYS)
return t;
return 0;
}

unsigned long __sched
wait_for_completion_interruptible_timeout(struct completion *x,
unsigned long timeout)
{
return wait_for_common(x, timeout, TASK_INTERRUPTIBLE);
}

int __sched wait_for_completion_killable(struct completion *x)
{
long t = wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_KILLABLE);
if (t == -ERESTARTSYS)
return t;
return 0;
}
wait_for_completion_timeout()使用带超时时间的阻塞。
wait_for_completion_interruptible()使用允许信号打断的阻塞。
wait_for_completion_interruptible_timeout()使用带超时时间的允许信号打断的阻塞。
wait_for_completion_killable()使用允许被杀死的阻塞。
四者都是wait_for_completion()的变种,通过wait_for_common()调用do_wait_for_common()实现。

void complete(struct completion *x)
{
unsigned long flags;

spin_lock_irqsave(&x->wait.lock, flags);
x->done++;
__wake_up_common(&x->wait, TASK_NORMAL, 1, 0, NULL);
spin_unlock_irqrestore(&x->wait.lock, flags);
}

/*
* The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
* wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
* number) then we wake all the non-exclusive tasks and one exclusive task.
*
* There are circumstances in which we can try to wake a task which has already
* started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
* zero in this (rare) case, and we handle it by continuing to scan the queue.
*/
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;

list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;

if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
complete()唤醒阻塞的线程。通过调用__wake_up_common()实现。

这里curr->func()调用的一般是default_wake_function()。

int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
return try_to_wake_up(curr->private, mode, wake_flags);
}
default_wake_function()将正睡眠的线程唤醒,调用try_to_wake_up()实现。try_to_wake_up()内容涉及TCB状态等问题,我们将其忽略。

void complete_all(struct completion *x)
{
unsigned long flags;

spin_lock_irqsave(&x->wait.lock, flags);
x->done += UINT_MAX/2;
__wake_up_common(&x->wait, TASK_NORMAL, 0, 0, NULL);
spin_unlock_irqrestore(&x->wait.lock, flags);
}
complete_all()唤醒等待在completion上的所有线程。

bool try_wait_for_completion(struct completion *x)
{
int ret = 1;

spin_lock_irq(&x->wait.lock);
if (!x->done)
ret = 0;
else
x->done--;
spin_unlock_irq(&x->wait.lock);
return ret;
}
try_wait_for_completion()试图在不阻塞情况下获得信号量计数。

/**
*	completion_done - Test to see if a completion has any waiters
*	@x:	completion structure
*
*	Returns: 0 if there are waiters (wait_for_completion() in progress)
*		 1 if there are no waiters.
*
*/
bool completion_done(struct completion *x)
{
int ret = 1;

spin_lock_irq(&x->wait.lock);
if (!x->done)
ret = 0;
spin_unlock_irq(&x->wait.lock);
return ret;
}
completion_done()检查是否有线程阻塞。但这里实现过于简单,因为在返回0时也可能没有线程阻塞,也许只用在特殊情况下或者较为宽松的场合。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: