本文转载自:https://my.oschina.net/kaedehao/blog/631394
workqueue作为内核的重要基础组件,在内核中被广泛的使用,通过工作队列,可以很方便的把我们要执行的某个任务(即函数+上下文参数)交代给内核,由内核替我们执行。本文主要是介绍工作队列的使用,及其内部实现的逻辑。
因为内核在系统初始化的时候,已为我们创建好了默认的工作队列,所以我们在使用的时候,可以不需要再创建工作队列,只需要创建工作,并将工作添加到工作队列上即可。当然,我们也可以创建自己的工作队列,而不使用内核创建好的工作队列。
简单的理解,工作队列是由内核线程+链表+等待队列来实现的,即由一个内核线程不断的从链表中读取工作,然后执行工作的工作函数!
一、工作的表示: struct work_struct
1 typedef void (*work_func_t)(struct work_struct *work); 2 struct work_struct { 3 atomic_long_t data; /* 内核内部使用 */ 4 struct list_head entry; /* 用于链接到工作队列中*/ 5 work_func_t func; /* 工作函数*/ 6 #ifdef CONFIG_LOCKDEP 7 struct lockdep_map lockdep_map; 8 #endif 9 };
从这个结构体可以看出,一个工作,就是一个待执行的工作函数,那如果我们要给工作函数传递参数,怎么解决呢?
仔细观察工作函数的格式:参数是work_struct,所以在实际使用的时候,经常会在创建我们自己工作的时候,将此结构体嵌套在内部。然后在work_func函数内部通过container_of来得到我们自定义的工作,这样子就完成参数的传递了。
二、工作队列的常用接口: 在linux/kernel/workqueue.h
-
初始化工作:
1 #define __DELAYED_WORK_INITIALIZER(n, f) { 2 .work = __WORK_INITIALIZER((n).work, (f)), 3 .timer = TIMER_INITIALIZER(NULL, 0, 0), 4 } 5 6 #define DECLARE_WORK(n, f) 7 struct work_struct n = __WORK_INITIALIZER(n, f) 8 9 // 也可以使用INIT_WORK宏: 10 #define INIT_WORK(_work, _func) 11 do { 12 (_work)->data = (atomic_long_t) WORK_DATA_INIT(); 13 INIT_LIST_HEAD(&(_work)->entry); 14 PREPARE_WORK((_work), (_func)); 15 } while (0)
主要是完成func成员的赋值。
2. 工作入队: 添加到内核工作队列
1 int schedule_work(struct work_struct *work);
3. 工作撤销: 从内核工作队列中删除
1 int cancel_work_sync(struct work_struct *work);
4. 创建工作队列:
1 #ifdef CONFIG_LOCKDEP 2 #define __create_workqueue(name, singlethread, freezeable, rt) 3 ({ 4 static struct lock_class_key __key; 5 const char *__lock_name; 6 7 if (__builtin_constant_p(name)) 8 __lock_name = (name); 9 else 10 __lock_name = #name; 11 12 __create_workqueue_key((name), (singlethread), 13 (freezeable), (rt), &__key, 14 __lock_name); 15 }) 16 #else 17 #define __create_workqueue(name, singlethread, freezeable, rt) 18 __create_workqueue_key((name), (singlethread), (freezeable), (rt), 19 NULL, NULL) 20 #endif 21 22 #define create_workqueue(name) __create_workqueue((name), 0, 0, 0) 23 #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1) 24 #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0) 25 #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0) 26 创建工作队列,最终调用的都是__create_workqueue_key()函数来完成,此函数返回的是struct workqueue_struct *,用于表示一个工作队列。 27 28 5. 销毁工作队列: 29 30 void destroy_workqueue(struct workqueue_struct *wq); 31 32 33 在介绍了上面的接口后,看一个简单的使用例子,这个例子使用的是内核已创建好的工作队列,要使用自己创建的工作队列,也是很简单的,看了后面的实现源码分析就清楚了。 34 35 #include <linux/module.h> 36 #include <linux/delay.h> 37 #include <linux/workqueue.h> 38 39 #define ENTER() printk(KERN_DEBUG "%s() Enter", __func__) 40 #define EXIT() printk(KERN_DEBUG "%s() Exit", __func__) 41 #define ERR(fmt, args...) printk(KERN_ERR "%s()-%d: " fmt " ", __func__, __LINE__, ##args) 42 #define DBG(fmt, args...) printk(KERN_DEBUG "%s()-%d: " fmt " ", __func__, __LINE__, ##args) 43 44 struct test_work { 45 struct work_struct w; 46 unsigned long data; 47 }; 48 49 static struct test_work my_work; 50 51 static void my_work_func(struct work_struct *work) 52 { 53 struct test_work *p_work; 54 ENTER(); 55 p_work = container_of(work, struct test_work, w); 56 while (p_work->data) { 57 DBG("data: %lu", p_work->data--); 58 msleep_interruptible(1000); 59 } 60 61 EXIT(); 62 } 63 64 static int __init wq_demo_init(void) 65 { 66 INIT_WORK(&my_work.w, my_work_func); 67 my_work.data = 30; 68 69 msleep_interruptible(1000); 70 DBG("schedule work begin:"); 71 if (schedule_work(&my_work.w) == 0) { 72 ERR("schedule work fail"); 73 return -1; 74 } 75 76 DBG("success"); 77 return 0; 78 } 79 80 static void __exit wq_demo_exit(void) 81 { 82 ENTER(); 83 while (my_work.data) { 84 DBG("waiting exit"); 85 msleep_interruptible(2000); 86 } 87 EXIT(); 88 } 89 90 MODULE_LICENSE("GPL"); 91 module_init(wq_demo_init); 92 module_exit(wq_demo_exit);
下面就分析workqueue组件的源码实现,先从work_queue模块的初始化开始,然后再分析工作的注册过程,最后是工作如何被执行的。
三、workqueu的初始化:在kernel/workqueue.c
1 static DEFINE_SPINLOCK(workqueue_lock); //全局的自旋锁,用于保证对全局链表workqueues的原子操作 2 static LIST_HEAD(workqueues); // 全局链表,用于链接所有的工作队列 3 static struct workqueue_struct *keventd_wq; 4 5 void __init init_workqueues(void) 6 { 7 alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL); 8 9 cpumask_copy(cpu_populated_map, cpu_online_mask); 10 singlethread_cpu = cpumask_first(cpu_possible_mask); 11 cpu_singlethread_map = cpumask_of(singlethread_cpu); 12 hotcpu_notifier(workqueue_cpu_callback, 0); 13 keventd_wq = create_workqueue("events"); 14 BUG_ON(!keventd_wq); 15 }
调用create_workqueue()函数创建了一个工作队列,名字为events。那就再看看工作队列是如何创建的:
上面在介绍创建工作队列的接口时,有看到最终调用的都是__create_workqueue_key()函数的,在介绍这个函数之前,先看看struct workqueue_struct结构体的定义:在kernel/workqueue.c
1 struct workqueue_struct { 2 struct cpu_workqueue_struct *cpu_wq; 3 struct list_head list; // 用于链接到全局链表workqueues 4 const char *name; //工作队列的名字,即内核线程的名字 5 int singlethread; 6 int freezeable; /* Freeze threads during suspend */ 7 int rt; 8 #ifdef CONFIG_LOCKDEP 9 struct lockdep_map lockdep_map; 10 #endif 11 }; 12 13 struct cpu_workqueue_struct { 14 15 spinlock_t lock; // 用于保证worklist链表的原子操作 16 17 struct list_head worklist; //用于保存添加的工作 18 wait_queue_head_t more_work; // 等待队列,当worklist为空时,则会将内核线程挂起,存放与此链表中 19 struct work_struct *current_work; //保存内核线程当前正在执行的工作 20 21 struct workqueue_struct *wq; 22 struct task_struct *thread; // 内核线程 23 } ____cacheline_aligned;
下面看一下创建函数的内部实现,这里要注意我们传递的参数:singlethread = 0, freezeable =0, rt = 0
1 struct workqueue_struct *__create_workqueue_key(const char *name, 2 int singlethread, 3 int freezeable, 4 int rt, 5 struct lock_class_key *key, 6 const char *lock_name) 7 { 8 struct workqueue_struct *wq; 9 struct cpu_workqueue_struct *cwq; 10 int err = 0, cpu; 11 12 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 13 if (!wq) 14 return NULL; 15 16 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 17 if (!wq->cpu_wq) { 18 kfree(wq); 19 return NULL; 20 } 21 22 wq->name = name; 23 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 24 wq->singlethread = singlethread; 25 wq->freezeable = freezeable; 26 wq->rt = rt; 27 INIT_LIST_HEAD(&wq->list); 28 29 if (singlethread) { 30 cwq = init_cpu_workqueue(wq, singlethread_cpu); 31 err = create_workqueue_thread(cwq, singlethread_cpu); 32 start_workqueue_thread(cwq, -1); 33 } else { 34 cpu_maps_update_begin(); 35 /* 36 * We must place this wq on list even if the code below fails. 37 * cpu_down(cpu) can remove cpu from cpu_populated_map before 38 * destroy_workqueue() takes the lock, in that case we leak 39 * cwq[cpu]->thread. 40 */ 41 spin_lock(&workqueue_lock); 42 list_add(&wq->list, &workqueues); 43 spin_unlock(&workqueue_lock); 44 /* 45 * We must initialize cwqs for each possible cpu even if we 46 * are going to call destroy_workqueue() finally. Otherwise 47 * cpu_up() can hit the uninitialized cwq once we drop the 48 * lock. 49 */ 50 for_each_possible_cpu(cpu) { 51 cwq = init_cpu_workqueue(wq, cpu); 52 if (err || !cpu_online(cpu)) 53 continue; 54 err = create_workqueue_thread(cwq, cpu); /*创建内核线程*/ 55 start_workqueue_thread(cwq, cpu); /*启动内核线程*/ 56 } 57 cpu_maps_update_done(); 58 } 59 60 if (err) { 61 destroy_workqueue(wq); 62 wq = NULL; 63 } 64 return wq; 65 }
主要的代码逻辑是创建一个struct workqueue_struct类型的对象,然后将此工作队列加入到workqueues链表中,最后是调用create_workqueue_thread()创建一个内核线程,并启动此线程。
我们知道内核线程最主要的是它的线程函数,那么工作队列的线程函数时什么呢?
1 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 2 { 3 struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 }; 4 struct workqueue_struct *wq = cwq->wq; 5 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d"; 6 struct task_struct *p; 7 8 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 9 /* 10 * Nobody can add the work_struct to this cwq, 11 * if (caller is __create_workqueue) 12 * nobody should see this wq 13 * else // caller is CPU_UP_PREPARE 14 * cpu is not on cpu_online_map 15 * so we can abort safely. 16 */ 17 if (IS_ERR(p)) 18 return PTR_ERR(p); 19 if (cwq->wq->rt) 20 sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m); 21 cwq->thread = p; 22 23 trace_workqueue_creation(cwq->thread, cpu); 24 25 return 0; 26 } 27 28 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 29 { 30 struct task_struct *p = cwq->thread; 31 32 if (p != NULL) { 33 if (cpu >= 0) 34 kthread_bind(p, cpu); 35 wake_up_process(p); 36 } 37 }
主要就是调用kthread_create()创建内核线程,线程函数为worker_thread,参数为cwq。start_workqueue_thread()函数就是调用wake_up_process()把内核线程加入到run queue中。
下面就分析下线程函数worker_thread到底是怎么我们添加的工作的?
1 static int worker_thread(void *__cwq) 2 { 3 struct cpu_workqueue_struct *cwq = __cwq; 4 DEFINE_WAIT(wait); 5 6 if (cwq->wq->freezeable) 7 set_freezable(); 8 9 for (;;) { 10 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); 11 if (!freezing(current) && 12 !kthread_should_stop() && 13 list_empty(&cwq->worklist)) 14 schedule(); //若worklist链表为空,则进行调度 15 finish_wait(&cwq->more_work, &wait); 16 17 try_to_freeze(); 18 19 if (kthread_should_stop()) 20 break; 21 22 run_workqueue(cwq);//执行队列中的工作 23 } 24 25 return 0; 26 }
前面已经有文章分析了内核线程和等待队列waitqueue,了解这个的话,就很容易看懂这段代码,就是判断worklist队列是否为空,如果为空,则将当前内核线程挂起,否则就调用run_workqueue()去执行已添加注册的工作:
1 static void run_workqueue(struct cpu_workqueue_struct *cwq) 2 { 3 spin_lock_irq(&cwq->lock); 4 while (!list_empty(&cwq->worklist)) { 5 struct work_struct *work = list_entry(cwq->worklist.next, 6 struct work_struct, entry); 7 work_func_t f = work->func; 8 #ifdef CONFIG_LOCKDEP 9 /* 10 * It is permissible to free the struct work_struct 11 * from inside the function that is called from it, 12 * this we need to take into account for lockdep too. 13 * To avoid bogus "held lock freed" warnings as well 14 * as problems when looking into work->lockdep_map, 15 * make a copy and use that here. 16 */ 17 struct lockdep_map lockdep_map = work->lockdep_map; 18 #endif 19 trace_workqueue_execution(cwq->thread, work); 20 cwq->current_work = work; //保存当前工作到current_work 21 list_del_init(cwq->worklist.next); // 将此工作从链表中移除 22 spin_unlock_irq(&cwq->lock); 23 24 BUG_ON(get_wq_data(work) != cwq); 25 work_clear_pending(work); 26 lock_map_acquire(&cwq->wq->lockdep_map); 27 lock_map_acquire(&lockdep_map); 28 f(work); //执行工作函数 29 lock_map_release(&lockdep_map); 30 lock_map_release(&cwq->wq->lockdep_map); 31 32 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 33 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 34 "%s/0x%08x/%d ", 35 current->comm, preempt_count(), 36 task_pid_nr(current)); 37 printk(KERN_ERR " last function: "); 38 print_symbol("%s ", (unsigned long)f); 39 debug_show_held_locks(current); 40 dump_stack(); 41 } 42 43 spin_lock_irq(&cwq->lock); 44 cwq->current_work = NULL; 45 } 46 spin_unlock_irq(&cwq->lock); 47 }
上面的注释已经说明清楚代码的逻辑了,这里就不在解释了。
四、添加工作到内核工作队列中:
上面提到了,当工作队列中的worklist链表为空,及没有需要执行的工作,怎会将工作队列所在的内核线程挂起,那么什么时候会将其唤醒呢?肯定就是当有工作添加到链表的时候,即调用schedule_work()的时候:
1 int schedule_work(struct work_struct *work) 2 { 3 return queue_work(keventd_wq, work); // 将工作添加到内核提我们创建好的工作队列中 4 }
前面在初始化的时候,就将内核创建的工作队列保存在keventd_wq变量中。
1 /** 2 * queue_work - queue work on a workqueue 3 * @wq: workqueue to use 4 * @work: work to queue 5 * 6 * Returns 0 if @work was already on a queue, non-zero otherwise. 7 * 8 * We queue the work to the CPU on which it was submitted, but if the CPU dies 9 * it can be processed by another CPU. 10 */ 11 int queue_work(struct workqueue_struct *wq, struct work_struct *work) 12 { 13 int ret; 14 15 ret = queue_work_on(get_cpu(), wq, work); 16 put_cpu(); 17 18 return ret; 19 } 20 21 int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) 22 { 23 int ret = 0; 24 25 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 26 BUG_ON(!list_empty(&work->entry)); 27 __queue_work(wq_per_cpu(wq, cpu), work); 28 ret = 1; 29 } 30 return ret; 31 } 32 33 static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) 34 { 35 unsigned long flags; 36 37 spin_lock_irqsave(&cwq->lock, flags); //加锁,保证insert_work原子操作 38 insert_work(cwq, work, &cwq->worklist); 39 spin_unlock_irqrestore(&cwq->lock, flags); //解锁 40 } 41 42 static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head) 43 { 44 trace_workqueue_insertion(cwq->thread, work); 45 46 set_wq_data(work, cwq); 47 /* 48 * Ensure that we get the right work->data if we see the 49 * result of list_add() below, see try_to_grab_pending(). 50 */ 51 smp_wmb(); 52 list_add_tail(&work->entry, head); //加入到worklist链表 53 wake_up(&cwq->more_work); //唤醒在more_work等待链表上的任务,即工作队列线程 54 }
五、使用自定义工作队列:
通过上面的分析,创建 工作队列最基本的接口时create_workqueue()。当我们要把工作放入到自定义的工作队列时,使用如下接口:
1 int queue_work(struct workqueue_struct *wq, struct work_struct *work);
在上面的分析中,其实已经使用了此接口,只不过我们调用schedule_work()的时候,wq参数为内核已创建好的工作队列keventd_wq。