zoukankan      html  css  js  c++  java
  • 工作队列(workqueue) create_workqueue/schedule_work/queue_work

    ---

    项目需要,在驱动模块里用内核计时器timer_list实现了一个状态机。
    郁闷的是,运行时总报错“Scheduling while atomic”,网上搜了一下:

    "Scheduling while atomic" indicates that you've tried to sleep somewhere that you shouldn't - like within a spinlock-protected critical section or an interrupt handler.

    改进程序,在计时器里使用了workqueue,搞定问题。顺便把workqueue的实现代码总结了一下


    一、workqueue简介

    workqueue与tasklet类似,都是允许内核代码请求某个函数在将来的时间被调用(抄《ldd3》上的)
    每个workqueue就是一个内核进程。

    workqueue与tasklet的区别:
       1.tasklet是通过软中断实现的,在软中断上下文中运行,tasklet代码必须是原子的
         workqueue是通过内核进程实现的,就没有上述限制的,最爽的是,工作队列函数可以休眠 
             
         PS: 我的驱动模块就是印在计时器中调用了可休眠函数,所以出现了cheduling while atomic告警
             内核计时器也是通过软中断实现的

       2.tasklet始终运行在被初始提交的同一处理器上,workqueue不一定
       3.tasklet不能确定延时时间(即使很短),workqueue可以设定延迟时间


    二、workqueue的API

     
    workqueue的API自2.6.20后发生了变化

    1. #include <linux/workqueue.h>
    2. struct workqueue_struct;
    3. struct work_struct;
    4. struct workqueue_struct *create_workqueue(const char *name);
    5. void destroy_workqueue(struct workqueue_struct *queue);
    6. INIT_WORK(_work, _func);
    7. INIT_DELAYED_WORK(_work, _func);
    8. int queue_work(struct workqueue_struct *wq, struct work_struct *work);
    9. int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay);
    10. int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
    11.             struct delayed_work *dwork, unsigned long delay);
    12. int cancel_work_sync(struct work_struct *work);
    13. int cancel_delayed_work_sync(struct delayed_work *dwork);
    14. void flush_workqueue(struct workqueue_struct *wq);

    Workqueue编程接口

    序号

    接口函数

    说明

    1

    create_workqueue

    用于创建一个workqueue队列,为系统中的每个CPU都创建一个内核线程。输入参数:

    @name:workqueue的名称

    2

    create_singlethread_workqueue

    用于创建workqueue,只创建一个内核线程。输入参数:

    @name:workqueue名称

    3

    destroy_workqueue

    释放workqueue队列。输入参数:

    @ workqueue_struct:需要释放的workqueue队列指针

    4

    schedule_work

    调度执行一个具体的任务,执行的任务将会被挂入Linux系统提供的workqueue——keventd_wq输入参数:

    @ work_struct:具体任务对象指针

    5

    schedule_delayed_work

    延迟一定时间去执行一个具体的任务,功能与schedule_work类似,多了一个延迟时间,输入参数:

    @work_struct:具体任务对象指针

    @delay:延迟时间

    6

    queue_work

    调度执行一个指定workqueue中的任务。输入参数:

    @ workqueue_struct:指定的workqueue指针

    @work_struct:具体任务对象指针

    7

    queue_delayed_work

    延迟调度执行一个指定workqueue中的任务,功能与queue_work类似,输入参数多了一个delay。



    下面实例是不指定delay时间的workqueue
    (代码基于2.6.24)

    1. struct my_work_stuct{
    2.     int test;
    3.     struct work_stuct save;
    4. };
    5. struct my_work_stuct test_work; 
    6. struct workqueue_struct *test_workqueue;
    7. void do_save(struct work_struct *p_work)
    8. {
    9.     struct my_work_struct *p_test_work = container_of(p_work, struct my_work_stuct, save);
    10.     printk("%d ",p_test_work->test);
    11. }
    12.   
    13. void test_init()
    14. {
    15.     test_workqueue = create_workqueue("test_workqueue");
    16.     if (!test_workqueue)
    17.         panic("Failed to create test_workqueue ");
    18.     INIT_WORK(&(test_work.save), do_save);
    19.     queue_work(test_workqueue, &(test_work.save));
    20. }
    21. void test_destory(void)
    22. {
    23.     if(test_workqueue)
    24.         destroy_workqueue(test_workqueue);
    25. }



    三、workqueue的实现


    工作队列workqueue不是通过软中断实现的,它是通过内核进程实现的



    首先,创建一个workqueue,实际上就是建立一个内核进程

    1. create_workqueue("tap_workqueue")
    2. --> __create_workqueue(“tap_workqueue”, 0, 0)
    3. --> __create_workqueue_key((name), (singlethread), (freezeable), NULL, NULL){
    4.          wq = kzalloc(sizeof(*wq), GFP_KERNEL);
    5.          wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
    6.          wq->name = name;
    7.          wq->singlethread = singlethread;
    8.          wq->freezeable = freezeable;
    9.          INIT_LIST_HEAD(&wq->list);
    10.          for_each_possible_cpu(cpu) {
    11.              cwq = init_cpu_workqueue(wq, cpu);
    12.              err = create_workqueue_thread(cwq, cpu);
    13.              start_workqueue_thread(cwq, cpu);
    14.          }
    15.     }


    create_workqueue_thread 建立了一个内核进程 worker_thread(linux_2_6_24/kernel/workqueue.c)

    1. create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
    2. {
    3.     struct workqueue_struct *wq = cwq->wq;
    4.     const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
    5.     struct task_struct *p;
    6.     p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
    7.     if (IS_ERR(p))
    8.         return PTR_ERR(p);
    9.     cwq->thread = p;
    10.     return 0;
    11. }


    内核进程worker_thread做的事情很简单,死循环而已,不停的执行workqueue上的work_list
    (linux_2_6_24/kernel/workqueue.c)

    1. int worker_thread (void *__cwq)
    2. {
    3.     struct cpu_workqueue_struct *cwq = __cwq;
    4.     /*下面定义等待队列项*/
    5.     DEFINE_WAIT(wait);
    6.     /*下面freezeable一般为0*/
    7.     if (cwq->wq->freezeable)
    8.         set_freezable();
    9.     /*提高优先级别*/
    10.     set_user_nice(current, -5);
    11.     for (;;) {
    12.         /*在cwq->more_work上等待, 若有人调用queue_work,该函数将调用wake_up(&cwq->more_work) 激活本进程*/
    13.         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
    14.         /*work队列空则切换出去*/
    15.         if (!freezing(current) && !kthread_should_stop() && list_empty(&cwq->worklist))
    16.             schedule();
    17.         /*切换回来则结束等待 说明有人唤醒cwq->more_work上的等待 有work需要处理*/
    18.         finish_wait(&cwq->more_work, &wait);
    19.         /*下面空,因为没有定义电源管理*/
    20.         try_to_freeze();
    21.         if (kthread_should_stop())
    22.             break;
    23.         /*run_workqueue依次处理工作队列上所有的work*/
    24.         run_workqueue(cwq);
    25.     }
    26.     return 0;
    27. }
    28. /*run_workqueue依次处理工作队列上所有的work*/
    29. static void run_workqueue(struct cpu_workqueue_struct *cwq)
    30. {
    31.     spin_lock_irq(&cwq->lock);
    32.     cwq->run_depth++;
    33.     if (cwq->run_depth > 3) {
    34.         /* morton gets to eat his hat */
    35.         printk("%s: recursion depth exceeded: %d ",
    36.             __FUNCTION__, cwq->run_depth);
    37.         dump_stack();
    38.     }
    39.     while (!list_empty(&cwq->worklist)) {
    40.         struct work_struct *work = list_entry(cwq->worklist.next,
    41.                         struct work_struct, entry);
    42.         work_func_t f = work->func;
    43. #ifdef CONFIG_LOCKDEP
    44.         /*
    45.          * It is permissible to free the struct work_struct
    46.          * from inside the function that is called from it,
    47.          * this we need to take into account for lockdep too.
    48.          * To avoid bogus "held lock freed" warnings as well
    49.          * as problems when looking into work->lockdep_map,
    50.          * make a copy and use that here.
    51.          */
    52.         struct lockdep_map lockdep_map = work->lockdep_map;
    53. #endif
    54.         cwq->current_work = work;
    55.         list_del_init(cwq->worklist.next);
    56.         spin_unlock_irq(&cwq->lock);
    57.         BUG_ON(get_wq_data(work) != cwq);
    58.         work_clear_pending(work);
    59.         lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
    60.         lock_acquire(&lockdep_map, 0, 0, 0, 2, _THIS_IP_);
    61.         f(work); /*执行work项中的func*/
    62.        
    63.         lock_release(&lockdep_map, 1, _THIS_IP_);
    64.         lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);
    65.         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
    66.             printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
    67.                     "%s/0x%08x/%d ",
    68.                     current->comm, preempt_count(),
    69.                         task_pid_nr(current));
    70.             printk(KERN_ERR " last function: ");
    71.             print_symbol("%s ", (unsigned long)f);
    72.             debug_show_held_locks(current);
    73.             dump_stack();
    74.         }
    75.         spin_lock_irq(&cwq->lock);
    76.         cwq->current_work = NULL;
    77.     }
    78.     cwq->run_depth--;
    79.     spin_unlock_irq(&cwq->lock);
    80. }



    将一个work加入到指定workqueue的work_list中(文件linux_2_6_24/kernel/workqueue.c)

      int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
    1. {
    2.     int ret = 0;
    3.     if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
    4.         BUG_ON(!list_empty(&work->entry));
    5.         __queue_work(wq_per_cpu(wq, get_cpu()), work);
    6.         put_cpu();
    7.         ret = 1;
    8.     }
    9.     return ret;
    10. /* Preempt must be disabled. */
    11. static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
    12. {
    13.     unsigned long flags;
    14.     spin_lock_irqsave(&cwq->lock, flags);
    15.     insert_work(cwq, work, 1);
    16.     spin_unlock_irqrestore(&cwq->lock, flags);
    17. }
    18. static void insert_work(struct cpu_workqueue_struct *cwq,
    19.                 struct work_struct *work, int tail)
    20. {
    21.     set_wq_data(work, cwq);
    22.     /*
    23.      * Ensure that we get the right work->data if we see the
    24.      * result of list_add() below, see try_to_grab_pending().
    25.      */
    26.     smp_wmb();
    27.     if (tail)
    28.         list_add_tail(&work->entry, &cwq->worklist);
    29.     else
    30.         list_add(&work->entry, &cwq->worklist);
    31.     wake_up(&cwq->more_work);
    32. }




    四、共享队列

    其实内核有自己的一个workqueue,叫keventd_wq,这个工作队列也叫做“共享队列”。
    do_basic_setup --> init_workqueues --> create_workqueue("events"); 

    若驱动模块使用的workqueue功能很简单的话,可以使用“共享队列”,不用自己再建一个队列
    使用共享队列,有这样一套API

      1. int schedule_work(struct work_struct *work)
      2. {
      3.     queue_work(keventd_wq, work);
      4. }
      5. int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay)
      6. {
      7.     timer_stats_timer_set_start_info(&dwork->timer);
      8.     return queue_delayed_work(keventd_wq, dwork, delay);
      9. }
      10. void flush_scheduled_work(void)
      11. {
      12.     flush_workqueue(keventd_wq);
      13. }

    --

    原文:http://blog.csdn.net/angle_birds/article/details/8448070

     
  • 相关阅读:
    flex space-between最后一行对齐问题的解决方案
    如何在父级下访问v-slot的值——vuejs
    flex下省略号的问题解决
    Typescript使用字符串联合类型代替枚举类型
    flex三个对齐属性的记忆方式
    JS中的slice()和splice()的区别以及记忆方式
    JS中的call,apply和bind及记忆方式
    Vue 还是 React 还是 Angular ?
    利用ES6的Promise.all实现至少请求多长时间
    .net core <environment> 不起作用
  • 原文地址:https://www.cnblogs.com/Ph-one/p/6222304.html
Copyright © 2011-2022 走看看