zoukankan      html  css  js  c++  java
  • 内核随记(三)同步(2)

    2.2、睡眠与唤醒

    在操作系统中,睡眠和唤醒原语实际上是操作系统的基本原语,也是实现同步的一种方式,而且它还是实现信号量的基础。当进程请求的资源(如内存、文件等)不能得到满足时,就会主动放弃CPU,进入等待状态(可中断等待或者不可中断等待)。当资源满足时,就会由别的进程唤醒,从而投入运行。

    2.2.1、等待队列

    等待队列表示一组睡眠的进程,这些进程正在等待特定的事件发生(或者说条件为真),比如,等待足够的内存。等待队列是一个双链表,每个队列都有一个队列头,其定义如下:

    //include/linux/wait.h
    //等待队列头
    struct __wait_queue_head {
          
    // 自旋锁
        spinlock_t lock;
        
    struct list_head task_list;
    };
    typedef 
    struct __wait_queue_head wait_queue_head_t;

    等待队列链表中的元素类型为:

    typedef struct __wait_queue wait_queue_t;
    //唤醒函数指针
    typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int sync, void *key);
    //默认的唤醒函数
    int default_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key);

    struct __wait_queue {
        
    /*取值为WQ_FLAG_EXCLUSIVE(=1)表示互斥进程,由内核有选择的唤醒.为0时表示非互斥进程,由内核在
        **事件发生时唤醒所有等待进程.
        *
    */
        unsigned 
    int flags;
    #define WQ_FLAG_EXCLUSIVE    0x01
        
    //等待的任务描述符
        struct task_struct * task;

        
    //唤醒函数,默认为default_wake_function
        wait_queue_func_t func;
        
    struct list_head task_list;
    };

    其典型的结构如下:


    等待队列头的初始化:
    DECLARE_WAIT_QUEUE_HEAD(name);
    其定义如下:

    //incude/linux/wait.h
    #define __WAIT_QUEUE_HEAD_INITIALIZER(name) {                \
        .
    lock        = SPIN_LOCK_UNLOCKED,                \
        .task_list    
    = { &(name).task_list, &(name).task_list } }

    //初始化等待队列头
    #define DECLARE_WAIT_QUEUE_HEAD(name) \
        wait_queue_head_t name 
    = __WAIT_QUEUE_HEAD_INITIALIZER(name)

    或者如下:
    wait_queue_head_t my_queue;
    init_waitqueue_head(&my_queue);

    等待队列元素初始化:

    //linux/wait.h
    //wait_queue_t初始化
    static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
    {
        q
    ->flags = 0;
        q
    ->task = p;
        q
    ->func = default_wake_function;
    }

    2.2.2、等待事件(Waiting on the Event)
    内核提供的等待接口包括wait_event(), wait_event_ interruptible(), 和wait_event_interruptible_timeout()。此外sleep_on(), sleep_on_timeout(), 和interruptible_sleep_on()在2.6中仍然支持,但已经过时。这些接口的基本实现如下:

    具体代码如下:

    //linux/wait.h
    #define wait_event(wq, condition)                     \
    do {                                    \
        
    if (condition)    //条件发生                         \
            break;                            \
        __wait_event(wq, condition);                    \
    while (0)

    #define __wait_event(wq, condition)                     \
    do {                                    \
        DEFINE_WAIT(__wait);                        \
                                        \
        
    for (;;) {                            \
            prepare_to_wait(
    &wq, &__wait, TASK_UNINTERRUPTIBLE);    \
            
    if (condition)                        \
                
    break;                        \
            schedule();
    //调度                        \
        }                                \
        finish_wait(
    &wq, &__wait);                    \
    while (0)

    //kernel/wait.c
    void fastcall
    prepare_to_wait(wait_queue_head_t 
    *q, wait_queue_t *wait, int state)
    {
        unsigned 
    long flags;
        
    //非互斥进程
        wait->flags &= ~WQ_FLAG_EXCLUSIVE;
        
    //关中断,并请求自旋锁
        spin_lock_irqsave(&q->lock, flags);
        
    if (list_empty(&wait->task_list))
            __add_wait_queue(q, wait);  
    //将等待任务加入等待队列
        /*
         * don't alter the task state if this is just going to
         * queue an async wait queue callback
         
    */
        
    if (is_sync_wait(wait))
            set_current_state(state);  
    //设置任务当前的状态
        
    //释放自旋锁,并恢复处理器状态
        spin_unlock_irqrestore(&q->lock, flags);
    }

    //等待完成之后,应该设置任务的状态为运行状态,并从等待队列中删除
    void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
    {
        unsigned 
    long flags;

        __set_current_state(TASK_RUNNING); 
    //设置为运行状态
        
        
    if (!list_empty_careful(&wait->task_list)) {
            spin_lock_irqsave(
    &q->lock, flags);
            list_del_init(
    &wait->task_list);    //从等待队列中删除
            spin_unlock_irqrestore(&q->lock, flags);
        }
    }

    2.2.3、唤醒(Waking Up)
    接口如下:

    //include/inux/wait.h
    #define wake_up(x)            __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL)
    #define wake_up_nr(x, nr)        __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, nr, NULL)
    #define wake_up_all(x)            __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 0, NULL)
    #define wake_up_interruptible(x)    __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
    #define wake_up_interruptible_nr(x, nr)    __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
    #define wake_up_interruptible_all(x)    __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
    #define    wake_up_locked(x)        __wake_up_locked((x), TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE)
    #define wake_up_interruptible_sync(x)   __wake_up_sync((x),TASK_INTERRUPTIBLE, 1)

    具体实现:

    //kernel/sched.c
    void fastcall __wake_up(wait_queue_head_t *q, unsigned int mode,
                    
    int nr_exclusive, void *key)
    {
        unsigned 
    long flags;
        
    //请求自旋锁,并关中断
        spin_lock_irqsave(&q->lock, flags);
        __wake_up_common(q, mode, nr_exclusive, 
    0, key);
        spin_unlock_irqrestore(
    &q->lock, flags);
    }
    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
                     
    int nr_exclusive, int sync, void *key)
    {
        
    struct list_head *tmp, *next;

        list_for_each_safe(tmp, next, 
    &q->task_list) {
            wait_queue_t 
    *curr;
            unsigned flags;
            curr 
    = list_entry(tmp, wait_queue_t, task_list);
            flags 
    = curr->flags;
            
    //调用相应的唤醒函数, 唤醒第1个有WQ_FLAG_EXCLUSIVE标志的进程后停止
            if (curr->func(curr, mode, sync, key) &&
                (flags 
    & WQ_FLAG_EXCLUSIVE) &&
                
    !--nr_exclusive)
                
    break;
        }
    }
    //默认的唤醒函数
    int default_wake_function(wait_queue_t *curr, unsigned mode, int sync, void *key)
    {
        task_t 
    *= curr->task;
        
    return try_to_wake_up(p, mode, sync);
    }

    try_to_wake_up是唤醒原语中核心部分,其具体代码如下:

    /*p被唤醒的进程描述符.
    **state被唤醒的进程的状态掩码
    **sync禁止被唤醒的进程抢占本地CPU正在运行的进程
    */
    static int try_to_wake_up(task_t * p, unsigned int state, int sync)
    {
        
    int cpu, this_cpu, success = 0;
        unsigned 
    long flags;
        
    long old_state;
        runqueue_t 
    *rq;
    #ifdef CONFIG_SMP
        unsigned 
    long load, this_load;
        
    struct sched_domain *sd;
        
    int new_cpu;
    #endif
        
    //关闭中断,并获取最后执行该进程的CPU(可能不同于本地CPU)的运行队列的锁
        rq = task_rq_lock(p, &flags);
        schedstat_inc(rq, ttwu_cnt);
        old_state 
    = p->state;
        
    if (!(old_state & state))
            
    goto out;

        
    if (p->array)
            
    goto out_running;
        
        
    //最后执行该任务的CPU
        cpu = task_cpu(p);
        
    //本地CPU
        this_cpu = smp_processor_id();
        
    /*对于多CPU系统,检查要被唤醒的进程是否应该从最近执行该进程的CPU的运行队列,
    **转移到另外一个CPU的运行队列.
    */
    #ifdef CONFIG_SMP
        
    if (unlikely(task_running(rq, p)))
            
    goto out_activate;

        new_cpu 
    = cpu;

        
    if (cpu == this_cpu || unlikely(!cpu_isset(this_cpu, p->cpus_allowed)))
            
    goto out_set_cpu;

        load 
    = source_load(cpu);
        this_load 
    = target_load(this_cpu);

        
    /*
         * If sync wakeup then subtract the (maximum possible) effect of
         * the currently running task from the load of the current CPU:
         
    */
        
    if (sync)
            this_load 
    -= SCHED_LOAD_SCALE;

        
    /* Don't pull the task off an idle CPU to a busy one */
        
    if (load < SCHED_LOAD_SCALE/2 && this_load > SCHED_LOAD_SCALE/2)
            
    goto out_set_cpu;

        new_cpu 
    = this_cpu; /* Wake to this CPU if we can */

        
    /*
         * Scan domains for affine wakeup and passive balancing
         * possibilities.
         
    */
        for_each_domain(this_cpu, sd) {
            unsigned 
    int imbalance;
            
    /*
             * Start passive balancing when half the imbalance_pct
             * limit is reached.
             
    */
            imbalance 
    = sd->imbalance_pct + (sd->imbalance_pct - 100/ 2;

            
    if ((sd->flags & SD_WAKE_AFFINE) &&
                    
    !task_hot(p, rq->timestamp_last_tick, sd)) {
                
    /*
                 * This domain has SD_WAKE_AFFINE and p is cache cold
                 * in this domain.
                 
    */
                
    if (cpu_isset(cpu, sd->span)) {
                    schedstat_inc(sd, ttwu_wake_affine);
                    
    goto out_set_cpu;
                }
            } 
    else if ((sd->flags & SD_WAKE_BALANCE) &&
                    imbalance
    *this_load <= 100*load) {
                
    /*
                 * This domain has SD_WAKE_BALANCE and there is
                 * an imbalance.
                 
    */
                
    if (cpu_isset(cpu, sd->span)) {
                    schedstat_inc(sd, ttwu_wake_balance);
                    
    goto out_set_cpu;
                }
            }
        }

        new_cpu 
    = cpu; /* Could not wake to this_cpu. Wake to cpu instead */
    out_set_cpu:
        schedstat_inc(rq, ttwu_attempts);
        new_cpu 
    = wake_idle(new_cpu, p);
        
    if (new_cpu != cpu && cpu_isset(new_cpu, p->cpus_allowed)) {
            schedstat_inc(rq, ttwu_moved);
            set_task_cpu(p, new_cpu);
            task_rq_unlock(rq, 
    &flags);
            
    /* might preempt at this point */
            rq 
    = task_rq_lock(p, &flags);
            old_state 
    = p->state;
            
    if (!(old_state & state))
                
    goto out;
            
    if (p->array)
                
    goto out_running;

            this_cpu 
    = smp_processor_id();
            cpu 
    = task_cpu(p);
        }

    out_activate:
    #endif /* CONFIG_SMP */
        
    if (old_state == TASK_UNINTERRUPTIBLE) {
            rq
    ->nr_uninterruptible--;
            
    /*
             * Tasks on involuntary sleep don't earn
             * sleep_avg beyond just interactive state.
             
    */
            p
    ->activated = -1;
        }

        
    /*
         * Sync wakeups (i.e. those types of wakeups where the waker
         * has indicated that it will leave the CPU in short order)
         * don't trigger a preemption, if the woken up task will run on
         * this cpu. (in this case the 'I will reschedule' promise of
         * the waker guarantees that the freshly woken up task is going
         * to be considered on this CPU.)
         
    */
         
    //将进程p加入目标CPU的可运行队列
        activate_task(p, rq, cpu == this_cpu);
        
    /*如果没有设置sync标志(表示允许抢占),且目标CPU不是本地CPU,则检查p是否比rq运行队列中当前进程的动态优先级高.
        **即(p)->prio < (rq)->curr->prio,如果是,则调用resched_task()抢占rq->curr。
        
    */
        
    if (!sync || cpu != this_cpu) {
            
    if (TASK_PREEMPTS_CURR(p, rq))
                
    /*在单CPU中,仅仅设置TIF_NEED_RESCHED标志.多CPU系统中,则检查相应标志,并使目标CPU重新调度
                
    */
                resched_task(rq
    ->curr);
        }
        success 
    = 1;

    out_running:
        
    //设置进程的状态
        p->state = TASK_RUNNING;
    out:
        
    //释放rq的锁,并打开本地中断
        task_rq_unlock(rq, &flags);

        
    return success;
    }


    #ifdef CONFIG_SMP
    //多CPU系统
    static void resched_task(task_t *p)
    {
        
    int need_resched, nrpolling;

        BUG_ON(
    !spin_is_locked(&task_rq(p)->lock));

        
    /* minimise the chance of sending an interrupt to poll_idle() */
        nrpolling 
    = test_tsk_thread_flag(p,TIF_POLLING_NRFLAG);
        need_resched 
    = test_and_set_tsk_thread_flag(p,TIF_NEED_RESCHED);
        nrpolling 
    |= test_tsk_thread_flag(p,TIF_POLLING_NRFLAG);
        
        
    if (!need_resched && !nrpolling && (task_cpu(p) != smp_processor_id()))
            
    //产生IPI,强制目标CPU重新调度
            smp_send_reschedule(task_cpu(p));
    }
    #else
    //单CPU系统
    static inline void resched_task(task_t *p)
    {
        set_tsk_need_resched(p);
    }
    #endif

    2.2.4、互斥等待
    当调用wake_up唤醒等待队列时,等待队列上的所有进程都转置为可运行。在一些情况下,这种做法是正确的,比如等待某个特定的事件。但是在另外一些情况,可以提前知道只有一个被唤醒的进程能够成功的获取资源,比如等待临界区资源,其它的进程将再次睡眠。如果等待队列中的进程数量太大,将会严重影响系统性能,这就是所谓的thundering herd行为。为此,内核引入互斥等待,它与非互斥等待的区别如下:
    (1) 当一个等待队列入口有 WQ_FLAG_EXCLUSEVE 标志置位, 它被添加到等待队列的尾部. 没有这个标志的入口项, 相反, 添加到开始。
    (2) 当 wake_up 被在一个等待队列上调用, 它在唤醒第一个有 WQ_FLAG_EXCLUSIVE 标志的进程后停止。
    这样,进行互斥等待的进程一次只唤醒一个。使一个进程进入互斥等待是调用prepare_to_wait_exclusive完成的。

    //kernel/wait.c
    void fastcall
    prepare_to_wait_exclusive(wait_queue_head_t 
    *q, wait_queue_t *wait, int state)
    {
        unsigned 
    long flags;
        
    //互斥标志
        wait->flags |= WQ_FLAG_EXCLUSIVE;
        spin_lock_irqsave(
    &q->lock, flags);
        
    if (list_empty(&wait->task_list))
            __add_wait_queue_tail(q, wait);
        
    /*
         * don't alter the task state if this is just going to
          * queue an async wait queue callback
         
    */
        
    if (is_sync_wait(wait))
            set_current_state(state);
        spin_unlock_irqrestore(
    &q->lock, flags);
    }


  • 相关阅读:
    Qt 学习之路:Canvas
    Qt 学习之路:模型-视图高级技术
    Qt 学习之路 :视图代理
    Qt 学习之路 :动态视图
    Qt 学习之路 :Repeater
    Qt 学习之路 :Qt Quick Controls
    mysql-gdb--oracle
    redis-BOOK
    2016qconbeijing
    LINUX 内核月报 taobao
  • 原文地址:https://www.cnblogs.com/hustcat/p/1558673.html
Copyright © 2011-2022 走看看