zoukankan      html  css  js  c++  java
  • LockSupport多看一点

    写上一篇博客ReentrantLock相关的时候翻代码发现,线程启停的控制在jdk7中使用的是LockSupport实现的,于是忍不住想看下代码,然后愈发不可收拾,Locksupport借助的是POSIX线程的mutex和condition实现的线程间的启停控制。

    先放下资源吧,我写博客的习惯就是好东西先放出来

    IBM的同仁写的,IBM出品还是相当保障的,浅显易懂的把生涩的东西表达出来,很明了,其实这个部分看完POSIX的知识基本就有了,接下来翻下LockSupport的底层C++实现,具体看下如何使用POSIX实现的线程启停控制。

    https://www.ibm.com/developerworks/cn/linux/thread/posix_thread1/index.html

    https://www.ibm.com/developerworks/cn/linux/thread/posix_thread2/index.html

    https://www.ibm.com/developerworks/cn/linux/thread/posix_thread3/index.html

    幸亏翻了,posix的这三篇IBM的系列文章,帮助很大,再来看LockSupport的底层代码生涩度就降低很多了。

    LockSupport.java 里

        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(false, 0L);
            setBlocker(t, null);
        }

    setBlocker,设置线程对象的parkBlocker属性,实际在ReentrantLock中,指的是FariSync或者NofairSync 锁。

    继续到unsafe.park(false, 0L),这个类就都不陌生了吧,go 

    public native void park(boolean var1, long var2);

    进入原声方法调用:unsafe.cpp

    UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
      UnsafeWrapper("Unsafe_Park");
      HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
      JavaThreadParkedState jtps(thread, time != 0);
      thread->parker()->park(isAbsolute != 0, time);
      HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
    UNSAFE_END

    这里已经有些看不懂了,因为对java native的调用机制看过一点,没太深入这里就不聊了,进入代码两个java代码层过来的参数 jboolean isAbsolute, jlong time,

    宏方法就不看了(老实说c++基本已经还给大学老师了),核心就是这句

    thread->parker()->park(isAbsolute != 0, time);

    然后来屡下c++里的结构。

    class Parker : public os::PlatformParker {
    private:
      volatile int _counter ;
      Parker * FreeNext ;
      JavaThread * AssociatedWith ; // Current association
    
    public:
      Parker() : PlatformParker() {
        _counter       = 0 ;
        FreeNext       = NULL ;
        AssociatedWith = NULL ;
      }
    protected:
      ~Parker() { ShouldNotReachHere(); }
    public:
      // For simplicity of interface with Java, all forms of park (indefinite,
      // relative, and absolute) are multiplexed into one call.
      void park(bool isAbsolute, jlong time);
      void unpark();
    
      // Lifecycle operators
      static Parker * Allocate (JavaThread * t) ;
      static void Release (Parker * e) ;
    private:
      static Parker * volatile FreeList ;
      static volatile int ListLock ;
    
    };

    看到实际上继承了PlatformParker,构造方法也会出发父类构造方法来看PlatformParker的结构和构造方法

    class PlatformParker : public CHeapObj {
      protected:
        pthread_mutex_t _mutex [1] ;
        pthread_cond_t  _cond  [1] ;
    
      // 注释析构函数
     
      public:
        PlatformParker() {
          int status;
          status = pthread_cond_init (_cond, NULL);
          assert_status(status == 0, status, "cond_init");
          status = pthread_mutex_init (_mutex, NULL);
          assert_status(status == 0, status, "mutex_init");
        }
    } ;

    可以看到,使用的正是 POSIX线程的mutex锁和cond条件,构造函数里主要就是做了下锁和条件变量的初始化。

    再来看thread.hpp的parker()方法。

      // JSR166 per-thread parker
    private:
      Parker*    _parker;
    public:
      Parker*     parker() { return _parker; }

    可以看到,parker()方法其实就是返回了_parker属性。

    那么来到重点Parker类。

    parker.hpp

    private:
      volatile int _counter ;
      Parker * FreeNext ;
      JavaThread * AssociatedWith ; // Current association
      // 省略其他

    看下声明了一个volatile变量_counter,是一个许可的数量,跟ReentrantLock 里定义的许可变量基本都是一个原理。

    接下来看代码就能明白它的作用了,park和unpack方法在os_linux.cpp文件中查看。

    void Parker::park(bool isAbsolute, jlong time) {
      // Optional fast-path check:
      // Return immediately if a permit is available.
      if (_counter > 0) {
          _counter = 0 ;
          OrderAccess::fence();
          return ;
      }
    
      Thread* thread = Thread::current();
      assert(thread->is_Java_thread(), "Must be JavaThread");
      JavaThread *jt = (JavaThread *)thread;
    
      // Optional optimization -- avoid state transitions if there's an interrupt pending.
      // Check interrupt before trying to wait
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      // Next, demultiplex/decode time arguments
      timespec absTime;
      if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
        return;
      }
      if (time > 0) {
        unpackTime(&absTime, isAbsolute, time);
      }
    
    
      // Enter safepoint region
      // Beware of deadlocks such as 6317397.
      // The per-thread Parker:: mutex is a classic leaf-lock.
      // In particular a thread must never block on the Threads_lock while
      // holding the Parker:: mutex.  If safepoints are pending both the
      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
      ThreadBlockInVM tbivm(jt);
    
      // Don't wait if cannot get lock since interference arises from
      // unblocking.  Also. check interrupt before trying wait
      if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
        return;
      }
    
      int status ;
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        OrderAccess::fence();
        return;
      }
    
    #ifdef ASSERT
      // Don't catch signals while blocked; let the running threads have the signals.
      // (This allows a debugger to break into the running thread.)
      sigset_t oldsigs;
      sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
      pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
    #endif
    
      OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
    
      if (time == 0) {
        status = pthread_cond_wait (_cond, _mutex) ;
      } else {
        status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
        if (status != 0 && WorkAroundNPTLTimedWaitHang) {
          pthread_cond_destroy (_cond) ;
          pthread_cond_init    (_cond, NULL);
        }
      }
      assert_status(status == 0 || status == EINTR ||
                    status == ETIME || status == ETIMEDOUT,
                    status, "cond_timedwait");
    
    #ifdef ASSERT
      pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
    #endif
    
      _counter = 0 ;
      status = pthread_mutex_unlock(_mutex) ;
      assert_status(status == 0, status, "invariant") ;
      // If externally suspended while waiting, re-suspend
      if (jt->handle_special_suspend_equivalent_condition()) {
        jt->java_suspend_self();
      }
    
      OrderAccess::fence();
    }

    如果有可用的许可,也就是_counter>0,那么直接返回,不需要等待。

    然后有一句:

    if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
        return;
      }

    这句注释上写的也有,如果能获取锁才继续往下走,如果都获取不到锁,那么直接返回。前提也会检查线程是否是阻塞状态。翻译过来就是:当前线程必须是中断状态,并且能够加锁成功,那么继续往下走,否则直接返回。

    这里卡了一下,为毛必须是阻塞状态呀,阻塞状态怎么可能代码执行到这里?

    呵呵,基础果然是太差呀,查了下。is_interrupted判断是否是阻塞状态而已,也只是个状态,只是线程的一个状态,线程收到这个状态是否要阻塞自己就不一定了,因此状态时阻塞的状态不代表线程就不能继续执行。至于为什么要是阻塞状态才能继续,暂时还不知道继续往下看...

    紧接着继续查看许可,如果许可>0,那么直接解锁返回。如果不是那么因为LockSupport.lock(false, 0L),时间是0,就进入

    status = pthread_cond_wait (_cond, _mutex) ;

    这段代码,看懂了IBM的那三个链接的文章就会知道,POSIX线程执行pthread_cond_wait,之前必须获取mutex锁,因此上边先执行pthread_mutex_trylock 获取锁,然后才能执行到这里,然后就是理解pthread_cond_wiat了,这里就会造成线程休眠,等待条件唤醒,休眠前会解锁mutex,这样别的线程才能继续操作,然后等待条件满足后,其他线程唤醒这里pthread_cond_wait返回前会再次获取锁,获取锁成功后才会返回。也就是说这里就进入休眠状态了,实现了LockSupport.park的语义了。

    然后我们再来看LockSupport.unpark的代码:

    void Parker::unpark() {
      int s, status ;
      status = pthread_mutex_lock(_mutex);
      assert (status == 0, "invariant") ;
      s = _counter;
      _counter = 1;
      if (s < 1) {
         if (WorkAroundNPTLTimedWaitHang) {
            status = pthread_cond_signal (_cond) ;
            assert (status == 0, "invariant") ;
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant") ;
         } else {
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant") ;
            status = pthread_cond_signal (_cond) ;
            assert (status == 0, "invariant") ;
         }
      } else {
        pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
      }
    }

    相对就简单很多了,设置许可值_counter=1,然后解锁如果许可是0,那么再唤醒下休眠线程。

    这样LockSupport.park和unpark的基本代码逻辑就理完了。

    讲真不知道这样看源码是不是浪费时间,希望越看越深入,理解能越好吧。加油。

  • 相关阅读:
    二、有限状态机(FSM)
    一、同步状态机
    quartus ii 中文注释乱码解决办法
    基于FPGA的线阵CCD图像测量系统研究——笔记
    数据接口的同步方法
    Servlet和web服务器关系
    实现项目本地,测试,生产3套环境
    Tomcat--startup.bat文件
    Servlet--HttpUtils类
    Servlet--HttpSessionBindingListener接口,HttpSessionBindingEvent类
  • 原文地址:https://www.cnblogs.com/aquariusm/p/9267480.html
Copyright © 2011-2022 走看看