zoukankan      html  css  js  c++  java
  • CountDownLatch

    1:CountDownLatch

    CountDownLatch countDownLatch = new CountDownLatch(1);

    new Thread(()->{
      try {
          Thread.sleep(10000L);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      countDownLatch.countDown();
    }).start();

    基于AQS实现的功能。

    1.1 初始化

    设置CountDownLatch的数量,本质上就是设置state的值,state是volatile关键字修饰的字段。

    public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      this.sync = new Sync(count);
    }
    Sync(int count) {
      setState(count);
    }

    1.2 countDown()方法

    进行释放功能

    public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
    }
    1.2.1 tryReleaseShared(arg)

    当调用一次CountLatch(1),state值会减1。当state的值为0的时候,返回true。

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
          int c = getState();
          if (c == 0)
              return false;
          int nextc = c-1;
          if (compareAndSetState(c, nextc))
              return nextc == 0;
      }
    }
    1.2.2 doReleaseShared();

    当countDownLatch的state值降为0,需要唤醒await线程,阻塞线程也是一个双向链表。该方法每一次调用都会unparkSuccessor(h),唤醒头结点后面节点的阻塞。

    如果多个线程被阻塞,唤醒流程在后续进行分析。

    private void doReleaseShared() {
      for (;;) {
          Node h = head;
          if (h != null && h != tail) {
              int ws = h.waitStatus;
              if (ws == Node.SIGNAL) {
                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                      continue;           // loop to recheck cases
                  unparkSuccessor(h);
              }
              else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                  continue;               // loop on failed CAS
          }
          if (h == head)                   // loop if head changed
              break;
      }
    }
    1.2.3 unparkSuccessor(h);

    进行链表首节点进行释放。

    private void unparkSuccessor(Node node) {
      /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling. It is OK if this
        * fails or if status is changed by waiting thread.
        */
      int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);

      /*
        * Thread to unpark is held in successor, which is normally
        * just the next node. But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
    }
     

    1.3 await()方法

    如果tryAcquireShared(arg) < 0,标识state 不等于0,需要进行阻塞。

    public final void acquireSharedInterruptibly(int arg)
          throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
      if (tryAcquireShared(arg) < 0)
          doAcquireSharedInterruptibly(arg);
    }
    1.3.1 doAcquireSharedInterruptibly(arg)
    private void doAcquireSharedInterruptibly(int arg)
      throws InterruptedException {
      //当前节点添加列表
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
          for (;;) {
          //获取当前节点的前一个节点
              final Node p = node.predecessor();
              if (p == head) {
              //获取state值是否为0,等于0则r=1
                  int r = tryAcquireShared(arg);
                  if (r >= 0) {
                  //关键操作
                      setHeadAndPropagate(node, r);
                      p.next = null; // help GC
                      failed = false;
                      return;
                  }
              }
              //前面已写。设置pre的wait_status值,当前线程进行park
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  throw new InterruptedException();
          }
      } finally {
          if (failed)
              cancelAcquire(node);
      }
    }
    1.3.2 setHeadAndPropagate(node, r)

    进行头结点设置,并唤醒下一个被阻塞的wait。

    private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head; // Record old head for check below
      setHead(node);
       
      if (propagate > 0 || h == null || h.waitStatus < 0 ||
          (h = head) == null || h.waitStatus < 0) {
          Node s = node.next;
          if (s == null || s.isShared())
              doReleaseShared();
      }
    }
    唤醒机制:先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去 最终所有线程都会被唤醒,其实这也是AQS共享锁的唤醒原理。

    2:CyclicBarrier

    使用代码:

    public static void main(String[] args) throws InterruptedException {
          CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
              @Override
              public void run() {
                  System.out.println("线程组执行结束");
              }
          });
          for (int i = 0; i < 5; i++) {
              new Thread(new readNum(i,cyclicBarrier)).start();
          }
    //       CyclicBarrier 可以重复利用,
    //         这个是CountDownLatch做不到的
          for (int i = 11; i < 16; i++) {
              new Thread(new readNum(i,cyclicBarrier)).start();
          }
      }
      static class readNum implements Runnable{
          private int id;
          private CyclicBarrier cyc;
          public readNum(int id,CyclicBarrier cyc){
              this.id = id;
              this.cyc = cyc;
          }
          @Override
          public void run() {
              synchronized (this){
                  System.out.println("id:"+id);
                  try {
                      cyc.await();
                      System.out.println("线程组任务" + id + "结束,其他任务继续");
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      }

    2.1 await()方法

    public int await() throws InterruptedException, BrokenBarrierException {
      try {
          return dowait(false, 0L);
      } catch (TimeoutException toe) {
          throw new Error(toe); // cannot happen
      }
    }
    2.2 dowait(boolean timed, long nanos)方法

    当前功能主要就是该方法实现:

    private int dowait(boolean timed, long nanos)
      throws InterruptedException, BrokenBarrierException,
              TimeoutException {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          final Generation g = generation;

          if (g.broken)
              throw new BrokenBarrierException();

          if (Thread.interrupted()) {
              breakBarrier();
              throw new InterruptedException();
          }
    //线程运行到此会减count值;
          int index = --count;
          //如果所有线程已执行await(),方法会执行线程。
          if (index == 0) { // tripped
              boolean ranAction = false;
              try {
                  final Runnable command = barrierCommand;
                  if (command != null)
                      command.run(); //没有start,而是使用当前线程执行方法,不用新创建线程
                  ranAction = true;
                  nextGeneration();
                  return 0;
              } finally {
                  if (!ranAction)
                      breakBarrier();
              }
          }

          // loop until tripped, broken, interrupted, or timed out
          for (;;) {
              try {
                  if (!timed)
                  //当前线程阻塞,使用的是条件锁
                      trip.await();
                  else if (nanos > 0L)
                      nanos = trip.awaitNanos(nanos);
              } catch (InterruptedException ie) {
                  if (g == generation && ! g.broken) {
                      breakBarrier();
                      throw ie;
                  } else {
                      // We're about to finish waiting even if we had not
                      // been interrupted, so this interrupt is deemed to
                      // "belong" to subsequent execution.
                      Thread.currentThread().interrupt();
                  }
              }

              if (g.broken)
                  throw new BrokenBarrierException();

              if (g != generation)
                  return index;

              if (timed && nanos <= 0L) {
                  breakBarrier();
                  throw new TimeoutException();
              }
          }
      } finally {
          lock.unlock();
      }
    }

    (1):使用到ReentrantLock lock来保证线程安全;

    (2):nextGeneration();会唤醒所有的线程,并且重置count值和generation,可以等待下次调用。

    private void nextGeneration() {
      // signal completion of last generation
      trip.signalAll();
      // set up next generation
      count = parties;
      generation = new Generation();
    }

     

  • 相关阅读:
    第12课:HTML基础之DOM操作1
    第12课:HTML+CSS的基础用法
    selenium对应三大浏览器(谷歌、火狐、IE)驱动安装
    windows下查看端口是否被占,以及端口被哪个程序占用
    windows下jenkins安装过程中的那些坑
    数据库命令大全
    机器学习总结之逻辑回归Logistic Regression
    Longest Substring Without Repeating Characters
    ffmpeg常见命令
    KNN及其改进算法的python实现
  • 原文地址:https://www.cnblogs.com/mayang2465/p/14653201.html
Copyright © 2011-2022 走看看