zoukankan      html  css  js  c++  java
  • CountDownLatch

    1:CountDownLatch

    CountDownLatch countDownLatch = new CountDownLatch(1);

    new Thread(()->{
      try {
          Thread.sleep(10000L);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      countDownLatch.countDown();
    }).start();

    基于AQS实现的功能。

    1.1 初始化

    设置CountDownLatch的数量,本质上就是设置state的值,state是volatile关键字修饰的字段。

    public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      this.sync = new Sync(count);
    }
    Sync(int count) {
      setState(count);
    }

    1.2 countDown()方法

    进行释放功能

    public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
    }
    1.2.1 tryReleaseShared(arg)

    当调用一次CountLatch(1),state值会减1。当state的值为0的时候,返回true。

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
          int c = getState();
          if (c == 0)
              return false;
          int nextc = c-1;
          if (compareAndSetState(c, nextc))
              return nextc == 0;
      }
    }
    1.2.2 doReleaseShared();

    当countDownLatch的state值降为0,需要唤醒await线程,阻塞线程也是一个双向链表。该方法每一次调用都会unparkSuccessor(h),唤醒头结点后面节点的阻塞。

    如果多个线程被阻塞,唤醒流程在后续进行分析。

    private void doReleaseShared() {
      for (;;) {
          Node h = head;
          if (h != null && h != tail) {
              int ws = h.waitStatus;
              if (ws == Node.SIGNAL) {
                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                      continue;           // loop to recheck cases
                  unparkSuccessor(h);
              }
              else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                  continue;               // loop on failed CAS
          }
          if (h == head)                   // loop if head changed
              break;
      }
    }
    1.2.3 unparkSuccessor(h);

    进行链表首节点进行释放。

    private void unparkSuccessor(Node node) {
      /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling. It is OK if this
        * fails or if status is changed by waiting thread.
        */
      int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);

      /*
        * Thread to unpark is held in successor, which is normally
        * just the next node. But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
    }
     

    1.3 await()方法

    如果tryAcquireShared(arg) < 0,标识state 不等于0,需要进行阻塞。

    public final void acquireSharedInterruptibly(int arg)
          throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
      if (tryAcquireShared(arg) < 0)
          doAcquireSharedInterruptibly(arg);
    }
    1.3.1 doAcquireSharedInterruptibly(arg)
    private void doAcquireSharedInterruptibly(int arg)
      throws InterruptedException {
      //当前节点添加列表
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
          for (;;) {
          //获取当前节点的前一个节点
              final Node p = node.predecessor();
              if (p == head) {
              //获取state值是否为0,等于0则r=1
                  int r = tryAcquireShared(arg);
                  if (r >= 0) {
                  //关键操作
                      setHeadAndPropagate(node, r);
                      p.next = null; // help GC
                      failed = false;
                      return;
                  }
              }
              //前面已写。设置pre的wait_status值,当前线程进行park
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  throw new InterruptedException();
          }
      } finally {
          if (failed)
              cancelAcquire(node);
      }
    }
    1.3.2 setHeadAndPropagate(node, r)

    进行头结点设置,并唤醒下一个被阻塞的wait。

    private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head; // Record old head for check below
      setHead(node);
       
      if (propagate > 0 || h == null || h.waitStatus < 0 ||
          (h = head) == null || h.waitStatus < 0) {
          Node s = node.next;
          if (s == null || s.isShared())
              doReleaseShared();
      }
    }
    唤醒机制:先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去 最终所有线程都会被唤醒,其实这也是AQS共享锁的唤醒原理。

    2:CyclicBarrier

    使用代码:

    public static void main(String[] args) throws InterruptedException {
          CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
              @Override
              public void run() {
                  System.out.println("线程组执行结束");
              }
          });
          for (int i = 0; i < 5; i++) {
              new Thread(new readNum(i,cyclicBarrier)).start();
          }
    //       CyclicBarrier 可以重复利用,
    //         这个是CountDownLatch做不到的
          for (int i = 11; i < 16; i++) {
              new Thread(new readNum(i,cyclicBarrier)).start();
          }
      }
      static class readNum implements Runnable{
          private int id;
          private CyclicBarrier cyc;
          public readNum(int id,CyclicBarrier cyc){
              this.id = id;
              this.cyc = cyc;
          }
          @Override
          public void run() {
              synchronized (this){
                  System.out.println("id:"+id);
                  try {
                      cyc.await();
                      System.out.println("线程组任务" + id + "结束,其他任务继续");
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      }

    2.1 await()方法

    public int await() throws InterruptedException, BrokenBarrierException {
      try {
          return dowait(false, 0L);
      } catch (TimeoutException toe) {
          throw new Error(toe); // cannot happen
      }
    }
    2.2 dowait(boolean timed, long nanos)方法

    当前功能主要就是该方法实现:

    private int dowait(boolean timed, long nanos)
      throws InterruptedException, BrokenBarrierException,
              TimeoutException {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          final Generation g = generation;

          if (g.broken)
              throw new BrokenBarrierException();

          if (Thread.interrupted()) {
              breakBarrier();
              throw new InterruptedException();
          }
    //线程运行到此会减count值;
          int index = --count;
          //如果所有线程已执行await(),方法会执行线程。
          if (index == 0) { // tripped
              boolean ranAction = false;
              try {
                  final Runnable command = barrierCommand;
                  if (command != null)
                      command.run(); //没有start,而是使用当前线程执行方法,不用新创建线程
                  ranAction = true;
                  nextGeneration();
                  return 0;
              } finally {
                  if (!ranAction)
                      breakBarrier();
              }
          }

          // loop until tripped, broken, interrupted, or timed out
          for (;;) {
              try {
                  if (!timed)
                  //当前线程阻塞,使用的是条件锁
                      trip.await();
                  else if (nanos > 0L)
                      nanos = trip.awaitNanos(nanos);
              } catch (InterruptedException ie) {
                  if (g == generation && ! g.broken) {
                      breakBarrier();
                      throw ie;
                  } else {
                      // We're about to finish waiting even if we had not
                      // been interrupted, so this interrupt is deemed to
                      // "belong" to subsequent execution.
                      Thread.currentThread().interrupt();
                  }
              }

              if (g.broken)
                  throw new BrokenBarrierException();

              if (g != generation)
                  return index;

              if (timed && nanos <= 0L) {
                  breakBarrier();
                  throw new TimeoutException();
              }
          }
      } finally {
          lock.unlock();
      }
    }

    (1):使用到ReentrantLock lock来保证线程安全;

    (2):nextGeneration();会唤醒所有的线程,并且重置count值和generation,可以等待下次调用。

    private void nextGeneration() {
      // signal completion of last generation
      trip.signalAll();
      // set up next generation
      count = parties;
      generation = new Generation();
    }

     

  • 相关阅读:
    多表联合查询,利用 concat 模糊搜索
    order by 中利用 case when 排序
    Quartz.NET 3.0.7 + MySql 动态调度作业+动态切换版本+多作业引用同一程序集不同版本+持久化+集群(一)
    ASP.NET Core 2.2 基础知识(十八) 托管和部署 概述
    ASP.NET Core 2.2 基础知识(十七) SignalR 一个极其简陋的聊天室
    ASP.NET Core 2.2 基础知识(十六) SignalR 概述
    ASP.NET Core 2.2 基础知识(十五) Swagger
    ASP.NET Core 2.2 基础知识(十四) WebAPI Action返回类型(未完待续)
    linux磁盘管理 磁盘查看操作
    linux磁盘管理 文件挂载
  • 原文地址:https://www.cnblogs.com/mayang2465/p/14653201.html
Copyright © 2011-2022 走看看