您的位置:首页 > 其它

工作队列使用方法

2011-10-22 11:37 225 查看
一、用法

struct cpu_workqueue_struct {

spinlock_t lock;

long remove_sequence; /* Least-recently added (next to run) */

long insert_sequence; /* Next to add */

struct list_head worklist;

wait_queue_head_t more_work;

wait_queue_head_t work_done;

struct workqueue_struct *wq;

struct task_struct *thread;

int run_depth; /* Detect run_workqueue() recursion depth */

} ____cacheline_aligned;

/*

* The externally visible workqueue abstraction is an array of

* per-CPU workqueues:

*/

struct workqueue_struct {

struct cpu_workqueue_struct *cpu_wq;

const char *name;

struct list_head list; /* Empty if single thread */

};

工作队列的使用很简单。

1.首先是建立一个工作队列:

struct workqueue_struct *keventd_wq;

keventd_wq = create_workqueue("events");

2.然后就是在这个队列中insert你所要做的“工作”:

DECLARE_WORK(work, func, data)

queue_work(keventd_wq, work);

struct work_struct {

unsigned long pending;

struct list_head entry;

void (*func)(void *);

void *data;

void *wq_data;

struct timer_list timer;

};

初始化有两种方法。

一种为静态方法:

#define __WORK_INITIALIZER(n, f, d) { \

.entry = { &(n).entry, &(n).entry }, \

.func = (f), \

.data = (d), \

.timer = TIMER_INITIALIZER(NULL, 0, 0), \

}

#define DECLARE_WORK(n, f, d) \

struct work_struct n = __WORK_INITIALIZER(n, f, d)

另一种为动态方法:

/*

* initialize all of a work-struct:

*/

#define INIT_WORK(_work, _func, _data) \

do { \

INIT_LIST_HEAD(&(_work)->entry); \

(_work)->pending = 0; \

PREPARE_WORK((_work), (_func), (_data)); \

init_timer(&(_work)->timer); \

} while (0)

二、执行过程

create_workqueue() -> __create_workqueue()

struct workqueue_struct *__create_workqueue(const char *name,

int singlethread)

{

int cpu, destroy = 0;

struct workqueue_struct *wq;

struct task_struct *p;

wq = kzalloc(sizeof(*wq), GFP_KERNEL);

//为每个CPU建立一个结构

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

...

wq->name = name;

mutex_lock(&workqueue_mutex);

if (singlethread) {

...

} else {

list_add(&wq->list, &workqueues);

for_each_online_cpu(cpu) {

//为每个CPU创建一个线程

p = create_workqueue_thread(wq, cpu);

if (p) {

kthread_bind(p, cpu);

//唤醒这个线程执行工作

wake_up_process(p);

} else

destroy = 1;

}

}

mutex_unlock(&workqueue_mutex);

...

return wq;

}

create_workqueue() -> __create_workqueue() -> create_workqueue_thread()

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,

int cpu)

{

struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

struct task_struct *p;

spin_lock_init(&cwq->lock);

cwq->wq = wq;

cwq->thread = NULL;

cwq->insert_sequence = 0;

cwq->remove_sequence = 0;

INIT_LIST_HEAD(&cwq->worklist);

init_waitqueue_head(&cwq->more_work);

init_waitqueue_head(&cwq->work_done);

if (is_single_threaded(wq))

p = kthread_create(worker_thread, cwq, "%s", wq->name);

else

//创建一个线程,这个线程以cwq为数据执行worker_thread这个函数

p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);

if (IS_ERR(p))

return NULL;

cwq->thread = p; //

return p;

}

create_workqueue() -> __create_workqueue() -> create_workqueue_thread() -> worker_thread()

//本函数在一个死循环等待工作的到来,这一般在睡眠状态中,等待被唤醒执行工作

//当有工作到来时queue_work()会将这个线程唤醒

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

DECLARE_WAITQUEUE(wait, current);

...

current->flags |= PF_NOFREEZE;

//设置优先级

set_user_nice(current, -5);

...

set_current_state(TASK_INTERRUPTIBLE);

while (!kthread_should_stop()) {

//将本线程加入睡眠队列,用于睡眠后可以被唤醒

add_wait_queue(&cwq->more_work, &wait);

//如果没用被执行的“工作”,则将自己切换出去,进入睡眠状态

if (list_empty(&cwq->worklist))

schedule();

else //否则或是被唤醒

__set_current_state(TASK_RUNNING);

remove_wait_queue(&cwq->more_work, &wait);

//工作队列非空,执行工作

if (!list_empty(&cwq->worklist))

run_workqueue(cwq);

set_current_state(TASK_INTERRUPTIBLE);

}

__set_current_state(TASK_RUNNING);

return 0;

}

create_workqueue() -> __create_workqueue() -> create_workqueue_thread()

-> worker_thread() -> run_workqueue()

//该函数执行真正的工作

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

unsigned long flags;

spin_lock_irqsave(&cwq->lock, flags);

...

//顺次执行队列中的所有工作

while (!list_empty(&cwq->worklist)) {

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

void (*f) (void *) = work->func;

void *data = work->data;

//从队列中删除待执行的任务

list_del_init(cwq->worklist.next);

spin_unlock_irqrestore(&cwq->lock, flags);

BUG_ON(work->wq_data != cwq);

clear_bit(0, &work->pending);

//执行“工作”

f(data);

spin_lock_irqsave(&cwq->lock, flags);

cwq->remove_sequence++;

wake_up(&cwq->work_done); //

}

cwq->run_depth--;

spin_unlock_irqrestore(&cwq->lock, flags);

}

三、工作线程创建的详细过程

create_workqueue() -> __create_workqueue() -> create_workqueue_thread()

-> kthread_create()

struct task_struct *kthread_create(int (*threadfn)(void *data),

void *data,

const char namefmt[],

...)

{

//初始化用于创建线程的辅助结构

struct kthread_create_info create;

DECLARE_WORK(work, keventd_create_kthread, &create);

create.threadfn = threadfn;

create.data = data;

init_completion(&create.started);

init_completion(&create.done);

if (!helper_wq) //首先创建辅助工作队列

work.func(work.data);

else {

//注意,“创建一个工作队列”这个工作本身又是属于helper_wq工作队列

//的一项工作,所以,将这个工作加入的辅助工作队列中等待执行。

queue_work(helper_wq, &work);

wait_for_completion(&create.done);

}

...

return create.result;

}

create_workqueue() -> __create_workqueue() -> create_workqueue_thread()

-> kthread_create()-> keventd_create_kthread()

//最终会调用kernel_thread为每个工作队列创建一个线程

//这样,被创建的线程会以create为数据执行kthread(如下),而kthread中则执行create中的threadfn(data),

//即为create_workqueue_thread中的worker_thread(cwq),即为我们工作队列要执行的函数了。

static void keventd_create_kthread(void *_create)

{

struct kthread_create_info *create = _create;

int pid;

/* We want our own signal handler (we take no signals by default). */

pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD);

if (pid < 0) {

create->result = ERR_PTR(pid);

} else {

wait_for_completion(&create->started);

read_lock(&tasklist_lock);

create->result = find_task_by_pid(pid);

read_unlock(&tasklist_lock);

}

complete(&create->done);

}

static int kthread(void *_create)

{

struct kthread_create_info *create = _create;

int (*threadfn)(void *data);

void *data;

...

threadfn = create->threadfn;

data = create->data;

...

if (!kthread_should_stop())

ret = threadfn(data);

...

return 0;

}

四、插入“工作”

/* Preempt must be disabled. */

static void __queue_work(struct cpu_workqueue_struct *cwq,

struct work_struct *work)

{

unsigned long flags;

spin_lock_irqsave(&cwq->lock, flags);

work->wq_data = cwq;

//将当前工作插入到工作队列中待待执行

list_add_tail(&work->entry, &cwq->worklist);

cwq->insert_sequence++;

wake_up(&cwq->more_work); //唤醒相应线程

spin_unlock_irqrestore(&cwq->lock, flags);

}

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

int ret = 0, cpu = get_cpu();

//如里当前工作未在队列中才插入

if (!test_and_set_bit(0, &work->pending)) {

...

__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

ret = 1;

}

put_cpu();

return ret;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: