zoukankan      html  css  js  c++  java
  • Linux kernel workqueue机制分析

    Linux kernel workqueue机制分析

    在内核编程中,workqueue机制是最常用的异步处理方式。本文主要基于linux kernel 3.10.108的workqueue文档分析其基本原理和使用API。

    概览

    Workqueue(WQ)机制是Linux内核中最常用的异步处理机制。Workqueue机制的主要概念包括:work用于描述放到队列里即将被执行的函数;worker表示一个独立的线程,用于执行异步上下文处理;workqueue用于存放work的队列。
    当workqueue上有work条目时,worker线程被触发来执行work对应的函数。如果有多个work条目在队列,worker会按顺序处理所有work。

    CMWQ概述

    在最初的WQ实现中,多线程WQ(MTWQ)在每个CPU上都有一个worker线程,单线程WQ(STWQ)则总共只有一个worker线程。一个MTWQ的worker个数和CPU核数相同,多年来,MTWQ大量使用使得线程数量大量增加,甚至超过了某些系统对PID空间默认32K的限制。
    尽管MTWQ浪费大量资源,但其提供的并发水平还是不能让人满意。并发的限制在STWQ和MTWQ上都存在,虽然MT相对来说不那么严重。MTWQ在每个CPU上提供了一个上下文执行环境,STWQ则在整个系统提供一个上下文执行环境。work任务需要竞争这些有限的执行环境资源,从而导致死锁等问题。
    并发和资源之间的紧张关系使得一些使用者不得不做出一些不必要的折中,比如libata的polling PIOs选择STWQ,这样就无法有两个polling PIOs同时进行处理。因为MTWQ并不能提供高并发能力,因此async和fscache不得不实现自己的线程池来提供高并发能力。

    Concurrency Managed Workqueue (CMWQ)重新设计了WQ机制,并实现如下目标:

    1. 保持原workqueue API的兼容;
    2. 使用per-CPU统一的worker池,为所有WQ共享使用并提供灵活的并发级别,同时不浪费不必要的资源;
    3. 自动调整worker池和并发级别,让使用者不用关心这些细节。

    CMWQ设计思想

    一个work是一个简单的结构体,保存一个函数指针用于异步执行。任何驱动或者子系统想要一个函数被异步执行,都需要设置一个work指向该函数并将其放入workqueue队列。然后worker线程从队列上获取work并执行对应的函数,如果队列里没有work,则worker线程处于空闲状态。这些worker线程用线程池机制来管理。

    CMWQ设计时将面向用户的workqueue机制和后台worker线程池管理机制进行了区分。后台的workqueue被称为GCWQ(推测可能是Global Concurrency Workqueuq),在每个CPU上存在一个GCWQ,用于处理该CPU上所有workqueue的work。每个GCWQ有两个线程池:一个用于普通work处理,另一个用于高优先级work处理。

    内核子系统和驱动程序通过workqueue API创建和调度work,并可以通过设置flags来指定CPU核心、可重复性、并发限制,优先级等。当work放入workqueue时,通过队列参数和属性决定目标GCWQ和线程池,work最终放入对应线程池的共享worklist上。通过如果没有特别设定,work会被默认放入当前运行的CPU核上的GCWQ线程池的worklist上。

    GCWQ的线程池在实现时同时考虑了并发能力和资源占用,仅可能占用最小的资源并提供足够的并发能力。每个CPU上绑定的线程池通过hook到CPU调度机制来实现并发管理。当worker被唤醒或者进入睡眠都会通知到线程池,线程池保持对当前可以运行的worker个数的跟踪。通常我们不期望一个work独占CPU和运行很多个CPU周期,因此维护刚好足够的并发以防止work处理的速度降低是最优的。当CPU上有一个或多个runnalbe的worker,线程池不会启动新的work任务。当上一个running的work转入睡眠,则立即调度一个新的worker。这样当有work在pending的时候,CPU一直保持干活的状态。这样来保证用最小的worker个数同时足够的执行带宽。

    维持idle状态的worker只是消耗部分kthreads的内存,因此CMWQ在杀掉idle的worker之前一段时间让其活着。

    unbound的WQ并不使用上述机制,而是用pseudo unbound CPU的线程池去尽快处理所有work。CMWQ的使用者来控制并发级别,并可以设置一个flag来忽略并发管理机制。

    CMWQ通过创建更多的worker以及rescue-worker来保证任务按时处理。所有可能在内存回收的代码路径上执行的work必须放到特定的workqueue,该workqueue上有一个rescue-worker可以在内存压力下执行,这样避免在内存回收时出现死锁。

    API

    alloc_workqueue()
    alloc_workqueue()用于分配一个WQ。原来的create_workqueue()系列接口已经弃用并计划删除。alloc_workqueue()有三个入参:@name, @flags, @max_active。name是workqueue的名字并也用于rescuer-thread(如果有的话)名称。flags和max_active用于控制work分配执行环境、调度和执行。

    flags
    WQ_NON_REENTRANT
    默认一个WQ保证在同一个CPU上不会有重入性,即WQ上多个work不会再同一个CPU上并发执行,但会在多个CPU上并发执行。该flag标识在多个CPU上也不能重入,在整个系统级别都只有一个work在执行。

    WQ_UNBOUND
    该flag设定的WQ不绑定到CPU,其work将被一个特殊的CGWQ进行服务,该CGWQ上的worker不绑定任何CPU。unbound WQ牺牲了CPU亲和性,主要用于下场景:

    1. 并发级别需求的波动非常大,如果使用bound WQ则会在不同CPU上创建大量的worker,并且这些worker大部分时间都是空闲的。
    2. 长时间运行的CPU密集型工作可以由系统调度程序更好的管理。

    WQ_FREEZABLE
    可冻结的WQ在系统suspend操作的freeze阶段,暂停新的work执行直到解冻。

    WQ_MEM_RECLAIM
    可能用于内存回收路径的WQ必须设置该flag。在内存紧张的时候也会保证至少有一个可执行的上下文用于该WQ。

    WQ_HIGHPRI
    高优先级的WQ的work会被放入GCWQ的高优先级线程池。高优先级的线程池的线程拥有高nice级别。普通的线程池和高优先级的线程池彼此独立,互相不影响。

    WQ_CPU_INTENSIVE
    设置为CPU密集型的WQ的work不会影响并发级别,即CPU密集型的work执行时并不会阻止同一个线程池里其他WQ的work的执行。这对希望独占CPU周期的work非常有用,由系统调度程序调度他们的执行。如果不设置该标记,则独占CPU周期的work会导致同一个线程池里其他WQ的work得不到执行。
    由于同一由CMWQ的并发管理进行调度,当非密集型的WQ的work运行过程中,也会导致密集型的WQ的work被推迟。该flag仅适用于bound的WQ,对unbound的WQ无效。

    max_active
    max_active用于指定WQ在每个CPU上最大的执行上下文个数,即并发处理的work个数。目前对于bound WQ,max_active最大可以设置为512,如果max_active入参为0,则使用默认值256。对于unbound WQ,最大值为512和4*cpu核数两个里面较大的值。
    对于希望使用STWQ的使用者,可以设置max_active为1,并且设置WQ_UNBOUND标识。这样整个系统里只有一个该WQ上的work正在执行。

    struct workqueue_struct
    函数alloc_workqueue()返回一个指向struct workqueue_struct的指针,代表一个workqueue。如下所示:

    struct workqueue_struct *wq = alloc_workqueue("wq-name", WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
    

    struct work_struct
    数据结构struct work_struct定义了一个work,通过通过INIT_WORK系列宏定义初始化work,设置执行的函数。如下所示:

    struct work_struct work;
    void worker_func(struct work_struct *work);
    INIT_WORK(&work, worker_func);
    

    struct delayed_work
    数据结构struct delayed_work定义了一个延迟work,延迟work通过设置定时器的方式延迟将work放入队列。如下所示为其数据结构定义:

    struct delayed_work {
    	struct work_struct work;
    	struct timer_list timer;
    
    	/* target workqueue and CPU ->timer uses to queue ->work */
    	struct workqueue_struct *wq;
    	int cpu;
    };
    

    可以看到delayed_work由一个work和一个定时器组成。delayed_work通过INIT_DELAYED_WORK系列宏定义进行初始化。如下所示:

    struct delayed_work work;
    void worker_func(struct work_struct *work);
    INIT_DELAYED_WORK(&work, worker_func);
    

    queue_work_on()
    函数queue_work_on()将work放入workqueue队列,其定义如下:

    extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
    			struct work_struct *work);
    

    函数queue_delayed_work将delayed_work在延迟delay个jiffies之后放入workqueue队列,其定义如下:

    static inline bool queue_delayed_work(struct workqueue_struct *wq,
    				      struct delayed_work *dwork,
    				      unsigned long delay)
    

    调试

    因为work是由通用的工作线程执行的,因此需要一些技巧来定位workqueue使用者的一些错误行为。
    worker线程通过ps可以看到:

    root      5671  0.0  0.0      0     0 ?        S    12:07   0:00 [kworker/0:1]
    root      5672  0.0  0.0      0     0 ?        S    12:07   0:00 [kworker/1:2]
    root      5673  0.0  0.0      0     0 ?        S    12:12   0:00 [kworker/0:0]
    root      5674  0.0  0.0      0     0 ?        S    12:13   0:00 [kworker/1:0]
    

    如果某个kworker疯了,占用CPU非常高,可能有如下两种原因:

    1. 大量的work正在正在提交调度;
    2. 某个work占用过多CPU;

    第1种原因可以通过tracing机制来跟踪:

    $ echo workqueue:workqueue_queue_work > /sys/kernel/debug/tracing/set_event
    $ cat /sys/kernel/debug/tracing/trace_pipe > out.txt
    

    如果某个worker忙于循环将大量work进行调度,通过输出的work里的函数可以找到谁提交了大量的work。

    第2种原因可以通过打印出worker的堆栈空间来分析是哪个work的函数正在处理:

    $ cat /proc/THE_OFFENDING_KWORKER/stack
    

    总结

    在内核中直接使用kthread创建自己的线程进行异步处理带来一定的复杂度以及资源浪费,而workqueue机制为内核模块提供了简单的接口来实现异步函数处理。CMWQ机制在使用尽量少的资源的同时保证了并发处理能力。

    参考资料

    Linux/Documentation/workqueue.txt

  • 相关阅读:
    leetcode 131. Palindrome Partitioning
    leetcode 526. Beautiful Arrangement
    poj 1852 Ants
    leetcode 1219. Path with Maximum Gold
    leetcode 66. Plus One
    leetcode 43. Multiply Strings
    pytorch中torch.narrow()函数
    pytorch中的torch.repeat()函数与numpy.tile()
    leetcode 1051. Height Checker
    leetcode 561. Array Partition I
  • 原文地址:https://www.cnblogs.com/jimbo17/p/8885814.html
Copyright © 2011-2022 走看看