您的位置:首页 > 其它

使用工作队列

2008-12-17 10:30 260 查看
 
      我们先来看一下默认的events任务队列,然后再看看创建新的工作者线程。

     1.创建推后的工作

      首先要做的是实际创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地创建该结构体:

在<workqueue.h>中

#define DECLARE_WORK(n, f)                  /

    struct work_struct n = __WORK_INITIALIZER(n, f)

#define __WORK_INITIALIZER(n, f) {              /

    .data = WORK_DATA_INIT(0),              /

        .entry  = { &(n).entry, &(n).entry },           /

    .func = (f),                        /

    }

也可以在运行时通过指针创建一个工作:

#define INIT_WORK(_work, _func)                 /

    do {                            /

        (_work)->data = (atomic_long_t) WORK_DATA_INIT(0);  /

        INIT_LIST_HEAD(&(_work)->entry);        /

        PREPARE_WORK((_work), (_func));         /

    } while (0)

/*

 * initialize a work item's function pointer

 */

#define PREPARE_WORK(_work, _func)              /

    do {                            /

        (_work)->func = (_func);            /

    } while (0)

2.工作队列处理函数

 

       工作队列对立函数的原型是:

         void work_handler(void *data)

        这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,运行响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管操作处理函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

3.对工作进行调度

 

    要把给定工作的处理函数提交给默认的events工作线程,只须调用

    schedule_work(&work);

/**

 * schedule_work - put work task in global workqueue

 * @work: job to be done

 *

 * This puts a job in the kernel-global workqueue.

 */

int fastcall schedule_work(struct work_struct *work)

{

    return queue_work(keventd_wq, work);

}

/**

 * queue_work - queue work on a workqueue

 * @wq: workqueue to use

 * @work: work to queue

 *

 * Returns 0 if @work was already on a queue, non-zero otherwise.

 *

 * We queue the work to the CPU it was submitted, but there is no

 * guarantee that it will be processed by that CPU.

 */

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

    int ret = 0, cpu = get_cpu();

    if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

        if (unlikely(is_single_threaded(wq)))

            cpu = singlethread_cpu;

        BUG_ON(!list_empty(&work->entry));

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

        ret = 1;

    }

    put_cpu();

    return ret;

}

   

/* Preempt must be disabled. */

static void __queue_work(struct cpu_workqueue_struct *cwq,

             struct work_struct *work)

{

    unsigned long flags;

    spin_lock_irqsave(&cwq->lock, flags);

    set_wq_data(work, cwq);

    list_add_tail(&work->entry, &cwq->worklist);

    cwq->insert_sequence++;

    wake_up(&cwq->more_work);

    spin_unlock_irqrestore(&cwq->lock, flags);

}

     work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。如不希望工作马上被执行,延迟一段时间之后再执行,可以调度它在指定的时间执行:

    schedule_delayed_work(&work,delay);

/**

 * schedule_delayed_work - put work task in global workqueue after delay

 * @dwork: job to be done

 * @delay: number of jiffies to wait or 0 for immediate execution

 *

 * After waiting for a given time this puts a job in the kernel-global

 * workqueue.

 */

int fastcall schedule_delayed_work(struct delayed_work *dwork,

                    unsigned long delay)

{

    timer_stats_timer_set_start_info(&dwork->timer);

    return queue_delayed_work(keventd_wq, dwork, delay);

}

/**

 * queue_delayed_work - queue work on a workqueue after delay

 * @wq: workqueue to use

 * @dwork: delayable work to queue

 * @delay: number of jiffies to wait before queueing

 *

 * Returns 0 if @work was already on a queue, non-zero otherwise.

 */

int fastcall queue_delayed_work(struct workqueue_struct *wq,

            struct delayed_work *dwork, unsigned long delay)

{

    int ret = 0;

    struct timer_list *timer = &dwork->timer;

    struct work_struct *work = &dwork->work;

    timer_stats_timer_set_start_info(timer);

    if (delay == 0)

        return queue_work(wq, work);

    if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

        BUG_ON(timer_pending(timer));

        BUG_ON(!list_empty(&work->entry));

        /* This stores wq for the moment, for the timer_fn */

        set_wq_data(work, wq);

        timer->expires = jiffies + delay;

        timer->data = (unsigned long)dwork;

        timer->function = delayed_work_timer_fn;

        add_timer(timer);

        ret = 1;

    }

    return ret;

}

struct delayed_work {

    struct work_struct work;

    struct timer_list timer;

};

 

4.刷新操作

 

    排入队列的工作会在工作者线程下一次被唤醒的时候执行。有时,在继续下一步工作之前,你必须保证一些操作已经执行完毕了。这一点对模块来说很重要,在卸载之前,它就有可能需要调用下面的函数;而在内核的其他部分,为了防止竞争条件的出现,也可能需要确保不在有待处理的工作。

   出于以上目的,内核准备了一个用于刷新指定工作队列的函数:

    void flush_scheduled_work(void);

void flush_scheduled_work(void)

{

    flush_workqueue(keventd_wq);

}

/**

 * flush_workqueue - ensure that any scheduled work has run to completion.

 * @wq: workqueue to flush

 *

 * Forces execution of the workqueue and blocks until its completion.

 * This is typically used in driver shutdown handlers.

 *

 * This function will sample each workqueue's current insert_sequence number and

 * will sleep until the head sequence is greater than or equal to that.  This

 * means that we sleep until all works which were queued on entry have been

 * handled, but we are not livelocked by new incoming ones.

 *

 * This function used to run the workqueues itself.  Now we just wait for the

 * helper threads to do it.

 */

void fastcall flush_workqueue(struct workqueue_struct *wq)

{

    might_sleep();

    if (is_single_threaded(wq)) {

        /* Always use first cpu's area. */

        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));

    } else {

        int cpu;

        mutex_lock(&workqueue_mutex);

        for_each_online_cpu(cpu)

            flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));

        mutex_unlock(&workqueue_mutex);

    }

}

     函数会一直等待,直到队列中所有对象都被执行以后才返回。在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文中使用它。

     注意,该函数并不取消任何延迟执行的工作。就是说,任何通过schedule_delayed_work()调度的工作,如果其延迟时间未结束,它并不会因为调用flush_scheduled_work()而被刷新掉。

      取消延迟执行的工作应该调用:

 int cancle_delayed_work(sruct work_struc work);

       这个函数可以取消任何与work_struct相关的挂起工作。

在<workqueue.h>中

/*

 * Kill off a pending schedule_delayed_work().  Note that the work callback

 * function may still be running on return from cancel_delayed_work().  Run

 * flush_scheduled_work() to wait on it.

 */

static inline int cancel_delayed_work(struct delayed_work *work)

{

    int ret;

    ret = del_timer_sync(&work->timer);

    if (ret)

        work_release(&work->work);

    return ret;

}

/**

 * work_release - Release a work item under execution

 * @work: The work item to release

 *

 * This is used to release a work item that has been initialised with automatic

 * release mode disabled (WORK_STRUCT_NOAUTOREL is set).  This gives the work

 * function the opportunity to grab auxiliary data from the container of the

 * work_struct before clearing the pending bit as the work_struct may be

 * subject to deallocation the moment the pending bit is cleared.

 *

 * In such a case, this should be called in the work function after it has

 * fetched any data it may require from the containter of the work_struct.

 * After this function has been called, the work_struct may be scheduled for

 * further execution or it may be deallocated unless other precautions are

 * taken.

 *

 * This should also be used to release a delayed work item.

 */

#define work_release(work) /

    clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))

5.创建新的工作队列

 

         如果默认的队列不能满足需要,可以创建一个新的工作对列和与之相应的工作者线程。

         创建一个新的任务队列和与之相关的工作者线程,只须调用一个简单的函数:

         struct workqueue_struct *create_workqueue(const char *name);

在<workqueue.h>中

#define create_workqueue(name) __create_workqueue((name), 0, 0)

在<workqueue.c>中

struct workqueue_struct *__create_workqueue(const char *name,

                        int singlethread, int freezeable)

{

    int cpu, destroy = 0;

    struct workqueue_struct *wq;

    struct task_struct *p;

    wq = kzalloc(sizeof(*wq), GFP_KERNEL);

    if (!wq)

        return NULL;

    wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

    if (!wq->cpu_wq) {

        kfree(wq);

        return NULL;

    }

    wq->name = name;

    mutex_lock(&workqueue_mutex);

    if (singlethread) {

        INIT_LIST_HEAD(&wq->list);

        p = create_workqueue_thread(wq, singlethread_cpu, freezeable);

        if (!p)

            destroy = 1;

        else

            wake_up_process(p);

    } else {

        list_add(&wq->list, &workqueues);

        for_each_online_cpu(cpu) {

            p = create_workqueue_thread(wq, cpu, freezeable);

            if (p) {

                kthread_bind(p, cpu);

                wake_up_process(p);

            } else

                destroy = 1;

        }

    }

    mutex_unlock(&workqueue_mutex);

    /*

     * Was there any error during startup? If yes then clean up:

     */

    if (destroy) {

        destroy_workqueue(wq);

        wq = NULL;

    }

    return wq;

}

     这个函数会创建所有的工作者线程(系统中的每个处理器都有一个),并且做好所有开始处理工作之前的准备工作。

     创建一个工作的时候无须考虑工作队列的类型。可以使用下列函数对给定工作而不是默认的event队列进行操作。

int queue_work(struct workqueue_struct *wq, struct work_struct *work);

int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay);

flush_workqueue(struct workqueue_struct *wq);

/**

 * queue_work - queue work on a workqueue

 * @wq: workqueue to use

 * @work: work to queue

 *

 * Returns 0 if @work was already on a queue, non-zero otherwise.

 *

 * We queue the work to the CPU it was submitted, but there is no

 * guarantee that it will be processed by that CPU.

 */

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

    int ret = 0, cpu = get_cpu();

    if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

        if (unlikely(is_single_threaded(wq)))

            cpu = singlethread_cpu;

        BUG_ON(!list_empty(&work->entry));

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

        ret = 1;

    }

    put_cpu();

    return ret;

}

/**

 * queue_delayed_work - queue work on a workqueue after delay

 * @wq: workqueue to use

 * @dwork: delayable work to queue

 * @delay: number of jiffies to wait before queueing

 *

 * Returns 0 if @work was already on a queue, non-zero otherwise.

 */

int fastcall queue_delayed_work(struct workqueue_struct *wq,

            struct delayed_work *dwork, unsigned long delay)

{

    int ret = 0;

    struct timer_list *timer = &dwork->timer;

    struct work_struct *work = &dwork->work;

    timer_stats_timer_set_start_info(timer);

    if (delay == 0)

        return queue_work(wq, work);

    if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

        BUG_ON(timer_pending(timer));

        BUG_ON(!list_empty(&work->entry));

        /* This stores wq for the moment, for the timer_fn */

        set_wq_data(work, wq);

        timer->expires = jiffies + delay;

        timer->data = (unsigned long)dwork;

        timer->function = delayed_work_timer_fn;

        add_timer(timer);

        ret = 1;

    }

    return ret;

}

/**

 * flush_workqueue - ensure that any scheduled work has run to completion.

 * @wq: workqueue to flush

 *

 * Forces execution of the workqueue and blocks until its completion.

 * This is typically used in driver shutdown handlers.

 *

 * This function will sample each workqueue's current insert_sequence number and

 * will sleep until the head sequence is greater than or equal to that.  This

 * means that we sleep until all works which were queued on entry have been

 * handled, but we are not livelocked by new incoming ones.

 *

 * This function used to run the workqueues itself.  Now we just wait for the

 * helper threads to do it.

 */

void fastcall flush_workqueue(struct workqueue_struct *wq)

{

    might_sleep();

    if (is_single_threaded(wq)) {

        /* Always use first cpu's area. */

        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));

    } else {

        int cpu;

        mutex_lock(&workqueue_mutex);

        for_each_online_cpu(cpu)

            flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));

        mutex_unlock(&workqueue_mutex);

    }

}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)

{

    if (cwq->thread == current) {

        /*

         * Probably keventd trying to flush its own queue. So simply run

         * it by hand rather than deadlocking.

         */

        run_workqueue(cwq);

    } else {

        DEFINE_WAIT(wait);

        long sequence_needed;

        spin_lock_irq(&cwq->lock);

        sequence_needed = cwq->insert_sequence;

        while (sequence_needed - cwq->remove_sequence > 0) {

            prepare_to_wait(&cwq->work_done, &wait,

                    TASK_UNINTERRUPTIBLE);

            spin_unlock_irq(&cwq->lock);

            schedule();

            spin_lock_irq(&cwq->lock);

        }

        finish_wait(&cwq->work_done, &wait);

        spin_unlock_irq(&cwq->lock);

    }

}

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

    unsigned long flags;

    /*

     * Keep taking off work from the queue until

     * done.

     */

    spin_lock_irqsave(&cwq->lock, flags);

    cwq->run_depth++;

    if (cwq->run_depth > 3) {

        /* morton gets to eat his hat */

        printk("%s: recursion depth exceeded: %d/n",

            __FUNCTION__, cwq->run_depth);

        dump_stack();

    }

    while (!list_empty(&cwq->worklist)) {

        struct work_struct *work = list_entry(cwq->worklist.next,

                        struct work_struct, entry);

        work_func_t f = work->func;

        list_del_init(cwq->worklist.next);

        spin_unlock_irqrestore(&cwq->lock, flags);

        BUG_ON(get_wq_data(work) != cwq);

        if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))

            work_release(work);

        f(work);

        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {

            printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "

                    "%s/0x%08x/%d/n",

                    current->comm, preempt_count(),

                        current->pid);

            printk(KERN_ERR "    last function: ");

            print_symbol("%s/n", (unsigned long)f);

            debug_show_held_locks(current);

            dump_stack();

        }

        spin_lock_irqsave(&cwq->lock, flags);

        cwq->remove_sequence++;

        wake_up(&cwq->work_done);

    }

    cwq->run_depth--;

    spin_unlock_irqrestore(&cwq->lock, flags);

}

呵呵,又把代码贴完了:)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息