您的位置:首页 > 运维架构 > Linux

Linux 工作队列之工作者线程创建

2017-03-08 20:49 375 查看
前面介绍了工作队列相关数据结构定义,那么对于工作队列上的任务什么时候执行呢?以及相关

数据结构是什么呢?

工作队列中,执行任务的叫worker,这些worker构成worker_pool。所以有下面定义:

/*
 * The poor guys doing the actual heavy lifting.  All on-duty workers are
 * either serving the manager role, on idle list or on busy hash.  For
 * details on the locking annotation (L, I, X...), refer to workqueue.c.
 *
 * Only to be used in workqueue and async.
 */
struct worker {
 /* on idle list while idle, on busy hash table while busy */
 union {
  struct list_head entry; /* L: while idle */
  struct hlist_node hentry; /* L: while busy */
 };
 struct work_struct *current_work; /* L: work being processed */
 work_func_t  current_func; /* L: current_work's fn */
 struct pool_workqueue *current_pwq; /* L: current_work's pwq */
 bool   desc_valid; /* ->desc is valid */
 struct list_head scheduled; /* L: scheduled works */
 /* 64 bytes boundary on 64bit, 32 on 32bit */
 struct task_struct *task;  /* I: worker task */
 struct worker_pool *pool;  /* I: the associated pool */
      /* L: for rescuers */
 struct list_head node;  /* A: anchored at pool->workers */
      /* A: runs through worker->node */
 unsigned long  last_active; /* L: last active timestamp */
 unsigned int  flags;  /* X: flags */
 int   id;  /* I: worker id */
 /*
  * Opaque string set with work_set_desc().  Printed out with task
  * dump for debugging - WARN, BUG, panic or sysrq.
  */
 char   desc[WORKER_DESC_LEN];
 /* used only by rescuers to point to the target workqueue */
 struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */
};

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: pool->lock protected.  Access with pool->lock held.
 *
 * X: During normal operation, modification requires pool->lock and should
 *    be done only from local cpu.  Either disabling preemption on local
 *    cpu or grabbing pool->lock is enough for read access.  If
 *    POOL_DISASSOCIATED is set, it's identical to L.
 *
 * A: pool->attach_mutex protected.
 *
 * PL: wq_pool_mutex protected.
 *
 * PR: wq_pool_mutex protected for writes.  Sched-RCU protected for reads.
 *
 * PW: wq_pool_mutex and wq->mutex protected for writes.  Either for reads.
 *
 * PWR: wq_pool_mutex and wq->mutex protected for writes.  Either or
 *      sched-RCU for reads.
 *
 * WQ: wq->mutex protected.
 *
 * WR: wq->mutex protected for writes.  Sched-RCU protected for reads.
 *
 * MD: wq_mayday_lock protected.
 */
/* struct worker is defined in workqueue_internal.h */
struct worker_pool {
 spinlock_t  lock;  /* the pool lock */
 int   cpu;  /* I: the associated cpu */
 int   node;  /* I: the associated node ID */
 int   id;  /* I: pool ID */
 unsigned int  flags;  /* X: flags */
 unsigned long  watchdog_ts; /* L: watchdog timestamp */
 struct list_head worklist; /* L: list of pending works */
 int   nr_workers; /* L: total number of workers */
 /* nr_idle includes the ones off idle_list for rebinding */
 int   nr_idle; /* L: currently idle ones */
 struct list_head idle_list; /* X: list of idle workers */
 struct timer_list idle_timer; /* L: worker idle timeout */
 struct timer_list mayday_timer; /* L: SOS timer for workers */
 /* a workers is either on busy_hash or idle_list, or the manager */
 DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
      /* L: hash of busy workers */
 /* see manage_workers() for details on the two manager mutexes */
 struct mutex  manager_arb; /* manager arbitration */
 struct worker  *manager; /* L: purely informational */
 struct mutex  attach_mutex; /* attach/detach exclusion */
 struct list_head workers; /* A: attached workers */
 struct completion *detach_completion; /* all workers detached */
 struct ida  worker_ida; /* worker IDs for task name */
 struct workqueue_attrs *attrs;  /* I: worker attributes */
 struct hlist_node hash_node; /* PL: unbound_pool_hash node */
 int   refcnt;  /* PL: refcnt for unbound pools */
 /*
  * The current concurrency level.  As it's likely to be accessed
  * from other CPUs during try_to_wake_up(), put it in a separate
  * cacheline.
  */
 atomic_t  nr_running ____cacheline_aligned_in_smp;
 /*
  * Destruction of pool is sched-RCU protected to allow dereferences
  * from get_work_pool().
  */
 struct rcu_head  rcu;
} ____cacheline_aligned_in_smp;

内核对worker创建由函数 struct worker *create_worker(struct worker_pool *pool)
处理,worker->task这个绑定了一个内核线程。

/**
 * create_worker - create a new workqueue worker
 * @pool: pool the new worker will belong to
 *
 * Create a new worker which is bound to @pool.  The returned worker
 * can be started by calling start_worker() or destroyed using
 * destroy_worker().
 *
 * CONTEXT:
 * Might sleep.  Does GFP_KERNEL allocations.
 *
 * RETURNS:
 * Pointer to the newly created worker.
 */
static struct worker *create_worker(struct worker_pool *pool)
{
 struct worker *worker = NULL;
 int id = -1;
 char id_buf[16];
 lockdep_assert_held(&pool->manager_mutex);
 /*
  * ID is needed to determine kthread name.  Allocate ID first
  * without installing the pointer.
  */
 idr_preload(GFP_KERNEL);
 spin_lock_irq(&pool->lock);
 id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
 spin_unlock_irq(&pool->lock);
 idr_preload_end();
 if (id < 0)
  goto fail;
 worker = alloc_worker();
 if (!worker)
  goto fail;
 worker->pool = pool;
 worker->id = id;
 if (pool->cpu >= 0)
  snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    pool->attrs->nice < 0  ? "H" : "");
 else
  snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
           "kworker/%s", id_buf);

 if (IS_ERR(worker->task))
  goto fail;
 /*
  * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
  * online CPUs.  It'll be re-applied when any of the CPUs come up.
  */
 set_user_nice(worker->task, pool->attrs->nice);
 set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
 /* prevent userland from meddling with cpumask of workqueue workers */
 worker->task->flags |= PF_NO_SETAFFINITY;
 /*
  * The caller is responsible for ensuring %POOL_DISASSOCIATED
  * remains stable across this function.  See the comments above the
  * flag definition for details.
  */
 if (pool->flags & POOL_DISASSOCIATED)
  worker->flags |= WORKER_UNBOUND;
 /* successful, commit the pointer to idr */
 spin_lock_irq(&pool->lock);
 idr_replace(&pool->worker_idr, worker, worker->id);
 spin_unlock_irq(&pool->lock);
 return worker;
fail:
 if (id >= 0) {
  spin_lock_irq(&pool->lock);
  idr_remove(&pool->worker_idr, id);
  spin_unlock_irq(&pool->lock);
 }
 kfree(worker);
 return NULL;
}

任务唤醒函数:
static void wake_up_worker(struct worker_pool *pool)
{
 struct worker *worker = first_worker(pool);
 if (likely(worker))
  wake_up_process(worker->task);
}
 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: