A Look at Linux Workqueues (Part 2)

    The previous article gave a brief introduction to how workqueues operate and to their core data structures. This article focuses on how a workqueue is created and how workers are managed.


    1. Creating a workqueue (__alloc_workqueue_key)

    struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                               unsigned int flags,
                               int max_active,
                               struct lock_class_key *key,
                               const char *lock_name, ...)
    {
        size_t tbl_size = 0;
        va_list args;
        struct workqueue_struct *wq;
        struct pool_workqueue *pwq;
    
        /* allocate wq and format name */
        if (flags & WQ_UNBOUND)
            tbl_size = wq_numa_tbl_len * sizeof(wq->numa_pwq_tbl[0]);
        /* allocate the workqueue_struct */
        wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
        if (!wq)
            return NULL;
    
        if (flags & WQ_UNBOUND) {
            wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
            if (!wq->unbound_attrs)
                goto err_free_wq;
        }
        /* format the workqueue name */
        va_start(args, lock_name);
        vsnprintf(wq->name, sizeof(wq->name), fmt, args);
        va_end(args);
    
        max_active = max_active ?: WQ_DFL_ACTIVE;
        max_active = wq_clamp_max_active(max_active, flags, wq->name);
    
        /* init wq */
        wq->flags = flags;
        wq->saved_max_active = max_active;
        mutex_init(&wq->mutex);
        atomic_set(&wq->nr_pwqs_to_flush, 0);
        INIT_LIST_HEAD(&wq->pwqs);
        INIT_LIST_HEAD(&wq->flusher_queue);
        INIT_LIST_HEAD(&wq->flusher_overflow);
        INIT_LIST_HEAD(&wq->maydays);
    
        lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
        INIT_LIST_HEAD(&wq->list);
        if (alloc_and_link_pwqs(wq) < 0)
            goto err_free_wq;
        /*
         * Workqueues which may be used during memory reclaim should
         * have a rescuer to guarantee forward progress.
         */
        if (flags & WQ_MEM_RECLAIM) {
            struct worker *rescuer;
    
            rescuer = alloc_worker();
            if (!rescuer)
                goto err_destroy;
    
            rescuer->rescue_wq = wq;
            rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
                               wq->name);
            if (IS_ERR(rescuer->task)) {
                kfree(rescuer);
                goto err_destroy;
            }
    
            wq->rescuer = rescuer;
            rescuer->task->flags |= PF_NO_SETAFFINITY;
            wake_up_process(rescuer->task);
        }
    
        if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
            goto err_destroy;
        /*
         * wq_pool_mutex protects global freeze state and workqueues list.
         * Grab it, adjust max_active and add the new @wq to workqueues
         * list.
         */
        mutex_lock(&wq_pool_mutex);
        mutex_lock(&wq->mutex);
        for_each_pwq(pwq, wq)
            pwq_adjust_max_active(pwq);
        mutex_unlock(&wq->mutex);
        list_add(&wq->list, &workqueues);
        mutex_unlock(&wq_pool_mutex);
        return wq;
    err_free_wq:
        free_workqueue_attrs(wq->unbound_attrs);
        kfree(wq);
        return NULL;
    err_destroy:
        destroy_workqueue(wq);
        return NULL;
    }

    The main job of this function is to allocate a workqueue_struct with kzalloc, format its name, and perform some simple initialization; it then calls alloc_and_link_pwqs to tie the workqueue to its pool_workqueues (pwqs). Leaving the WQ_MEM_RECLAIM case aside for now, those are essentially the two jobs the function does: basic setup of the workqueue_struct and linking it to its pwqs. All workqueues are chained onto a single list whose head is a global static variable:

    static LIST_HEAD(workqueues);        /* PL: list of all workqueues */
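
    Callers do not normally invoke __alloc_workqueue_key directly; they go through the alloc_workqueue wrapper macro. A minimal, hypothetical caller (my_wq and my_init are made-up names for illustration):

    #include <linux/init.h>
    #include <linux/workqueue.h>

    static struct workqueue_struct *my_wq;

    static int __init my_init(void)
    {
        /* max_active == 0 means "use the default", i.e. WQ_DFL_ACTIVE */
        my_wq = alloc_workqueue("my_wq", WQ_MEM_RECLAIM, 0);
        if (!my_wq)
            return -ENOMEM;
        return 0;
    }

    The WQ_MEM_RECLAIM flag is what triggers the rescuer path seen in the function above.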

    The most important step in __alloc_workqueue_key is establishing the relationship with the pwqs, which is done by alloc_and_link_pwqs:

    static int alloc_and_link_pwqs(struct workqueue_struct *wq)
    {
        bool highpri = wq->flags & WQ_HIGHPRI;
        int cpu;
        /* the normal (CPU-bound) workqueue case */
        if (!(wq->flags & WQ_UNBOUND)) {
            /* allocate a pool_workqueue (pwq) for every CPU */
            wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
            if (!wq->cpu_pwqs)
                return -ENOMEM;
            /* link each pwq to the wq */
            for_each_possible_cpu(cpu) {
                struct pool_workqueue *pwq =
                    per_cpu_ptr(wq->cpu_pwqs, cpu);
                struct worker_pool *cpu_pools =
                    per_cpu(cpu_worker_pools, cpu);
    
                init_pwq(pwq, wq, &cpu_pools[highpri]);
    
                mutex_lock(&wq->mutex);
                link_pwq(pwq);
                mutex_unlock(&wq->mutex);
            }
            return 0;
        } else {
            return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
        }
    }

    For now we only consider ordinary workqueues and ignore WQ_UNBOUND. The function allocates per-CPU pool_workqueue instances with alloc_percpu, then walks every CPU with for_each_possible_cpu; for each CPU it uses init_pwq to associate that CPU's pool_workqueue with its worker_pool. As described in the previous article, each CPU has two worker_pools: a normal one (index 0) and a high-priority one (index 1). Once the association is made, link_pwq adds the pwq to the workqueue's pwqs list.
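
    For context, the per-CPU pools indexed by highpri above are declared in kernel/workqueue.c along these lines (a sketch based on the 3.10 sources, quoted from memory rather than verbatim):

    enum {
        NR_STD_WORKER_POOLS = 2,    /* one normal and one WQ_HIGHPRI pool per CPU */
    };

    /* index 0 is the normal pool, index 1 the high-priority pool */
    static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                                         cpu_worker_pools);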

    2. Creating workers

    After the workqueue, its pwqs, and the worker_pools have been set up, workers must be created explicitly for each worker_pool. The core function is create_and_start_worker:

    static int create_and_start_worker(struct worker_pool *pool)
    {
        struct worker *worker;
    
        mutex_lock(&pool->manager_mutex);
        /* create a worker belonging to the pool; in effect, create a kernel thread */
        worker = create_worker(pool);
        if (worker) {
            spin_lock_irq(&pool->lock);
            /* start the worker, i.e. wake up its thread */
            start_worker(worker);
            spin_unlock_irq(&pool->lock);
        }
    
        mutex_unlock(&pool->manager_mutex);
        return worker ? 0 : -ENOMEM;
    }

    Note that workers are created for a worker_pool, which is why the worker_pool is passed in as the argument, while the actual creation is done by create_worker. Because there is a dedicated worker manager, adding a worker to a worker_pool has to be done under pool->manager_mutex.

    create_worker itself is not complicated; its core work consists of the following steps (a condensed sketch follows the list):

    • allocate a worker structure with alloc_worker and perform basic initialization
    • establish the link between the worker and its worker_pool
    • create the worker's kernel thread with kthread_create_on_node, with worker_thread as its thread function
    • set the thread's priority (nice level)
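
    Put together, a heavily condensed sketch of create_worker (error handling and locking stripped; the thread naming and id allocation are simplified, see kernel/workqueue.c for the full 3.10 version):

    static struct worker *create_worker(struct worker_pool *pool)
    {
        struct worker *worker;

        /* allocate and minimally initialize the worker */
        worker = alloc_worker();
        if (!worker)
            return NULL;

        /* tie the worker to its pool */
        worker->pool = pool;

        /* back the worker with a kernel thread running worker_thread() */
        worker->task = kthread_create_on_node(worker_thread, worker,
                           cpu_to_node(pool->cpu), "kworker/%d", pool->cpu);
        if (IS_ERR(worker->task)) {
            kfree(worker);
            return NULL;
        }

        /* high-priority pools get a lower nice value */
        set_user_nice(worker->task, pool->attrs->nice);

        return worker;
    }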

    Initially one worker is created per worker_pool. Once created, the worker is started with start_worker:

    static void start_worker(struct worker *worker)
    {
        worker->flags |= WORKER_STARTED;
        worker->pool->nr_workers++;
        worker_enter_idle(worker);
        wake_up_process(worker->task);
    }

    This function is fairly simple: it first marks the worker as WORKER_STARTED and increments the pool's worker count, then uses worker_enter_idle to mark the worker as idle, and finally wakes the worker with wake_up_process. Let's look at the middle step that sets the idle state:

    static void worker_enter_idle(struct worker *worker)
    {
        struct worker_pool *pool = worker->pool;
    
        if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
            WARN_ON_ONCE(!list_empty(&worker->entry) &&
                 (worker->hentry.next || worker->hentry.pprev)))
            return;
    
        /* can't use worker_set_flags(), also called from start_worker() */
        worker->flags |= WORKER_IDLE;
        pool->nr_idle++;
        worker->last_active = jiffies;
    
        /* idle_list is LIFO */
        list_add(&worker->entry, &pool->idle_list);
    
        if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
            mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);
    
        /*
         * Sanity check nr_running.  Because wq_unbind_fn() releases
         * pool->lock between setting %WORKER_UNBOUND and zapping
         * nr_running, the warning may trigger spuriously.  Check iff
         * unbind is not in progress.
         */
        WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
                 pool->nr_workers == pool->nr_idle &&
                 atomic_read(&pool->nr_running));
    }

    This function sets WORKER_IDLE, increments the pool's nr_idle count, and updates last_active to the current jiffies. It then adds the worker to the head of the pool's idle_list. By default a worker may stay in the idle state for at most IDLE_WORKER_TIMEOUT; beyond that, management work has to kick in. last_active is what records when the worker entered the idle state.
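
    For reference, the idle-management constants used here and in the next section are defined in kernel/workqueue.c along these lines (values as in 3.10, quoted from memory):

    enum {
        IDLE_WORKER_TIMEOUT    = 300 * HZ,   /* keep an idle worker for at most 5 minutes */
        MAX_IDLE_WORKERS_RATIO = 4,          /* at most 1 idle worker per 4 busy workers */
    };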

    3. Managing workers

    The system dynamically adds and removes workers according to actual demand. For idle workers, each worker_pool has a timer, idle_timer, whose handler is idle_worker_timeout:

    static void idle_worker_timeout(unsigned long __pool)
    {
        struct worker_pool *pool = (void *)__pool;
    
        spin_lock_irq(&pool->lock);
    
        if (too_many_workers(pool)) {
            struct worker *worker;
            unsigned long expires;
    
            /* idle_list is kept in LIFO order, so check the last one, i.e. the worker queued earliest (idle longest) */
            worker = list_entry(pool->idle_list.prev, struct worker, entry);
            expires = worker->last_active + IDLE_WORKER_TIMEOUT;
            /*
             * An idle worker may stay idle for at most IDLE_WORKER_TIMEOUT.
             * When the timer expires, check the oldest idle worker: if it has
             * not yet reached the limit, push the timer back; otherwise set
             * the pool's management flag and wake up a worker to do the
             * management work.
             */
            if (time_before(jiffies, expires))
                mod_timer(&pool->idle_timer, expires); /* push the expiry time back */
            else {
                /* it's been idle for too long, wake up manager */
                pool->flags |= POOL_MANAGE_WORKERS;
                wake_up_worker(pool);
            }
        }
    
        spin_unlock_irq(&pool->lock);
    }

    This function handles the case where the system has too many workers. How is "too many" decided? too_many_workers does the check: the pool has too many when nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy, where MAX_IDLE_WORKERS_RATIO is 4. When there really are too many idle workers, the handler takes the worker that has been on the idle list longest and checks whether it has been idle for more than IDLE_WORKER_TIMEOUT. If it has not, mod_timer pushes the timer back to that worker's own idle deadline; otherwise the pool's POOL_MANAGE_WORKERS flag is set and the pool's first worker is woken up to do the management work. In the worker's thread function, worker_thread, need_more_worker decides whether more workers are currently needed; if not, execution jumps straight to the sleep label:

    sleep:
        if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
            goto recheck;

    need_to_manage_workers simply tests POOL_MANAGE_WORKERS and returns true if the flag is set. The core of worker management is manage_workers, which contains only two key calls:

        ret |= maybe_destroy_workers(pool);
        ret |= maybe_create_worker(pool);

    Here we only look at maybe_destroy_workers:

    static bool maybe_destroy_workers(struct worker_pool *pool)
    {
        bool ret = false;
    
        while (too_many_workers(pool)) {
            struct worker *worker;
            unsigned long expires;
    
            worker = list_entry(pool->idle_list.prev, struct worker, entry);
            expires = worker->last_active + IDLE_WORKER_TIMEOUT;
    
            if (time_before(jiffies, expires)) {
                mod_timer(&pool->idle_timer, expires);
                break;
            }
        /* destroy the worker that was queued onto the idle list earliest */
            destroy_worker(worker);
            ret = true;
        }
        return ret;
    }

    This function again uses too_many_workers to decide whether there are too many workers. If so, it again takes the last worker on the idle list and checks its idle time: if it has not timed out yet, the timer's expiry is pushed back; otherwise the worker is destroyed with destroy_worker. Why check again? From reading the code, manage_workers is responsible not only for removing workers but also for adding them; the idle timer only targets idle workers, i.e. its purpose is to destroy surplus workers, but the management work itself is folded into worker_thread. From worker_thread's point of view it may need to add workers, remove workers, or do no management at all, so the condition has to be re-evaluated here. A sketch of the too_many_workers test follows.
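
    As a rough sketch (reconstructed from the ratio quoted above rather than copied verbatim; the real 3.10 code also accounts for a currently running manager), too_many_workers amounts to:

    static bool too_many_workers(struct worker_pool *pool)
    {
        int nr_idle = pool->nr_idle;
        int nr_busy = pool->nr_workers - nr_idle;

        /*
         * Always tolerate two idle workers; beyond that, allow one idle
         * worker per MAX_IDLE_WORKERS_RATIO (4) busy workers.
         */
        return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
    }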

    4. Queuing work
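
    From a user's point of view, queuing work only requires initializing a work_struct and handing it to schedule_work or queue_work. A minimal, hypothetical example (my_work, my_work_fn and trigger are made-up names for illustration):

    #include <linux/kernel.h>
    #include <linux/workqueue.h>

    static void my_work_fn(struct work_struct *work)
    {
        pr_info("my_work_fn running in a kworker thread\n");
    }

    /* statically initialized work item; my_work_fn is its handler */
    static DECLARE_WORK(my_work, my_work_fn);

    static void trigger(void)
    {
        schedule_work(&my_work);    /* queue my_work on system_wq */
    }

    Inside the kernel, these helpers funnel down through the following chain: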

    static inline bool schedule_work(struct work_struct *work)
    {
        return queue_work(system_wq, work);
    }
    
    static inline bool queue_work(struct workqueue_struct *wq,
                      struct work_struct *work)
    {
        return queue_work_on(WORK_CPU_UNBOUND, wq, work);
    }

    bool queue_work_on(int cpu, struct workqueue_struct *wq,
               struct work_struct *work)
    {
        bool ret = false;
        unsigned long flags;
        local_irq_save(flags);
        if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
            __queue_work(cpu, wq, work);
            ret = true;
        }
        local_irq_restore(flags);
        return ret;
    }

    The real work is therefore done in __queue_work, one of whose core steps is calling insert_work:

    static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
                struct list_head *head, unsigned int extra_flags)
    {
        struct worker_pool *pool = pwq->pool;
    
        /* we own @work, set data and link */
        set_work_pwq(work, pwq, extra_flags);
        list_add_tail(&work->entry, head);
        get_pwq(pwq);
    
        /*
         * Ensure either wq_worker_sleeping() sees the above
         * list_add_tail() or we see zero nr_running to avoid workers lying
         * around lazily while there are works to be processed.
         */
        smp_mb();
        /* wake a worker if more are needed; essentially checks whether any worker is currently running */
        if (__need_more_worker(pool))
            wake_up_worker(pool);
    }

    The function first calls set_work_pwq to store the pwq in the work's data field, then adds the work to the list maintained by the worker_pool, and finally checks whether more workers are needed, performing a wake-up if so. The wake-up naturally targets the current worker_pool, and it is the pool's first worker that gets woken. Note that in __queue_work, to avoid reentrancy, the pwq selection checks whether the work is still running on some other worker_pool; if it is, the work is queued on that worker_pool's worklist instead.
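
    The reentrancy check mentioned above lives in __queue_work; a condensed excerpt of that part (simplified, the full 3.10 code also handles unbound pwqs and retries if the chosen pwq is being released):

        /* was this work last on a different pool? it may still be running there */
        last_pool = get_work_pool(work);
        if (last_pool && last_pool != pwq->pool) {
            struct worker *worker;

            spin_lock(&last_pool->lock);

            worker = find_worker_executing_work(last_pool, work);

            if (worker && worker->current_pwq->wq == wq) {
                /* still running there: queue on that pwq to avoid reentrancy */
                pwq = worker->current_pwq;
            } else {
                /* not running elsewhere: queue on the locally chosen pwq */
                spin_unlock(&last_pool->lock);
                spin_lock(&pwq->pool->lock);
            }
        } else {
            spin_lock(&pwq->pool->lock);
        }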

    Immanuel

    References:

    Linux 3.10.1 source code
