zoukankan      html  css  js  c++  java
  • arm架构下spinlock原理 (代码解读)【转】

    转自:https://blog.csdn.net/adaptiver/article/details/72389453

    http://blog.csdn.net/longwang155069/article/details/52055876

    自旋锁的引入

    原子变量适用在多核之间多单一共享变量进行互斥访问,如果要保护多个变量,并且这些变量之间有逻辑关系时,原子变量就不适用了。例如:常见的双向链表。假设有三个链表节点A、B、C。需要将节点B插入节点A、C之间。如果CPU A刚好将A节点的后向指针指向B,但是还没有将B的后向指针指向C。此时CPU B要遍历链表,这将会一个灾难性的后果。

    如果共享数据段在中断上下文或者进程上下文被访问呢? 如果在进程上下文被访问,完全可以使用信号量semaphore机制来实现互斥。如果在中断上下文被访问呢? 就不能使用semaphore来实现互斥,因为semaphore会引起睡眠的。这时候就引入了spin_lock

    spin_lock的实现思想

    先说生活中一个示例,如果机智的你乘坐过火车的话,就一定知道早上6点-7点在火车上厕所的感受了。如果机智你的起来上厕所,发现一大堆人都等着上厕所,男女老少。接设你前面排了三个人,分别为A, B, C。 
    当A进入厕所之后,关闭了厕所的门,然后就会看见一个红灯亮着“有人“,这时候B,C和机智的你都在等待。当A出来后,B进去不到20s就出来了。然后进去了C,然后你就苦苦的在等待,一直在观察这什么时候红灯熄灭,这让机智的你等待了10min, 然后机智的你进去就10s搞定。好了关于生活的例子说完了,再回到spin_lock中。

    可以将厕所当作临界区。A, B, C, 机智的你是四个cpu, 红灯是临界区时候有cpu进入状态。 
    当A进入临界区(厕所),然后就会将进入状态修改为忙(红灯亮),然后B,C以及机智的你都会判断当前状态,如果是忙,就等待,不忙就让B先进去,B进入之后同样的操作。

    spin_lock早期代码分析

    因为spin_lock在ARM平台上的实现策略发生过变化,所以先分析以前版本2.6.18的spin_lock。

    主要是以SMP系统分析,后面会稍带分析UP系统。

    1.  
      <include/linux/spinlock.h>
    2.  
      ----------------------------------------------------------
    3.  
      #define spin_lock(lock) _spin_lock(lock)
    4.  
       
    5.  
      <kernel/spinlock.c>
    6.  
      --------------------------------------------------------
    7.  
      void __lockfunc _spin_lock(spinlock_t *lock)
    8.  
      {
    9.  
      preempt_disable();
    10.  
      spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    11.  
      _raw_spin_lock(lock);
    12.  
      }

    其中preempt_disable()是用来关闭掉抢占的。如果系统中打开了CONFIG_PREEMPT该选项的话,就是用来关闭系统的抢占,如果没有开启相当于什么都没干,只是为了统一代码。至于这里为什么需要关闭抢占,在后面会说。

    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);

    这段代码使用来调试使用的,没有系统没有开启CONFIG_DEBUG_LOCK_ALLOC配置的话,这样代码也相当于什么都没干。继续往下。

    1.  
      define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
    2. static inline void __raw_spin_lock(raw_spinlock_t *lock)
    3. {
    4. unsigned long tmp;
    5.  
    6. __asm__ __volatile__(
    7.  
      "1: ldrex %0, [%1] "
    8.  
      " teq %0, #0 "
    9.  
      " strexeq %0, %2, [%1] "
    10.  
      " teqeq %0, #0 "
    11.  
      " bne 1b"
    12.  
      : "=&r" (tmp)
    13.  
      : "r" (&lock->lock), "r" (1)
    14.  
      : "cc");
    15.  
       
    16.  
      smp_mb();
    17.  
      }

    回头看看spinlock_t变量的定义:

    1.  
      typedef struct {
    2.  
      raw_spinlock_t raw_lock;
    3.  
      } spinlock_t;
    4.  
       
    5.  
      typedef struct {
    6.  
      volatile unsigned int lock;
    7.  
      } raw_spinlock_t;

    通过层层的调用,最后spinlock_t就是一个volatile unsigned int型变量。

    汇编代码C语言解释
    1: ldrex %0, [%1] tmp=lock->lock 读取lock的状态赋值给tmp
    teq %0, #0 if(tmp == 0) 判断lock的状态是否为0。如果是0说明可以获得锁;如果不为0,说明自旋锁处于上锁状态,不能访问,执行bne 1b指令,跳到标号1处不停执行。
    strexeq %0, %2, [%1] lock->lock=1 使用常量1来更新锁的状态,并将执行结果放入到tmp中
    teqeq %0, #0 if(tmp == 0) 用来判断tmp是否为0,如果为0,表明更新锁的状态成功;如果不为0表明锁的状态没哟更新成功,执行”bne 1b”,跳转到标号1继续执行。

    早期spin_lock存在的不公平性

    还是回到火车上上厕所的故事中,某天早上去上厕所,发现有一大堆的人都在排队。但是进去厕所的人已经进去了半个小时,后面的人已经开始等待不急了,有的谩骂起来,有人大喊憋不住了,机智你的刚好肚子疼,快憋不住了。刚好排在第一位是你的媳妇,然后你就插队立马上了厕所。你出来后,接着是你儿子,然后你全家。后面的人就一直等待了1个小时终于进入了厕所。

    将这个现象转移到程序中就是,在现代多核的cpu中,因为每个cpu都有chach的存在,导致不需要去访问主存获取lock,所以当当前获取lock的cpu,释放锁后,使其他cpu的cache都失效,然后释放的锁在下一次就比较容易进入临界去,导致出现了不公平。

    ticket机制原理

    先看最新的spin_lock的结构体定义:

    1.  
      typedef struct spinlock {
    2.  
      struct raw_spinlock rlock;
    3.  
      } spinlock_t;
    4.  
       
    5.  
      typedef struct raw_spinlock {
    6.  
      arch_spinlock_t raw_lock;
    7.  
      } raw_spinlock_t;
    8.  
       
    9.  
      typedef struct {
    10.  
      union {
    11.  
      u32 slock;
    12.  
      struct __raw_tickets {
    13.  
      #ifdef __ARMEB__
    14.  
      u16 next;
    15.  
      u16 owner;
    16.  
      #else
    17.  
      u16 owner;
    18.  
      u16 next;
    19.  
      #endif
    20.  
      } tickets;
    21.  
      };
    22.  
      } arch_spinlock_t;

    在分析代码之前,还需要解释一下tickets中的owner和next的含义。详细可见提交: 
    546c2896a42202dbc7d02f7c6ec9948ac1bf511b

    因为有cache的作用,导致本次释放lock的cpu在下一次就可以更快的获取锁。所以在ARMv6上引入了”票”算法来保证每个cpu都是像“FIFO“访问临界区。

    还是说回到火车上厕所的事件,还是早上排队上厕所。这时候好多人都插队,导致没有熟人的人一直上不了厕所,于是火车管理员(虚拟的,只是为了讲解原理而已)出现了。火车管理员说“从现在开始不准插队,我来监督,所有人排位一队“。管理员站在厕所门口,让大家都按次序排队上厕所,这时候就没有人插队了。

    将这个事件转移到程序中的ticket中。刚开始的时候临界区没有cpu进入,状态是空闲的。next和owner的值都是0,当cpu1进入临界区后。将next++, 当cpu1从临界区域执行完后,将owner++。这时候next和owner都为1,说明临界区没有cpu进入。这时候cpu2进入临界区,将next++, 然后cpu2好像干的活比较多,当cpu3进来后,next++,这时候next已经是3了,当cpu2执行完毕后,owner++,owner的值变为2, 表示让cpu2进入临界区,这就保障了各个cpu之间都是先来后到的执行。

    ARM32 上spin_lock代码实现

    1.  
      static inline void arch_spin_lock(arch_spinlock_t *lock)
    2.  
      {
    3.  
      unsigned long tmp;
    4.  
      u32 newval;
    5.  
      arch_spinlock_t lockval;
    6.  
       
    7.  
      prefetchw(&lock->slock);
    8.  
      __asm__ __volatile__(
    9.  
      "1: ldrex %0, [%3] "
    10.  
      " add %1, %0, %4 "
    11.  
      " strex %2, %1, [%3] "
    12.  
      " teq %2, #0 "
    13.  
      " bne 1b"
    14.  
      : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    15.  
      : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    16.  
      : "cc");
    17.  
       
    18.  
      while (lockval.tickets.next != lockval.tickets.owner) {
    19.  
      wfe();
    20.  
      lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
    21.  
      }
    22.  
      smp_mb();
    23.  
      }
    汇编C语言解释
    1: ldrex %0, [%3] lockval = lock 读取锁的值赋值给lockval
    add %1, %0, %4 newval = lockval + (1 << 16) 将next++之后的值存在newval中
    strex %2, %1, [%3] lock = newval 将新的值存在lock中,将是否成功结果存入在tmp中
    teq %2, #0 if(tmp == 0) 判断上条指令是否成功,如果不成功执行”bne 1b”跳到标号1执行
    1.  
      while (lockval.tickets.next != lockval.tickets.owner) {
    2.  
      wfe();
    3.  
      lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
    4.  
      }

    当tickets中的next和owner不相等的时候,说明临界区在忙, 需要等待。然后cpu会执行wfe指令。当其他cpu忙完之后,会更新owner的值,如果owner的值如果与next值相同,那到next号的cpu执行。

    ARM64 上spin_lock代码实现

    1.  
      static inline void arch_spin_lock(arch_spinlock_t *lock)
    2.  
      {
    3.  
      unsigned int tmp;
    4.  
      arch_spinlock_t lockval, newval;
    5.  
       
    6.  
      asm volatile(
    7.  
      /* Atomically increment the next ticket. */
    8.  
      " prfm pstl1strm, %3 "
    9.  
      "1: ldaxr %w0, %3 "
    10.  
      " add %w1, %w0, %w5 "
    11.  
      " stxr %w2, %w1, %3 "
    12.  
      " cbnz %w2, 1b "
    13.  
      /* Did we get the lock? */
    14.  
      " eor %w1, %w0, %w0, ror #16 "
    15.  
      " cbz %w1, 3f "
    16.  
      /*
    17.  
      * No: spin on the owner. Send a local event to avoid missing an
    18.  
      * unlock before the exclusive load.
    19.  
      */
    20.  
      " sevl "
    21.  
      "2: wfe "
    22.  
      " ldaxrh %w2, %4 "
    23.  
      " eor %w1, %w2, %w0, lsr #16 "
    24.  
      " cbnz %w1, 2b "
    25.  
      /* We got the lock. Critical section starts here. */
    26.  
      "3:"
    27.  
      : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
    28.  
      : "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
    29.  
      : "memory");
    30.  
      }
    31.  
       
    汇编C语言解释
    prfm pstl1strm, %3 将lock变量读到cache,增加访问速度  
    1: ldaxr %w0, %3 lockval = lock 将lock的值赋值给lockval
    add %w1, %w0, %w5 newval=lockval + (1 << 16) 将lock中的next++, 然后将结果赋值给newval
    stxr %w2, %w1, %3 lock = newval 将newval赋值给lock,同时将是否设置成功结果存放到tmp
    cbnz %w2, 1b if(tmp != 0)goto 1 如果tmp不为0,跳到标号1执行
    eor %w1, %w0, %w0, ror #16 if(next == owner) 判断next是否等于owner
    cbz %w1, 3f if(newval == 0) 进入临界区
    2: wfe 自旋等待  
    ldaxrh %w2, %4 tmp = lock->owner 获取当前的Owner值存放在tmp中
    eor %w1, %w2, %w0, lsr #16 if(next == owner) 判断next是否等于owner
    cbnz %w1, 2b 如果不等跳到标号2自旋,负责进入临界区域  

    ARM64 上spin_unlock代码实现

    1.  
      static inline void arch_spin_unlock(arch_spinlock_t *lock)
    2.  
      {
    3.  
      asm volatile(
    4.  
      " stlrh %w1, %0 "
    5.  
      : "=Q" (lock->owner)
    6.  
      : "r" (lock->owner + 1)
    7.  
      : "memory");
    8.  
      }

    解锁的操作相对简单,就是给owner执行加1的操作。

    读写锁rwlock

    http://blog.csdn.net/longwang155069/article/details/52211024

    读写锁引入

    在前面小节分析了spin_lock的实现,可以知道spin_lock只允许一个thread进入临界区,而且对进入临界区中的操作不做细分。但是在实际中,对临界区的操作分为读和写。如果按照spin_lock的实现,当多个read thread都想进入临界区读取的时候,这时候只有一个read thread进入到临界区,这样效率和性能明显下降。所以就针对某些操作read thread占绝大多数的情况下,提出了读写锁的概念。
     

    读写锁的基本原理

    加锁操作

    • 假设当前临界区没有任何进程,这时候read进程或者write进程都可以进来,但是只能是其一
    • 如果当前临界区只有一个read进程,这时候任意的read进程都可以进入,但是write进程不能进入
    • 如果当前临界区只有一个write进程,这时候任何read/write进程都无法进入。只能自旋等待
    • 如果当前当前临界区有好多个read进程,同时read进程依然还会进入,这时候进入的write进程只能等待。直到临界区一个read进程都没有,才可进入

    解锁操作

    • 如果在read进程离开临界区的时候,需要根据情况决定write进程是否需要进入。只有当临界区没有read进程了,write进程方可进入。
    • 如果在write进程离开临界区的时候,无论write进程或者read进程都可进入临界区,因为write进程是排它的。

    读写锁的定义

    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. typedef struct {  
    2.     arch_rwlock_t raw_lock;  
    3. } rwlock_t;  
    4.   
    5. typedef struct {  
    6.     volatile unsigned int lock;  
    7. } arch_rwlock_t;  
    可以看到读写锁与spin_lock的定义最终相同,只是名字不同罢了。

    读写锁API

    加锁API

    1. read_lock(lock)/write_lock(lock)                                                               #获取指定的锁
    2. read_trylock(lock)/write_trylock(lock)                                                       #尝试获取锁,如果失败不spin,直接返回
    3. read_lock_irq(lock)/write_lock_irq(lock)                                                   #获取指定的锁,同时关掉本地cpu中断
    4. read_lock_irqsave(lock, flags)/write_lock_irqsave(lock, flags)                #保存本地cpu的irq标志,然后关掉cpu中断,获取指定锁
    5. read_lock_bh(lock)/read_lock_bh(lock)                                                   #获取指定的锁,同时关掉中断下半部(bottom half)

    解锁API

    1. read_unlock(lock)/write_unlock(lock)                                                      #释放指定的锁
    2. read_unlock_irq(lock)/write_unlock_irq(lock)                                          #释放指定的锁,同时使能cpu中断
    3. read_unlock_irqrestore/write_unlock_irqrestore                                     #释放锁,同时使能cpu中断,恢复cpu的标识
    4. read_unlock_bh/write_unlock_bh                                                            #释放锁,同时使能cpu中断的下半部

    读写锁的实现

    写入者加锁操作:

    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. /* 
    2.  * Write lock implementation. 
    3.  * 
    4.  * Write locks set bit 31. Unlocking, is done by writing 0 since the lock is 
    5.  * exclusively held. 
    6.  * 
    7.  * The memory barriers are implicit with the load-acquire and store-release 
    8.  * instructions. 
    9.  */  
    10.   
    11. static inline void arch_write_lock(arch_rwlock_t *rw)  
    12. {  
    13.     unsigned int tmp;  
    14.   
    15.     asm volatile(  
    16.     "   sevl "  
    17.     "1: wfe "  
    18.     "2: ldaxr   %w0, %1 "  
    19.     "   cbnz    %w0, 1b "  
    20.     "   stxr    %w0, %w2, %1 "  
    21.     "   cbnz    %w0, 2b "  
    22.     : "=&r" (tmp), "+Q" (rw->lock)  
    23.     : "r" (0x80000000)  
    24.     : "memory");  
    25. }  
    通过注释: write操作的上锁操作是给bit31写1, 解锁操作就是给bit31写0
     
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. "   sevl "  
    2. "1: wfe "  
    使cpu进入低功耗模式
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. 2:  ldaxr   %w0, %1   
    读取锁的值,赋值给tmp变量
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. cbnz    %w0, 1b  
    如果tmp的值不为0, 跳转到标号1重新执行。不等于0说明有read/write进程正在持有锁,所以需要进入低功耗等待。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. stxr    %w0, %w2, %1  
    将锁的bit31设置为1, 然后将设置结果放入tmp中。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. cbnz    %w0, 2b  
    如果tmp的值不为0,说明上条指令执行失败,跳转到标号2继续执行。
     
    可以看到,对于wirte操作,只要临界区有read/write进程存在,就需要自旋等待,直到临界区没有任何进程存在。

    写入者解锁操作:

    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. static inline void arch_write_unlock(arch_rwlock_t *rw)  
    2. {  
    3.     asm volatile(  
    4.     "   stlr    %w1, %0 "  
    5.     : "=Q" (rw->lock) : "r" (0) : "memory");  
    6. }  
    写操作很简单,就是将锁的值全部清为0而已。
     

    读取者加锁操作:

    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. /* 
    2.  * Read lock implementation. 
    3.  * 
    4.  * It exclusively loads the lock value, increments it and stores the new value 
    5.  * back if positive and the CPU still exclusively owns the location. If the 
    6.  * value is negative, the lock is already held. 
    7.  * 
    8.  * During unlocking there may be multiple active read locks but no write lock. 
    9.  * 
    10.  * The memory barriers are implicit with the load-acquire and store-release 
    11.  * instructions. 
    12.  */  
    13. static inline void arch_read_lock(arch_rwlock_t *rw)  
    14. {  
    15.     unsigned int tmp, tmp2;  
    16.   
    17.     asm volatile(  
    18.     "   sevl "  
    19.     "1: wfe "  
    20.     "2: ldaxr   %w0, %2 "  
    21.     "   add %w0, %w0, #1 "  
    22.     "   tbnz    %w0, #31, 1b "  
    23.     "   stxr    %w1, %w0, %2 "  
    24.     "   cbnz    %w1, 2b "  
    25.     : "=&r" (tmp), "=&r" (tmp2), "+Q" (rw->lock)  
    26.     :  
    27.     : "memory");  
    28. }  
    读取者进入临界区先要判断是否有write进程在临界区,如果有必须自旋。如果没有,则可以进入临界区。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. 2:  ldaxr   %w0, %2  
    读取锁的值,赋值给tmp变量。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. add %w0, %w0, #1  
    将tmp的值加1, 然后将结果放入tmp中。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. tbnz    %w0, #31, 1b  
    判断tmp[31]是否等于0,不等于0也就是说write进程在临界区,需要自旋等待,跳到标号1继续。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. stxr    %w1, %w0, %2  
    将tmp的值复制给lock,然后将结果放入tmp2中。
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. cbnz    %w1, 2b  
    判断tmp2是否等于0,不等于0就跳到标号2继续。
     
    可以看到read操作需要先判断临界区是否有write进程存在,如果有就需要自旋。

    读取者解锁操作:

    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. static inline void arch_read_unlock(arch_rwlock_t *rw)  
    2. {  
    3.     unsigned int tmp, tmp2;  
    4.   
    5.     asm volatile(  
    6.     "1: ldxr    %w0, %2 "  
    7.     "   sub %w0, %w0, #1 "  
    8.     "   stlxr   %w1, %w0, %2 "  
    9.     "   cbnz    %w1, 1b "  
    10.     : "=&r" (tmp), "=&r" (tmp2), "+Q" (rw->lock)  
    11.     :  
    12.     : "memory");  
    13. }  
    读取者退出临界区只需要将锁的值减1即可。
     
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. 1:  ldxr    %w0, %2  
    读取锁的值,复制给tmp
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. sub %w0, %w0, #1  
    将tmp的值减去1,同时将结果放入到tmp中
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. stlxr   %w1, %w0, %2  
    将tmp的值复制给lock,然后将结果存放到tmp2
    [cpp] view plain copy
     
     
     
     
     在CODE上查看代码片派生到我的代码片
    1. cbnz    %w1, 1b  
    如果tmp2的值不为0,就跳转到标号1继续执行。

    小节

    从上面的定义可知,lock的是一个unsigned int的32位数。 0-32bit用来表示read thread counter, 31bit用来表示write therad counter。 这样设计是因为write进程每次进入临界区只能有一个,所以一个bit就可以。剩余的31bit位全部给read therad使用。
     
    从概率上将,当一个进程试图去写时,成功获得锁的几率要远小于读进程概率。所以在一个读写相互依赖的系统中,这种设计会导致读取者饥饿,也就是没有数据可读。所以读写锁使用的系统就是读操作占用绝大多数,这样读写操作就比以前的spin lock大大提升效率和性能。
  • 相关阅读:
    无锁队列以及ABA问题
    bigworld源码分析(3)——dbMgr分析
    bigworld源码分析(4)——BaseAppMgr分析
    bigworld源码分析(5)——BaseApp分析
    bigworld源码分析(2)—— loginApp分析
    bigworld源码分析(1)—— 研究bigworld的意义和目标
    C++嵌入Python,以及两者混用
    B-Tree算法分析与实现
    通过sqlserver日志恢复误删除的数据
    win7启动时怎么自动进入桌面
  • 原文地址:https://www.cnblogs.com/sky-heaven/p/12786611.html
Copyright © 2011-2022 走看看