linux工作队列 - workqueue_struct创建
2016-10-16 19:27
453 查看
文章系列
1.linux工作队列 - workqueue总览2.linux工作队列 - workqueue_struct创建
3.linux工作队列 - 把work_struct加入工作队列
4.linux工作队列 - work_struct被调用过程
1.创建workqueue代码分析
1.1整体代码分析
根据FLAG的不同,创建workqueue的API分好几种(见系列文章1说明),根据情况使用,但最终这些API都会调用到alloc_workqueue,这是一个宏定义,它的调用序列图如下所示:这里重点介绍函数alloc_and_link_pwqs(),wq在此函数中创建:
static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; int cpu, ret; if (!(wq->flags & WQ_UNBOUND)) {------------------------------创建bound类型的wq wq->cpu_pwqs = alloc_percpu(struct pool_workqueue); if (!wq->cpu_pwqs) |------得到per-CPU pool_workqueue return -ENOMEM; for_each_possible_cpu(cpu) {---------------对于每个CPU,分配pool_workqueue和worker_pool struct pool_workqueue *pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);--分配对应cpu的pool_workqueue struct worker_pool *cpu_pools = per_cpu(cpu_worker_pools, cpu);--分配对应cpu的worker_pool,worker_pool的初始化在系统启动时创建,并加入到全局变量cpu_worker_pools中,具体系统启动是wq的初始化分析见第二节 init_pwq(pwq, wq, &cpu_pools[highpri]); mutex_lock(&wq->mutex); link_pwq(pwq);----------------------把对应的pool_workqueue加入到wq中,即链表字段wq->pwqs中 mutex_unlock(&wq->mutex); } return 0; } else if (wq->flags & __WQ_ORDERED) {-----------------------创建orderd类型的wq,only single pwq ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]); /* there should only be single pwq for ordering guarantee */ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node || wq->pwqs.prev != &wq->dfl_pwq->pwqs_node), "ordering guarantee broken for workqueue %s\n", wq->name); return ret; } else {-----------------------------------------------------创建unbound类型的wq return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); } }
从上面分析可以清晰了解到bound类型的wq的创建过程,下面分析unbound类型的wq创建过程
1.2unbound类型的wq创建
orderd类型的和unbound类型调用接口都是apply_workqueue_attrs,不同的是attrs不一样,ordered_wq_attrs和unbound_std_wq_attrs在wq初始化时创建,具体见第二节介绍,apply_workqueue_attrs简易序列图如下:最终调用到函数alloc_unbound_pwq,主要是要得到pool_workqueue,代码分析如下:
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { struct worker_pool *pool; struct pool_workqueue *pwq; lockdep_assert_held(&wq_pool_mutex); pool = get_unbound_pool(attrs);--------pool_workqueue必须指向相应的worker_pool,得到该种属性的worker_pool if (!pool) return NULL; pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);--给pool_workqueue 分配内存 if (!pwq) { put_unbound_pool(pool); return NULL; } init_pwq(pwq, wq, pool); return pwq;---------------------------返回pool_workqueue
上面的代码很简单最主要的是要进入函数get_unbound_pool中,主要是要得到worker_pool ,分析如下:
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs) { u32 hash = wqattrs_hash(attrs);-------根据attrs计算对应的散列值 struct worker_pool *pool; int node; int target_node = NUMA_NO_NODE; lockdep_assert_held(&wq_pool_mutex); /* do we already have a matching pool? */ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) { if (wqattrs_equal(pool->attrs, attrs)) { pool->refcnt++; return pool;------------------从全局散列表unbound_pool_hash中,根据attrs对比找到存在的worker_pool,找到就返回 } } //走到这一步说明没有找到存在的worker_pool,那么下面就得新创建一个 /* if cpumask is contained inside a NUMA node, we belong to that node */ if (wq_numa_enabled) { for_each_node(node) { if (cpumask_subset(attrs->cpumask, wq_numa_possible_cpumask[node])) { target_node = node; break; } } } /* nope, create a new one */ pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);--创建一个worker_pool if (!pool || init_worker_pool(pool) < 0) goto fail; lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */ copy_workqueue_attrs(pool->attrs, attrs);---------------------copy属性给worker_pool pool->node = target_node; /* * no_numa isn't a worker_pool attribute, always clear it. See * 'struct workqueue_attrs' comments for detail. */ pool->attrs->no_numa = false; if (worker_pool_assign_id(pool) < 0) goto fail; /* create and start the initial worker */ if (!create_worker(pool))-------------------------------------创建工作者线程,详细分析见第三节 goto fail; /* install */ hash_add(unbound_pool_hash, &pool->hash_node, hash);----------加入到全局unbound_pool_hash散列表中 return pool; fail: if (pool) put_unbound_pool(pool); return NULL; }
2.系统启动时wq的初始化
系统初始化的时候会调用early_initcall(init_workqueues),下面来分析一下init_workqueues:static int __init init_workqueues(void) { int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; int i, cpu; WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL)); cpumask_copy(wq_unbound_cpumask, cpu_possible_mask); pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);-----初始化pwq_cache,具体内存这块以后再补 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); wq_numa_init(); //对每个cpu初始化worker_pool /* initialize CPU pools */ for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; for_each_cpu_worker_pool(pool, cpu) { BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; pool->node = cpu_to_node(cpu); /* alloc pool ID */ mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } //对每个cpu创建工作者线程池 /* create the initial worker */ for_each_online_cpu(cpu) { struct worker_pool *pool; for_each_cpu_worker_pool(pool, cpu) { pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(!create_worker(pool)); } } //创建默认的unbound and ordered wq attrs /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs; /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs; } //创建workqueue_struct,一般我们不会另外再创建新的workqueue_struct,开发者使用这些就可以了 system_wq = alloc_workqueue("events", 0, 0); system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); wq_watchdog_init(); return 0; }
3.创建工作者worker线程池
下面分析函数create_worker:static struct worker *create_worker(struct worker_pool *pool) { struct worker *worker = NULL; int id = -1; char id_buf[16]; /* ID is needed to determine kthread name */ id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL); if (id < 0) goto fail; worker = alloc_worker(pool->node);-----------分配一个工作者worker if (!worker) goto fail; worker->pool = pool; worker->id = id; if (pool->cpu >= 0) snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id, pool->attrs->nice < 0 ? "H" : ""); else snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id); worker->task = kthread_create_on_node(worker_thread, worker, pool->node, "kworker/%s", id_buf);---为工作者创建线程 if (IS_ERR(worker->task)) goto fail; set_user_nice(worker->task, pool->attrs->nice); kthread_bind_mask(worker->task, pool->attrs->cpumask); /* successful, attach the worker to the pool */ worker_attach_to_pool(worker, pool);------------把工作者加入到工作者线程池worker_pool /* start the newly created worker */ spin_lock_irq(&pool->lock); worker->pool->nr_workers++; worker_enter_idle(worker); wake_up_process(worker->task);-----------------唤醒工作者 spin_unlock_irq(&pool->lock); return worker;---------------------------------返回worker_pool fail: if (id >= 0) ida_simple_remove(&pool->worker_ida, id); kfree(worker); return NULL;
相关文章推荐
- linux2.6中的工作队列接口 workqueue_struct
- linux2.6中的工作队列接口 workqueue_struct
- linux2.6中的工作队列接口 workqueue_struct
- linux工作队列 - work_struct被调用过程
- linux工作队列 - workqueue
- linux 工作队列workqueue
- linux工作队列 - workqueue总览——代码分析
- linux INIT_WORK 创建工作队列
- linux 工作队列(workqueue)
- linux工作队列 - 把work_struct加入工作队列
- linux 工作队列(workqueue)——非常详细易懂
- [Linux API]linux 工作队列workqueue
- Linux 工作队列之工作者线程创建
- linux INIT_WORK 创建工作队列
- Linux工作队列workqueue实现分析
- Linux工作队列workqueue实现分析
- Linux中断管理 (3)workqueue工作队列
- Linux工作队列联想出来的
- linux2.6.36之后对工作队列的改进
- Linux 内核中工作队列的操作