zoukankan      html  css  js  c++  java
  • XV6操作系统代码阅读心得(三):锁

    锁是操作系统中实现进程同步的重要机制。

    基本概念

    临界区(Critical Section)是指对共享数据进行访问与操作的代码区域。所谓共享数据,就是可能有多个代码执行流并发地执行,并在执行中可能会同时访问的数据。

    同步(Synchronization)是指让两个或多个进程/线程能够按照程序员期望的方式来协调执行的顺序。比如,让A进程必须完成某个操作后,B进程才能执行。互斥(Mutual Exclusion)则是指让多个线程不能够同时访问某些数据,必须要一个进程访问完后,另一个进程才能访问。

    当多个进程/线程并发地执行并且访问一块数据,并且进程/线程的执行结果依赖于它们的执行顺序,我们就称这种情况为竞争状态(Race Condition)。

    Xv6操作系统要求在内核临界区操作时中断必须关闭。如果此时中断开启,那么可能会出现以下死锁情况:A进程在内核态运行并拿下了p锁时,触发中断进入中断处理程序,中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。

    Xv6中实现了自旋锁(Spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。显然,自旋锁看上去效率很低,我们很容易想到更加高效的基于等待队列的方法,让等待进程陷入阻塞而不是无限循环。然而,Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。

    Xv6的Spinlock

    Xv6中锁的定义如下

    // Mutual exclusion lock.
    struct spinlock {
      uint locked;       // Is the lock held?
    
      // For debugging:
      char *name;        // Name of lock.
      struct cpu *cpu;   // The cpu holding the lock.
      uint pcs[10];      // The call stack (an array of program counters)
                         // that locked the lock.
    };
    

    核心的变量只有一个locked,当locked为1时代表锁已被占用,反之未被占用,初始值为0。

    在调用锁之前,必须对锁进行初始化。

    void initlock(struct spinlock *lk, char *name) {
      lk->name = name;
      lk->locked = 0;
      lk->cpu = 0;
    }
    

    最困难的地方是如何对locked变量进行原子操作占用锁和释放锁。这两步具体被实现为acquire()release()函数。(注意v7版本和v11版本的实现略有不同,本文使用的是v11版本)

    acquire()函数

    // Acquire the lock.
    // Loops (spins) until the lock is acquired.
    // Holding a lock for a long time may cause
    // other CPUs to waste time spinning to acquire it.
    void acquire(struct spinlock *lk) {
      pushcli(); // disable interrupts to avoid deadlock.
      if(holding(lk))
        panic("acquire");
    
      // The xchg is atomic.
      while(xchg(&lk->locked, 1) != 0)
        ;
    
      // Tell the C compiler and the processor to not move loads or stores
      // past this point, to ensure that the critical section's memory
      // references happen after the lock is acquired.
      __sync_synchronize();
    
      // Record info about lock acquisition for debugging.
      lk->cpu = mycpu();
      getcallerpcs(&lk, lk->pcs);
    }
    

    acquire()函数首先禁止了中断,并且使用专门的pushcli()函数,这个函数保证了如果有两个acquire()禁止了中断,那么也必须调用两次release()中的popcli()后中断才会被允许。然后,acquire()函数采用xchg指令来实现在设置locked为1的同时获得其原来的值的操作。这里的C代码中封装了一个xchg()函数,在xchg()函数中采用GCC的内联汇编特性,实现如下

    static inline uint xchg(volatile uint *addr, uint newval) {
      uint result;
      // The + in "+m" denotes a read-modify-write operand.
      asm volatile("lock; xchgl %0, %1" :
                   "+m" (*addr), "=a" (result) :
                   "1" (newval) :
                   "cc");
      return result;
    }
    

    其中,volatile标志用于避免gcc对其进行一些优化;第一个冒号后的"+m" (*addr), "=a" (result)是这个汇编指令的两个输出值;newval是这个汇编指令的输入值。假设newval位于eax寄存器中,addr位于rax寄存器中,那么gcc会翻译得到如下汇编指令

     lock; xchgl (%rdx), %eax
    

    由于xchg函数是inline的,它会被直接嵌入调用xchg函数的代码中,使用的寄存器可能会有所不同。

    下面我们来分析一下上面的指令的语义。·lock是一个指令前缀,它保证了这条指令对总线和缓存的独占权,也就是这条指令的执行过程中不会有其他CPU或同CPU内的指令访问缓存和内存。由于现代CPU一般是多发射流水线+乱序执行的,因此一般情况下并不能保证这一点。xchgl指令是一条古老的x86指令,作用是交换两个寄存器或者内存地址里的4字节值,两个值不能都是内存地址,他不会设置条件码。

    那么,仔细思考一下就能发现,以上一条xchg指令就同时做到了交换locked和1的值,并且在之后通过检查eax寄存器就能知道locked的值是否为0。并且,以上操作是原子的,这就保证了有且只有一个进程能够拿到locked的0值并且进入临界区。

    最后,acquire()函数使用__sync_synchronize为了避免编译器对这段代码进行指令顺序调整的话和避免CPU在这块代码采用乱序执行的优化。

    release()函数

    // Release the lock.
    void release(struct spinlock *lk) {
      if(!holding(lk))
        panic("release");
    
      lk->pcs[0] = 0;
      lk->cpu = 0;
    
      // Tell the C compiler and the processor to not move loads or stores
      // past this point, to ensure that all the stores in the critical
      // section are visible to other cores before the lock is released.
      // Both the C compiler and the hardware may re-order loads and
      // stores; __sync_synchronize() tells them both not to.
      __sync_synchronize();
    
      // Release the lock, equivalent to lk->locked = 0.
      // This code can't use a C assignment, since it might
      // not be atomic. A real OS would use C atomics here.
      asm volatile("movl $0, %0" : "+m" (lk->locked) : );
    
      popcli();
    }
    

    release函数为了保证设置locked为0的操作的原子性,同样使用了内联汇编。最后,使用popcli()来允许中断(或者弹出一个cli,但因为其他锁未释放使得中断依然被禁止)。

    在Xv6中实现信号量

    struct semaphore {
      int value;
      struct spinlock lock;
      struct proc *queue[NPROC];
      int end;
      int start;
    };
    
    void sem_init(struct semaphore *s, int value) {
      s->value = value;
      initlock(&s->lock, "semaphore_lock");
      end = start = 0;
    }
    
    void sem_wait(struct semaphore *s) {
      acquire(&s->lock);
      s->value--;
      if (s->value < 0) {
        s->queue[s->end] = myproc();
        s->end = (s->end + 1) % NPROC;
        sleep(myproc(), &s->lock)
      }
      release(&s->lock);
    }
    
    void sem_signal(struct semaphore *s) {
      acquire(&s->lock);
      s->value++;
      if (s->value <= 0) {
        wakeup(s->queue[s->start]);
        s->queue[s->start] = 0;
        s->start = (s->start + 1) % NPROC;
      }
      release(&s->lock);
    }
    

    上面的代码使用Xv6提供的接口实现了信号量,格式和命名与POSIX标准类似。这个信号量的实现采用等待队列的方式。当一个进程因信号量陷入阻塞时,会将自己放进等待队列并睡眠(18-22行)。当一个进程释放信号量时,会从等待队列中取出一个进程继续执行(29-33行)。

  • 相关阅读:
    java
    java
    android-studio于java相关
    转-Cannot refer to an instance field arg while explicitly invoking a constructor
    java
    java
    hdoj 1251 统计难题(字典树)
    hdoj 3555 Bomb(DFA+dp)
    hdoj 1247 Hat’s Words(字典树)
    poj 1204 Word Puzzles(字典树)
  • 原文地址:https://www.cnblogs.com/hehao98/p/10678493.html
Copyright © 2011-2022 走看看