zoukankan      html  css  js  c++  java
  • 内核工作队列【转】

    本文转载自:https://my.oschina.net/kaedehao/blog/631394

    workqueue作为内核的重要基础组件,在内核中被广泛的使用,通过工作队列,可以很方便的把我们要执行的某个任务(即函数+上下文参数)交代给内核,由内核替我们执行。本文主要是介绍工作队列的使用,及其内部实现的逻辑。

        因为内核在系统初始化的时候,已为我们创建好了默认的工作队列,所以我们在使用的时候,可以不需要再创建工作队列,只需要创建工作,并将工作添加到工作队列上即可。当然,我们也可以创建自己的工作队列,而不使用内核创建好的工作队列。

        简单的理解,工作队列是由内核线程+链表+等待队列来实现的,即由一个内核线程不断的从链表中读取工作,然后执行工作的工作函数!

        一、工作的表示: struct work_struct

    1 typedef void (*work_func_t)(struct work_struct *work);
    2 struct work_struct {
    3     atomic_long_t data; /* 内核内部使用 */
    4     struct list_head entry; /* 用于链接到工作队列中*/
    5     work_func_t func; /* 工作函数*/
    6 #ifdef CONFIG_LOCKDEP
    7     struct lockdep_map lockdep_map;
    8 #endif
    9 };

            从这个结构体可以看出,一个工作,就是一个待执行的工作函数,那如果我们要给工作函数传递参数,怎么解决呢?

        仔细观察工作函数的格式:参数是work_struct,所以在实际使用的时候,经常会在创建我们自己工作的时候,将此结构体嵌套在内部。然后在work_func函数内部通过container_of来得到我们自定义的工作,这样子就完成参数的传递了。

    二、工作队列的常用接口: 在linux/kernel/workqueue.h

    1. 初始化工作:

     1 #define __DELAYED_WORK_INITIALIZER(n, f) {          
     2     .work = __WORK_INITIALIZER((n).work, (f)),      
     3     .timer = TIMER_INITIALIZER(NULL, 0, 0),         
     4     }
     5 
     6 #define DECLARE_WORK(n, f)                  
     7     struct work_struct n = __WORK_INITIALIZER(n, f)
     8     
     9 // 也可以使用INIT_WORK宏:
    10 #define INIT_WORK(_work, _func)                     
    11     do {                                
    12         (_work)->data = (atomic_long_t) WORK_DATA_INIT();   
    13         INIT_LIST_HEAD(&(_work)->entry);            
    14         PREPARE_WORK((_work), (_func));             
    15     } while (0)

        主要是完成func成员的赋值。

        2. 工作入队: 添加到内核工作队列

    1 int schedule_work(struct work_struct *work);

        3. 工作撤销: 从内核工作队列中删除

    1 int cancel_work_sync(struct work_struct *work);

        4. 创建工作队列:

     1 #ifdef CONFIG_LOCKDEP
     2 #define __create_workqueue(name, singlethread, freezeable, rt)  
     3 ({                              
     4     static struct lock_class_key __key;         
     5     const char *__lock_name;                
     6                                 
     7     if (__builtin_constant_p(name))             
     8         __lock_name = (name);               
     9     else                            
    10         __lock_name = #name;                
    11                                 
    12     __create_workqueue_key((name), (singlethread),      
    13                    (freezeable), (rt), &__key,  
    14                    __lock_name);            
    15 })
    16 #else
    17 #define __create_workqueue(name, singlethread, freezeable, rt)  
    18     __create_workqueue_key((name), (singlethread), (freezeable), (rt), 
    19                    NULL, NULL)
    20 #endif
    21 
    22 #define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
    23 #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
    24 #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
    25 #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
    26     创建工作队列,最终调用的都是__create_workqueue_key()函数来完成,此函数返回的是struct workqueue_struct *,用于表示一个工作队列。
    27 
    28     5. 销毁工作队列:
    29 
    30 void destroy_workqueue(struct workqueue_struct *wq);
    31  
    32 
    33     在介绍了上面的接口后,看一个简单的使用例子,这个例子使用的是内核已创建好的工作队列,要使用自己创建的工作队列,也是很简单的,看了后面的实现源码分析就清楚了。
    34 
    35 #include <linux/module.h>
    36 #include <linux/delay.h>
    37 #include <linux/workqueue.h>
    38 
    39 #define ENTER() printk(KERN_DEBUG "%s() Enter", __func__)
    40 #define EXIT() printk(KERN_DEBUG "%s() Exit", __func__)
    41 #define ERR(fmt, args...) printk(KERN_ERR "%s()-%d: " fmt "
    ", __func__, __LINE__, ##args)
    42 #define DBG(fmt, args...) printk(KERN_DEBUG "%s()-%d: " fmt "
    ", __func__, __LINE__, ##args)
    43 
    44 struct test_work {
    45     struct work_struct w;
    46     unsigned long data;
    47 };
    48 
    49 static struct test_work my_work;
    50 
    51 static void my_work_func(struct work_struct *work)
    52 {
    53     struct test_work *p_work;
    54     ENTER();
    55     p_work = container_of(work, struct test_work, w);
    56     while (p_work->data) {
    57         DBG("data: %lu", p_work->data--);
    58         msleep_interruptible(1000);
    59     }
    60 
    61     EXIT();
    62 }
    63 
    64 static int __init wq_demo_init(void)
    65 {
    66     INIT_WORK(&my_work.w, my_work_func);
    67     my_work.data = 30;
    68 
    69     msleep_interruptible(1000);
    70     DBG("schedule work begin:");
    71     if (schedule_work(&my_work.w) == 0) {
    72         ERR("schedule work fail");
    73         return -1;
    74     }
    75 
    76     DBG("success");
    77     return 0;
    78 }
    79 
    80 static void __exit wq_demo_exit(void)
    81 {
    82     ENTER();
    83     while (my_work.data) {
    84         DBG("waiting exit");
    85         msleep_interruptible(2000);
    86     }
    87     EXIT();
    88 }
    89 
    90 MODULE_LICENSE("GPL");
    91 module_init(wq_demo_init);
    92 module_exit(wq_demo_exit);

        下面就分析workqueue组件的源码实现,先从work_queue模块的初始化开始,然后再分析工作的注册过程,最后是工作如何被执行的。

    三、workqueu的初始化:在kernel/workqueue.c

     1 static DEFINE_SPINLOCK(workqueue_lock);  //全局的自旋锁,用于保证对全局链表workqueues的原子操作
     2 static LIST_HEAD(workqueues); // 全局链表,用于链接所有的工作队列
     3 static struct workqueue_struct *keventd_wq;
     4 
     5 void __init init_workqueues(void)
     6 {
     7     alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
     8 
     9     cpumask_copy(cpu_populated_map, cpu_online_mask);
    10     singlethread_cpu = cpumask_first(cpu_possible_mask);
    11     cpu_singlethread_map = cpumask_of(singlethread_cpu);
    12     hotcpu_notifier(workqueue_cpu_callback, 0);
    13     keventd_wq = create_workqueue("events");                                                                                              
    14     BUG_ON(!keventd_wq);
    15 }

        调用create_workqueue()函数创建了一个工作队列,名字为events。那就再看看工作队列是如何创建的:

        上面在介绍创建工作队列的接口时,有看到最终调用的都是__create_workqueue_key()函数的,在介绍这个函数之前,先看看struct workqueue_struct结构体的定义:在kernel/workqueue.c

     1 struct workqueue_struct {                                                                                                                 
     2     struct cpu_workqueue_struct *cpu_wq;
     3     struct list_head list; // 用于链接到全局链表workqueues
     4     const char *name; //工作队列的名字,即内核线程的名字
     5     int singlethread;
     6     int freezeable;     /* Freeze threads during suspend */
     7     int rt;
     8 #ifdef CONFIG_LOCKDEP
     9     struct lockdep_map lockdep_map;
    10 #endif
    11 };
    12 
    13 struct cpu_workqueue_struct {
    14 
    15     spinlock_t lock; // 用于保证worklist链表的原子操作
    16 
    17     struct list_head worklist;  //用于保存添加的工作
    18     wait_queue_head_t more_work; // 等待队列,当worklist为空时,则会将内核线程挂起,存放与此链表中
    19     struct work_struct *current_work;  //保存内核线程当前正在执行的工作
    20 
    21     struct workqueue_struct *wq;
    22     struct task_struct *thread; // 内核线程
    23 } ____cacheline_aligned;

        下面看一下创建函数的内部实现,这里要注意我们传递的参数:singlethread = 0, freezeable =0, rt = 0

     1 struct workqueue_struct *__create_workqueue_key(const char *name,
     2                         int singlethread,
     3                         int freezeable,
     4                         int rt,
     5                         struct lock_class_key *key,
     6                         const char *lock_name)
     7 {
     8     struct workqueue_struct *wq;
     9     struct cpu_workqueue_struct *cwq;
    10     int err = 0, cpu;
    11 
    12     wq = kzalloc(sizeof(*wq), GFP_KERNEL);
    13     if (!wq)
    14         return NULL;
    15 
    16     wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
    17     if (!wq->cpu_wq) {
    18         kfree(wq);
    19         return NULL;
    20     }
    21 
    22     wq->name = name;
    23     lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    24     wq->singlethread = singlethread;
    25     wq->freezeable = freezeable;
    26     wq->rt = rt;
    27     INIT_LIST_HEAD(&wq->list);
    28 
    29     if (singlethread) {
    30         cwq = init_cpu_workqueue(wq, singlethread_cpu);
    31         err = create_workqueue_thread(cwq, singlethread_cpu); 
    32         start_workqueue_thread(cwq, -1);
    33     } else {
    34         cpu_maps_update_begin();
    35         /*
    36          * We must place this wq on list even if the code below fails.
    37          * cpu_down(cpu) can remove cpu from cpu_populated_map before
    38          * destroy_workqueue() takes the lock, in that case we leak
    39          * cwq[cpu]->thread.
    40          */
    41         spin_lock(&workqueue_lock);
    42         list_add(&wq->list, &workqueues);
    43         spin_unlock(&workqueue_lock);
    44         /*
    45          * We must initialize cwqs for each possible cpu even if we
    46          * are going to call destroy_workqueue() finally. Otherwise
    47          * cpu_up() can hit the uninitialized cwq once we drop the
    48          * lock.
    49          */
    50         for_each_possible_cpu(cpu) {
    51             cwq = init_cpu_workqueue(wq, cpu);
    52             if (err || !cpu_online(cpu))
    53                 continue;
    54             err = create_workqueue_thread(cwq, cpu); /*创建内核线程*/
    55             start_workqueue_thread(cwq, cpu); /*启动内核线程*/
    56         }
    57         cpu_maps_update_done();
    58     }
    59 
    60     if (err) {
    61         destroy_workqueue(wq);
    62         wq = NULL;
    63     }
    64     return wq;
    65 }

            主要的代码逻辑是创建一个struct workqueue_struct类型的对象,然后将此工作队列加入到workqueues链表中,最后是调用create_workqueue_thread()创建一个内核线程,并启动此线程。

        我们知道内核线程最主要的是它的线程函数,那么工作队列的线程函数时什么呢?

     1 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
     2 {
     3     struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
     4     struct workqueue_struct *wq = cwq->wq;
     5     const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
     6     struct task_struct *p;
     7     
     8     p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
     9     /*
    10      * Nobody can add the work_struct to this cwq,
    11      *  if (caller is __create_workqueue)
    12      *      nobody should see this wq
    13      *  else // caller is CPU_UP_PREPARE
    14      *      cpu is not on cpu_online_map
    15      * so we can abort safely.
    16      */
    17     if (IS_ERR(p))
    18         return PTR_ERR(p);
    19     if (cwq->wq->rt)
    20         sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
    21     cwq->thread = p;
    22     
    23     trace_workqueue_creation(cwq->thread, cpu);
    24     
    25     return 0;
    26 }
    27 
    28 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
    29 {
    30     struct task_struct *p = cwq->thread;
    31 
    32     if (p != NULL) {
    33         if (cpu >= 0)
    34             kthread_bind(p, cpu);
    35         wake_up_process(p);
    36     }
    37 }

            主要就是调用kthread_create()创建内核线程,线程函数为worker_thread,参数为cwq。start_workqueue_thread()函数就是调用wake_up_process()把内核线程加入到run queue中。

        下面就分析下线程函数worker_thread到底是怎么我们添加的工作的?

     1 static int worker_thread(void *__cwq)
     2 {
     3     struct cpu_workqueue_struct *cwq = __cwq;
     4     DEFINE_WAIT(wait);
     5 
     6     if (cwq->wq->freezeable)
     7         set_freezable();
     8 
     9     for (;;) {
    10         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
    11         if (!freezing(current) &&
    12             !kthread_should_stop() &&
    13             list_empty(&cwq->worklist))
    14             schedule(); //若worklist链表为空,则进行调度
    15         finish_wait(&cwq->more_work, &wait);
    16 
    17         try_to_freeze();
    18 
    19         if (kthread_should_stop())
    20             break;
    21 
    22         run_workqueue(cwq);//执行队列中的工作
    23     }
    24 
    25     return 0;
    26 }

            前面已经有文章分析了内核线程和等待队列waitqueue,了解这个的话,就很容易看懂这段代码,就是判断worklist队列是否为空,如果为空,则将当前内核线程挂起,否则就调用run_workqueue()去执行已添加注册的工作:

     1 static void run_workqueue(struct cpu_workqueue_struct *cwq)
     2 {
     3     spin_lock_irq(&cwq->lock);
     4     while (!list_empty(&cwq->worklist)) {
     5         struct work_struct *work = list_entry(cwq->worklist.next,
     6                         struct work_struct, entry);
     7         work_func_t f = work->func;
     8 #ifdef CONFIG_LOCKDEP
     9         /*
    10          * It is permissible to free the struct work_struct
    11          * from inside the function that is called from it,
    12          * this we need to take into account for lockdep too.
    13          * To avoid bogus "held lock freed" warnings as well
    14          * as problems when looking into work->lockdep_map,
    15          * make a copy and use that here.
    16          */
    17         struct lockdep_map lockdep_map = work->lockdep_map;
    18 #endif
    19         trace_workqueue_execution(cwq->thread, work);
    20         cwq->current_work = work; //保存当前工作到current_work
    21         list_del_init(cwq->worklist.next); // 将此工作从链表中移除
    22         spin_unlock_irq(&cwq->lock);
    23 
    24         BUG_ON(get_wq_data(work) != cwq);
    25         work_clear_pending(work);
    26         lock_map_acquire(&cwq->wq->lockdep_map);
    27         lock_map_acquire(&lockdep_map);
    28         f(work); //执行工作函数
    29         lock_map_release(&lockdep_map);
    30         lock_map_release(&cwq->wq->lockdep_map);
    31 
    32         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
    33             printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
    34                     "%s/0x%08x/%d
    ",
    35                     current->comm, preempt_count(),
    36                         task_pid_nr(current));
    37             printk(KERN_ERR "    last function: ");
    38             print_symbol("%s
    ", (unsigned long)f);
    39             debug_show_held_locks(current);
    40             dump_stack();
    41         }
    42 
    43         spin_lock_irq(&cwq->lock);
    44         cwq->current_work = NULL;
    45     }
    46     spin_unlock_irq(&cwq->lock);
    47 }

        上面的注释已经说明清楚代码的逻辑了,这里就不在解释了。

    四、添加工作到内核工作队列中:

        上面提到了,当工作队列中的worklist链表为空,及没有需要执行的工作,怎会将工作队列所在的内核线程挂起,那么什么时候会将其唤醒呢?肯定就是当有工作添加到链表的时候,即调用schedule_work()的时候:

    1 int schedule_work(struct work_struct *work)
    2 {
    3     return queue_work(keventd_wq, work); // 将工作添加到内核提我们创建好的工作队列中
    4 }

        前面在初始化的时候,就将内核创建的工作队列保存在keventd_wq变量中。

     1 /**
     2  * queue_work - queue work on a workqueue
     3  * @wq: workqueue to use
     4  * @work: work to queue
     5  *
     6  * Returns 0 if @work was already on a queue, non-zero otherwise.
     7  *
     8  * We queue the work to the CPU on which it was submitted, but if the CPU dies
     9  * it can be processed by another CPU.
    10  */
    11 int queue_work(struct workqueue_struct *wq, struct work_struct *work)                                                                     
    12 {
    13     int ret;
    14 
    15     ret = queue_work_on(get_cpu(), wq, work);
    16     put_cpu();
    17 
    18     return ret;
    19 }
    20 
    21 int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
    22 {
    23     int ret = 0;
    24 
    25     if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
    26         BUG_ON(!list_empty(&work->entry));
    27         __queue_work(wq_per_cpu(wq, cpu), work);
    28         ret = 1;
    29     }
    30     return ret;
    31 }
    32 
    33 static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
    34 {
    35     unsigned long flags;
    36 
    37     spin_lock_irqsave(&cwq->lock, flags); //加锁,保证insert_work原子操作
    38     insert_work(cwq, work, &cwq->worklist);
    39     spin_unlock_irqrestore(&cwq->lock, flags); //解锁
    40 }
    41 
    42 static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head)
    43 {
    44     trace_workqueue_insertion(cwq->thread, work);
    45     
    46     set_wq_data(work, cwq);
    47     /*
    48      * Ensure that we get the right work->data if we see the
    49      * result of list_add() below, see try_to_grab_pending().
    50      */
    51     smp_wmb();
    52     list_add_tail(&work->entry, head); //加入到worklist链表
    53     wake_up(&cwq->more_work); //唤醒在more_work等待链表上的任务,即工作队列线程
    54 }

    五、使用自定义工作队列:

        通过上面的分析,创建 工作队列最基本的接口时create_workqueue()。当我们要把工作放入到自定义的工作队列时,使用如下接口:

    1 int queue_work(struct workqueue_struct *wq, struct work_struct *work);

        在上面的分析中,其实已经使用了此接口,只不过我们调用schedule_work()的时候,wq参数为内核已创建好的工作队列keventd_wq。

  • 相关阅读:
    PyCharm常用快捷键
    在PyCharm中打开文件的位置
    使用Socket下载图片
    Python散列类型和运算符
    Python格式化输出和深浅复制
    爬虫的概念和会话
    Python数值类型和序列类型
    HTTP与HTTPS
    PyCharm彻底删除项目
    PyCharm永久激活
  • 原文地址:https://www.cnblogs.com/zzb-Dream-90Time/p/6550396.html
Copyright © 2011-2022 走看看