  • Source-level analysis of user-space spin locking

    Source version analyzed: glibc-2.9

    Locking with spin_lock

    pthread_spin_lock:

    int
    pthread_spin_lock (lock)
         pthread_spinlock_t *lock;
    {
      asm ("\n"
           "1:\t" LOCK_PREFIX "decl %0\n\t" // atomically decrement the lock word %0 (the lock prefix locks the bus/cache line)
           "jne 2f\n\t"                     // result not zero: the lock was not free (its old value was not 1), so jump forward to the wait loop at label 2; if it was 1 we decremented it to 0, we own the lock, and fall through to return
           ".subsection 1\n\t"              // emit the wait loop out of line, in a separate subsection
           ".align 16\n"                    // align the wait loop to a 16-byte boundary
           "2:\trep; nop\n\t"               // PAUSE: spin-wait hint to the CPU
           "cmpl $0, %0\n\t"                // test whether the lock word has become positive, i.e. free
           "jg 1b\n\t"                      // lock looks free: go back to label 1 and retry the locked decrement
           "jmp 2b\n\t"                     // still held: keep spinning at label 2
           ".previous"
           : "=m" (*lock)
           : "m" (*lock));

      return 0;
    }

    So you can see that if the lock is acquired, the function returns right away; otherwise it keeps looping and retrying. A spin lock can be thought of as a lock implemented with inline assembly inside a while(1) loop, continuously polling the lock's state on the CPU. The thread therefore spins idly through its time slice, wasting a lot of CPU and hurting performance.
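
    For readers who do not want to decode the inline assembly, here is a rough C equivalent of the same logic written with GCC's __atomic builtins. This is only a sketch of the idea (the type and function names are made up, and the PAUSE hint is x86-specific); the real glibc code is the assembly shown above.

    /* Sketch: the lock word is 1 when free and <= 0 when held, as in glibc.  */
    typedef int my_spinlock_t;

    static void
    my_spin_lock (my_spinlock_t *lock)
    {
      for (;;)
        {
          /* "lock; decl": atomic decrement; if the old value was 1, we now own the lock.  */
          if (__atomic_fetch_sub (lock, 1, __ATOMIC_ACQUIRE) == 1)
            return;

          /* "2: rep; nop / cmpl / jg 1b": spin read-only until the lock looks free again.  */
          while (__atomic_load_n (lock, __ATOMIC_RELAXED) <= 0)
            __builtin_ia32_pause ();        /* PAUSE hint, x86 only */
        }
    }

    static void
    my_spin_unlock (my_spinlock_t *lock)
    {
      __atomic_store_n (lock, 1, __ATOMIC_RELEASE);   /* mark the lock free again */
    }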

    mutex: a much more elaborate locking path. In terms of implementation, a mutex is a sleep-waiting lock. For example, take a dual-core machine with two threads, A and B, running on Core0 and Core1 respectively. Suppose thread A calls pthread_mutex_lock to take the lock protecting a critical section while that lock is currently held by thread B. Thread A then blocks; Core0 performs a context switch, puts thread A on a wait queue, and is free to run other work (say another thread C) instead of busy-waiting. A spin lock is the opposite: it is a busy-waiting lock. If thread A requests the lock with pthread_spin_lock instead, it keeps busy-waiting on Core0, re-requesting the lock over and over, until it finally gets it.
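
    At the call-site level the two APIs look almost identical; the difference is entirely in what happens while the lock is contended. A minimal, hypothetical example (the counter and thread names are invented for illustration; build with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_spinlock_t s;
    static long counter;

    static void *
    worker (void *arg)
    {
      for (int i = 0; i < 100000; ++i)
        {
          pthread_mutex_lock (&m);       /* if contended, the thread sleeps in the kernel */
          ++counter;
          pthread_mutex_unlock (&m);

          pthread_spin_lock (&s);        /* if contended, the thread burns CPU until the owner unlocks */
          ++counter;
          pthread_spin_unlock (&s);
        }
      return NULL;
    }

    int
    main (void)
    {
      pthread_t t1, t2;
      pthread_spin_init (&s, PTHREAD_PROCESS_PRIVATE);
      pthread_create (&t1, NULL, worker, NULL);
      pthread_create (&t2, NULL, worker, NULL);
      pthread_join (t1, NULL);
      pthread_join (t2, NULL);
      printf ("counter = %ld\n", counter);   /* 400000 if both locks exclude correctly */
      pthread_spin_destroy (&s);
      return 0;
    }

    With that contrast in mind, the rest of the post walks through how pthread_mutex_lock itself is implemented in glibc.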

    int
    __pthread_mutex_lock (mutex)
         pthread_mutex_t *mutex;
    {
      assert (sizeof (mutex->__size) >= sizeof (mutex->__data));
    
      int oldval;
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);   /* get the calling thread's id */
      int retval = 0;

    The mutex type (__kind) can take the following values (an example of selecting a type through pthread_mutexattr_settype follows this list):

    
    

      PTHREAD_MUTEX_TIMED_NP — the default, an ordinary lock. Once one thread holds the lock, other threads that request it form a wait queue and acquire it by priority after it is released. This policy keeps resource allocation fair.

      PTHREAD_MUTEX_RECURSIVE_NP — a recursive (nested) lock. The same thread may successfully acquire the same lock several times and must release it with the same number of unlocks. If a different thread requests it, it competes for the lock again once the owning thread unlocks.

      PTHREAD_MUTEX_ERRORCHECK_NP — an error-checking lock. If the same thread requests the same lock again, EDEADLK is returned; otherwise it behaves like PTHREAD_MUTEX_TIMED_NP. This prevents the simplest kind of deadlock when recursive locking is not allowed.

      PTHREAD_MUTEX_ADAPTIVE_NP — an adaptive lock, the type with the simplest behaviour: it just waits for the lock to be released and then competes for it again (the code below shows it first spins a bounded number of times before falling back to a normal wait).
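
    These _NP names are Linux/glibc-specific. Portable code normally selects the equivalent behaviour with pthread_mutexattr_settype and the standard constants (PTHREAD_MUTEX_NORMAL, PTHREAD_MUTEX_RECURSIVE, PTHREAD_MUTEX_ERRORCHECK). A minimal sketch of creating a recursive mutex this way (the helper name is made up):

    #include <pthread.h>

    /* Initialize *m as a recursive mutex; returns 0 or an errno value.  */
    int
    make_recursive_mutex (pthread_mutex_t *m)
    {
      pthread_mutexattr_t attr;
      int err = pthread_mutexattr_init (&attr);
      if (err != 0)
        return err;

      pthread_mutexattr_settype (&attr, PTHREAD_MUTEX_RECURSIVE);
      err = pthread_mutex_init (m, &attr);
      pthread_mutexattr_destroy (&attr);
      return err;
    }

    Back in __pthread_mutex_lock, this type is what drives the big switch statement below.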

    
    
    switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex),
                    PTHREAD_MUTEX_TIMED_NP))
        {
          /* Recursive mutex.  */
        case PTHREAD_MUTEX_RECURSIVE_NP:
          /* Check whether we already hold the mutex.  */
          if (mutex->__data.__owner == id)
        {
          /* Just bump the counter.  */
          if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
            /* Overflow of the counter.  */
            return EAGAIN;
    
          ++mutex->__data.__count;
    
          return 0;
        }
    
          /* We have to get the mutex.  */
          LLL_MUTEX_LOCK (mutex);
    You can see that this call is the last line of defense for the protected data: once a thread gets past it, it is safe — think of it as the final checkpoint. This is where the classic "user-space atomic operation" comes in, which Ulrich Drepper discusses in his article; it is architecture-dependent. On x86, a lock-prefixed cmpxchg is itself atomic across CPUs, so the operation can be done directly, while RISC machines such as PowerPC and MIPS have their own approach, the load-linked/store-conditional pair (lwarx/stwcx. on PowerPC, ll/sc on MIPS), which likewise lets user space build atomic operations.
          assert (mutex->__data.__owner == 0);
          mutex->__data.__count = 1;
          break;
    
          /* Error checking mutex.  */
        case PTHREAD_MUTEX_ERRORCHECK_NP:
          /* Check whether we already hold the mutex.  */
          if (__builtin_expect (mutex->__data.__owner == id, 0))
        return EDEADLK;
    
          /* FALLTHROUGH */
    
        case PTHREAD_MUTEX_TIMED_NP:
        simple:
          /* Normal mutex.  */
          LLL_MUTEX_LOCK (mutex);
          assert (mutex->__data.__owner == 0);
          break;
    
        case PTHREAD_MUTEX_ADAPTIVE_NP:
          if (! __is_smp)
        goto simple;
    
          if (LLL_MUTEX_TRYLOCK (mutex) != 0)
        {
          int cnt = 0;
          int max_cnt = MIN (MAX_ADAPTIVE_COUNT,
                     mutex->__data.__spins * 2 + 10);
          do
            {
              if (cnt++ >= max_cnt)
            {
              LLL_MUTEX_LOCK (mutex);
              break;
            }
    
    #ifdef BUSY_WAIT_NOP
              BUSY_WAIT_NOP;
    #endif
            }
          while (LLL_MUTEX_TRYLOCK (mutex) != 0);
    
          mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
        }
          assert (mutex->__data.__owner == 0);
          break;
    
        case PTHREAD_MUTEX_ROBUST_RECURSIVE_NP:
        case PTHREAD_MUTEX_ROBUST_ERRORCHECK_NP:
        case PTHREAD_MUTEX_ROBUST_NORMAL_NP:
        case PTHREAD_MUTEX_ROBUST_ADAPTIVE_NP:
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
                 &mutex->__data.__list.__next);
    
          oldval = mutex->__data.__lock;
          do
        {
        again:
          if ((oldval & FUTEX_OWNER_DIED) != 0)
            {
              /* The previous owner died.  Try locking the mutex.  */
              int newval = id;
    #ifdef NO_INCR
              newval |= FUTEX_WAITERS;
    #else
              newval |= (oldval & FUTEX_WAITERS);
    #endif
    
              newval
            = atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
                                   newval, oldval);
    
              if (newval != oldval)
            {
              oldval = newval;
              goto again;
            }
    
              /* We got the mutex.  */
              mutex->__data.__count = 1;
              /* But it is inconsistent unless marked otherwise.  */
              mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;
    
              ENQUEUE_MUTEX (mutex);
              THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
    
              /* Note that we deliberately exit here.  If we fall
             through to the end of the function __nusers would be
             incremented which is not correct because the old
             owner has to be discounted.  If we are not supposed
             to increment __nusers we actually have to decrement
             it here.  */
    #ifdef NO_INCR
              --mutex->__data.__nusers;
    #endif
    
              return EOWNERDEAD;
            }
    
          /* Check whether we already hold the mutex.  */
          if (__builtin_expect ((oldval & FUTEX_TID_MASK) == id, 0))
            {
              int kind = PTHREAD_MUTEX_TYPE (mutex);
              if (kind == PTHREAD_MUTEX_ROBUST_ERRORCHECK_NP)
            {
              THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
                     NULL);
              return EDEADLK;
            }
    
              if (kind == PTHREAD_MUTEX_ROBUST_RECURSIVE_NP)
            {
              THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
                     NULL);
    
              /* Just bump the counter.  */
              if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
                /* Overflow of the counter.  */
                return EAGAIN;
    
              ++mutex->__data.__count;
    
              return 0;
            }
            }
    
          oldval = LLL_ROBUST_MUTEX_LOCK (mutex, id);
    
          if (__builtin_expect (mutex->__data.__owner
                    == PTHREAD_MUTEX_NOTRECOVERABLE, 0))
            {
              /* This mutex is now not recoverable.  */
              mutex->__data.__count = 0;
              lll_unlock (mutex->__data.__lock,
                  PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
              THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
              return ENOTRECOVERABLE;
            }
        }
          while ((oldval & FUTEX_OWNER_DIED) != 0);
    
          mutex->__data.__count = 1;
          ENQUEUE_MUTEX (mutex);
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
          break;
    
        case PTHREAD_MUTEX_PI_RECURSIVE_NP:
        case PTHREAD_MUTEX_PI_ERRORCHECK_NP:
        case PTHREAD_MUTEX_PI_NORMAL_NP:
        case PTHREAD_MUTEX_PI_ADAPTIVE_NP:
        case PTHREAD_MUTEX_PI_ROBUST_RECURSIVE_NP:
        case PTHREAD_MUTEX_PI_ROBUST_ERRORCHECK_NP:
        case PTHREAD_MUTEX_PI_ROBUST_NORMAL_NP:
        case PTHREAD_MUTEX_PI_ROBUST_ADAPTIVE_NP:
          {
        int kind = mutex->__data.__kind & PTHREAD_MUTEX_KIND_MASK_NP;
        int robust = mutex->__data.__kind & PTHREAD_MUTEX_ROBUST_NORMAL_NP;
    
        if (robust)
          /* Note: robust PI futexes are signaled by setting bit 0.  */
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
                 (void *) (((uintptr_t) &mutex->__data.__list.__next)
                       | 1));
    
        oldval = mutex->__data.__lock;
    
        /* Check whether we already hold the mutex.  */
        if (__builtin_expect ((oldval & FUTEX_TID_MASK) == id, 0))
          {
            if (kind == PTHREAD_MUTEX_ERRORCHECK_NP)
              {
            THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
            return EDEADLK;
              }
    
            if (kind == PTHREAD_MUTEX_RECURSIVE_NP)
              {
            THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
    
            /* Just bump the counter.  */
            if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
              /* Overflow of the counter.  */
              return EAGAIN;
    
            ++mutex->__data.__count;
    
            return 0;
              }
          }
    
        int newval = id;
    #ifdef NO_INCR
        newval |= FUTEX_WAITERS;
    #endif
        oldval = atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
                                  newval, 0);
    
        if (oldval != 0)
          {
            /* The mutex is locked.  The kernel will now take care of
               everything.  */
            int private = (robust
                   ? PTHREAD_ROBUST_MUTEX_PSHARED (mutex)
                   : PTHREAD_MUTEX_PSHARED (mutex));
            INTERNAL_SYSCALL_DECL (__err);
            int e = INTERNAL_SYSCALL (futex, __err, 4, &mutex->__data.__lock,
                          __lll_private_flag (FUTEX_LOCK_PI,
                                  private), 1, 0);
    
            if (INTERNAL_SYSCALL_ERROR_P (e, __err)
            && (INTERNAL_SYSCALL_ERRNO (e, __err) == ESRCH
                || INTERNAL_SYSCALL_ERRNO (e, __err) == EDEADLK))
              {
            assert (INTERNAL_SYSCALL_ERRNO (e, __err) != EDEADLK
                || (kind != PTHREAD_MUTEX_ERRORCHECK_NP
                    && kind != PTHREAD_MUTEX_RECURSIVE_NP));
            /* ESRCH can happen only for non-robust PI mutexes where
               the owner of the lock died.  */
            assert (INTERNAL_SYSCALL_ERRNO (e, __err) != ESRCH || !robust);
    
            /* Delay the thread indefinitely.  */
            while (1)
              pause_not_cancel ();
              }
    
            oldval = mutex->__data.__lock;
    
            assert (robust || (oldval & FUTEX_OWNER_DIED) == 0);
          }
    
        if (__builtin_expect (oldval & FUTEX_OWNER_DIED, 0))
          {
            atomic_and (&mutex->__data.__lock, ~FUTEX_OWNER_DIED);
    
            /* We got the mutex.  */
            mutex->__data.__count = 1;
            /* But it is inconsistent unless marked otherwise.  */
            mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;
    
            ENQUEUE_MUTEX_PI (mutex);
            THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
    
            /* Note that we deliberately exit here.  If we fall
               through to the end of the function __nusers would be
               incremented which is not correct because the old owner
               has to be discounted.  If we are not supposed to
               increment __nusers we actually have to decrement it here.  */
    #ifdef NO_INCR
            --mutex->__data.__nusers;
    #endif
    
            return EOWNERDEAD;
          }
    
        if (robust
            && __builtin_expect (mutex->__data.__owner
                     == PTHREAD_MUTEX_NOTRECOVERABLE, 0))
          {
            /* This mutex is now not recoverable.  */
            mutex->__data.__count = 0;
    
            INTERNAL_SYSCALL_DECL (__err);
            INTERNAL_SYSCALL (futex, __err, 4, &mutex->__data.__lock,
                      __lll_private_flag (FUTEX_UNLOCK_PI,
                              PTHREAD_ROBUST_MUTEX_PSHARED (mutex)
    ),
                      0, 0);
    
            THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
            return ENOTRECOVERABLE;
          }
    
        mutex->__data.__count = 1;
        if (robust)
          {
            ENQUEUE_MUTEX_PI (mutex);
            THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
          }
          }
          break;
    
        case PTHREAD_MUTEX_PP_RECURSIVE_NP:
        case PTHREAD_MUTEX_PP_ERRORCHECK_NP:
        case PTHREAD_MUTEX_PP_NORMAL_NP:
        case PTHREAD_MUTEX_PP_ADAPTIVE_NP:
          {
        int kind = mutex->__data.__kind & PTHREAD_MUTEX_KIND_MASK_NP;
    
        oldval = mutex->__data.__lock;
    
        /* Check whether we already hold the mutex.  */
        if (mutex->__data.__owner == id)
          {
            if (kind == PTHREAD_MUTEX_ERRORCHECK_NP)
              return EDEADLK;
    
            if (kind == PTHREAD_MUTEX_RECURSIVE_NP)
              {
            /* Just bump the counter.  */
            if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
              /* Overflow of the counter.  */
              return EAGAIN;
    
            ++mutex->__data.__count;
    
            return 0;
              }
          }
    
        int oldprio = -1, ceilval;
        do
          {
            int ceiling = (oldval & PTHREAD_MUTEX_PRIO_CEILING_MASK)
                  >> PTHREAD_MUTEX_PRIO_CEILING_SHIFT;
    
            if (__pthread_current_priority () > ceiling)
              {
            if (oldprio != -1)
              __pthread_tpp_change_priority (oldprio, -1);
            return EINVAL;
              }
    
            retval = __pthread_tpp_change_priority (oldprio, ceiling);
            if (retval)
              return retval;
    
            ceilval = ceiling << PTHREAD_MUTEX_PRIO_CEILING_SHIFT;
            oldprio = ceiling;
    
            oldval
              = atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
    #ifdef NO_INCR
                                 ceilval | 2,
    #else
                                 ceilval | 1,
    #endif
                                 ceilval);
    
            if (oldval == ceilval)
              break;
    
            do
              {
            oldval
              = atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
                                 ceilval | 2,
                                 ceilval | 1);
    
            if ((oldval & PTHREAD_MUTEX_PRIO_CEILING_MASK) != ceilval)
              break;
    
            if (oldval != ceilval)
              lll_futex_wait (&mutex->__data.__lock, ceilval | 2,
                      PTHREAD_MUTEX_PSHARED (mutex));
              }
            while (atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
                                ceilval | 2, ceilval)
               != ceilval);
          }
        while ((oldval & PTHREAD_MUTEX_PRIO_CEILING_MASK) != ceilval);
    
        assert (mutex->__data.__owner == 0);
        mutex->__data.__count = 1;
          }
          break;
    
        default:
          /* Correct code cannot set any other type.  */
          return EINVAL;
        }
    
      /* Record the ownership.  */
      mutex->__data.__owner = id;
    #ifndef NO_INCR
      ++mutex->__data.__nusers;
    #endif
    
      return retval;
    }
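
    A note on the atomics used above: the robust and PI cases grab the lock word directly with atomic_compare_and_exchange_val_acq, writing the owner's TID into it. The glibc macro is architecture-specific; conceptually it behaves like the following sketch built on GCC's __atomic builtins (cas_val_acq and try_take_ownership are invented names used purely for illustration):

    /* Illustration of atomic_compare_and_exchange_val_acq (mem, newval, oldval):
       if *mem == oldval, store newval and return oldval; otherwise return the
       value actually found in *mem.  Acquire ordering on success.  */
    static inline int
    cas_val_acq (int *mem, int newval, int oldval)
    {
      int expected = oldval;
      __atomic_compare_exchange_n (mem, &expected, newval,
                                   0 /* strong */, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
      return expected;   /* == oldval on success, the current value on failure */
    }

    /* Simplified picture of how the robust path uses it: try to write our TID
       while the lock word still holds the value we last read.  */
    static int
    try_take_ownership (int *lockword, int tid, int oldval)
    {
      return cas_val_acq (lockword, tid, oldval) == oldval;
    }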

    As you can see, the most critical call in this function is

    LLL_MUTEX_LOCK (mutex);

    and it is in fact a macro:

    #ifndef LLL_MUTEX_LOCK
    # define LLL_MUTEX_LOCK(mutex) \
      lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))
    # define LLL_MUTEX_TRYLOCK(mutex) \
      lll_trylock ((mutex)->__data.__lock)
    # define LLL_ROBUST_MUTEX_LOCK(mutex, id) \
      lll_robust_lock ((mutex)->__data.__lock, id, \
                       PTHREAD_ROBUST_MUTEX_PSHARED (mutex))
    #endif
    Now look at the x86_64 implementation of lll_lock. The fast path (__lll_lock_asm_start) tries to take the lock with a single atomic compare-and-exchange; only if the lock is already held does the out-of-line code below call __lll_lock_wait / __lll_lock_wait_private, which put the thread to sleep in the kernel via the futex system call — that is where the context switch happens and where the lock is finally taken.

    #define lll_lock(futex, private) \
      (void) \
        ({ int ignore1, ignore2, ignore3; \
           if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
             __asm __volatile (__lll_lock_asm_start \
                               ".subsection 1\n\t" \
                               ".type _L_lock_%=, @function\n" \
                               "_L_lock_%=:\n" \
                               "1:\tleaq %2, %%rdi\n" /* load the address of the futex word (%2) into %rdi, the first argument of the call */ \
                               "2:\tsubq $128, %%rsp\n" /* step over the x86-64 red zone before making a call */ \
                               "3:\tcallq __lll_lock_wait_private\n" /* contended: wait on the futex in the kernel */ \
                               "4:\taddq $128, %%rsp\n" /* restore the stack pointer */ \
                               "5:\tjmp 24f\n" /* jump back to the code after the asm */ \
                               "6:\t.size _L_lock_%=, 6b-1b\n\t" \
                               ".previous\n" \
                               LLL_STUB_UNWIND_INFO_5 \
                               "24:" \
                               : "=S" (ignore1), "=&D" (ignore2), "=m" (futex), \
                                 "=a" (ignore3) \
                               : "0" (1), "m" (futex), "3" (0) \
                               : "cx", "r11", "cc", "memory"); \
           else \
             __asm __volatile (__lll_lock_asm_start \
                               ".subsection 1\n\t" \
                               ".type _L_lock_%=, @function\n" \
                               "_L_lock_%=:\n" \
                               "1:\tleaq %2, %%rdi\n" \
                               "2:\tsubq $128, %%rsp\n" \
                               "3:\tcallq __lll_lock_wait\n" \
                               "4:\taddq $128, %%rsp\n" \
                               "5:\tjmp 24f\n" \
                               "6:\t.size _L_lock_%=, 6b-1b\n\t" \
                               ".previous\n" \
                               LLL_STUB_UNWIND_INFO_5 \
                               "24:" \
                               : "=S" (ignore1), "=D" (ignore2), "=m" (futex), \
                                 "=a" (ignore3) \
                               : "1" (1), "m" (futex), "3" (0), "0" (private) \
                               : "cx", "r11", "cc", "memory"); \
        })
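
    So the actual blocking happens inside __lll_lock_wait (or __lll_lock_wait_private). Its exact implementation is glibc-internal, but the protocol is the classic futex lock from Drepper's "Futexes Are Tricky": the lock word is 0 when free, 1 when locked, 2 when locked with waiters; a contended locker marks the word as 2 and sleeps in the kernel until woken. A hedged C sketch of that protocol (futex_wait and my_lock_wait are invented names, not the glibc symbols):

    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    /* Sleep in the kernel as long as *uaddr still equals val.  */
    static long
    futex_wait (int *uaddr, int val)
    {
      return syscall (SYS_futex, uaddr, FUTEX_WAIT, val, NULL, NULL, 0);
    }

    /* Slow path: we only get here after the fast-path cmpxchg failed,
       i.e. the lock word was not 0.  */
    static void
    my_lock_wait (int *futexp)
    {
      /* Mark the lock as "locked, with waiters" (2).  If the previous value
         was 0, the owner released it in the meantime and we now own it;
         otherwise sleep until a wake-up, then try again.  */
      while (__atomic_exchange_n (futexp, 2, __ATOMIC_ACQUIRE) != 0)
        futex_wait (futexp, 2);
    }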

    I am not yet very familiar with assembly, so this is as far as my analysis goes for now. Exactly where the switch happens and where the lock is taken still needs further study; I will have to go learn assembly first, and I would be glad if someone more experienced could shed light on this code.

     

     

  • Original post: https://www.cnblogs.com/gogly/p/2706269.html