zoukankan      html  css  js  c++  java
  • 从零开始了解多线程 之 深入浅出AQS -- 下(Tools&CountDownLatch&CyclicBarrier&Semaphore)

    上一篇文章讲到了AQS各种锁&同步队列的内容,这一次继续会牵扯到AQS与及各种工具类的知识,
    Tools&CountDownLatch&CyclicBarrier&Semaphore原理与应用,Atomic&Unsafe魔法类详解

    1.Semaphore

    Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目
    

    Semaphore使用(基本应用场景: 资源访问,服务限流)

    public Semaphore(int permits)
    public Semaphore(int permits, boolean fair)
        permits 表示许可线程的数量
        fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
       
    public void acquire() throws InterruptedException
    public void release()
    tryAcquire(long timeout, TimeUnit unit)
        acquire() 表示阻塞并获取许可
        release() 表示释放许可
    

    代码实现

    public class SemaphoreSample {
        public static void main(String[] args) {
            // 允许两个资源 同一时间是有两个线程能进来
            Semaphore semaphore = new Semaphore(2);
            for (int i=0;i<5;i++){
                new Thread(new Task(semaphore,"semaphore-test-"+i)).start();
            }
        }
        static class Task extends Thread{
            Semaphore semaphore;
            public Task(Semaphore semaphore,String name){
                this.semaphore = semaphore;
                this.setName(name);
            }
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+":acquire() at time:"+System.currentTimeMillis());
                    Thread.sleep(3000);
                        semaphore.release();
                    System.out.println(Thread.currentThread().getName()+":release() at time:"+System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    semaphore测试图片

    • Semaphore源码解释

      假如当前资源有1,2 ,总共资源有2,这时候线程3进来了 (Semaphore(2) 两个资源被t1,t2获取了,还未释放,t3进来)

    • 创建一个非公平锁

          /**
           * 创建非公平锁
           * @param permits
           */
          public Semaphore(int permits) {
              sync = new NonfairSync(permits);
          }
      
    • 非公平锁实例化

      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = -2694183684443567898L;
      
          /**
           * 创建非公平锁的时候 设置 状态 (调用父类方法)
           *   Sync(int permits) {
           *       setState(permits);
           *   }
           * 上一个文章中说道 permits 在ReentrantLock 中这个代表重入次数,
           * Semaphore 初始化的时候把设置的值传了进来 设置到了 state中 代表初始化状态值(已重入次数?),
           *
           * 查看 semaphore.acquire() 方法 其中调用了 sync.acquireSharedInterruptibly(1) 默认传了一个 1 进来,
           * 代表每次调用acquire方法将获取一个资源
           *
           *
           * 以共享模式获取,如果中断则中止。
           * 首先检查中断状态,然后至少调用一次tryAcquireShared,成功后返回。否则线程已排队,可能会重复阻塞和解除阻塞
           * 调用 tryAcquireShared 直到成功或线程中断。
           *
           *  public final void acquireSharedInterruptibly(int arg)
           *         throws InterruptedException {
           *     if (Thread.interrupted()) {
           *         throw new InterruptedException();
           *     }
           *     if (tryAcquireShared(arg) < 0) {
           *         doAcquireSharedInterruptibly(arg);
           *     }
           *  }
           *
           * @param permits
           */
          NonfairSync(int permits) {
              super(permits);
          }
      
          /**
           * 这个方法是在 AQS抽象类中 doAcquireSharedInterruptibly 调用的之类的方法
           * 
           * 共享式:共享式地获取同步状态。
           * 
           * 对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,
           *    其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean
           *    
           * 对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。(Semaphore中便是这个)
           *
           * 1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
           * 
           * 2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
           *
           *3.当返回值小于0时,表示获取同步状态失败。
           *
           * @param acquires
           * @return
           */
          @Override
          protected int tryAcquireShared(int acquires) {
              return nonfairTryAcquireShared(acquires);
          }
      }
      
    • semaphore.acquire() 调用获取资源

       -acquire
       public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
      
      
      -- acquireSharedInterruptibly
       /**
       * 以共享模式获取,如果中断则中止。
       * 首先检查中断状态,然后至少调用一次{@link{tryAcquireShared},成功后返回。否则线程已排队,可能会重复阻塞和解除阻塞,
       * 调用{@link#tryAcquireShared}直到成功或线程中断。
       *
       *  @param arg获取参数。 当前这里代表 获取资源值
       * 你喜欢。
       * @如果当前线程被中断,则引发InterruptedException
       */
      public final void acquireSharedInterruptibly(int arg)
              throws InterruptedException {
          if (Thread.interrupted()) {
              throw new InterruptedException();
          }
          // 尝试去获取资源(这个方法具体实现再具体类中,公平锁和非公平锁里面的实现逻辑不通)< 0 表示获取失败,
          // 获取失败再继续后面的逻辑 doAcquireSharedInterruptibly
          if (tryAcquireShared(arg) < 0) {
              doAcquireSharedInterruptibly(arg);
          }
      }
      
      -- tryAcquireShared
      @Override
      protected int tryAcquireShared(int acquires) {
          return nonfairTryAcquireShared(acquires);
      }
      
      -- nonfairTryAcquireShared
      /**
       * semaphore中非公平锁获取资源
       * @param acquires
       * @return
       */
      final int nonfairTryAcquireShared(int acquires) {
          // 自旋获取
          for (;;) {
              // 获取当前可用的资源数(最大值是初始化设置好的 permits)
              int available = getState();
              // 获取之后剩余的资源值数
              int remaining = available - acquires;  //( 比如实例代码中 初始值2,获取一次:2-1)
              
              // 当返回值小于0时,表示获取同步状态失败。
              // 当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取
              // 数量不够的话直接获取失败,或者cas算法修改值成功就返回 剩余值
              if (remaining < 0 ||
                      compareAndSetState(available, remaining))
                  return remaining;
          }
      }
      
      -- doAcquireSharedInterruptibly
      
       /**
       * TODO 假如当前资源有1,2 ,总共资源有2,这时候线程3进来了 (Semaphore(2) 两个资源被t1,t2获取了,还未释放,t3进来)
       *
       * 获取 arg 数量的共享资源
       * @param arg the acquire argument
       */
      private void doAcquireSharedInterruptibly(int arg)
              throws InterruptedException {
          // 添加共享等待队列节点 这里面的内容和上一个文章类似 共享方式入队 (上一篇文章中说到的事以独占方式入队)
          final Node node = addWaiter(Node.SHARED);
          boolean failed = true;
          try {
              // 自旋操作直到成功
              for (;;) {
                  //当前节点 获取前一个节点 当前是t3  找到前驱节点(因为t3是首先进队的,所以它的前驱节点是head)
                  final Node p = node.predecessor();
      
                  // 如果前驱节点是头结点 尝试获取资源(就是等待过程中自己是队首的才能去获取资源) (t3是头结点 所以再一次进去回去资源)
                  if (p == head) {
                      // 尝试获取资源 (如果 t1,t2还未释放,这里还是返回-1 或其他的 (失败))
                      int r = tryAcquireShared(arg);
                      if (r >= 0) {
                          // 如果t1或者t2释放了 t3获取资源成功
                          // 把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE()
                          setHeadAndPropagate(node, r);
                          // help GC
                          p.next = null;
                          failed = false;
                          return;
                      }
                  }
                  // 判断当前节点时候要进行阻塞等待 (上一篇文章中中应该有介绍到),阻塞的话就LockSupport.park(this) 阻塞当前线程
                  // 阻塞后 循环不跑了 需要等待唤醒
                  if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt()) {
                      throw new InterruptedException();
                  }
              }
          } finally {
              if (failed) {
                  cancelAcquire(node);
              }
          }
      }
      
      --setHeadAndPropagate(把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE)
      
      /**
       * 把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE()
       */
      private void setHeadAndPropagate(Node node, int propagate) {
      
          //h用来保存旧的head节点
          Node h = head;
      
          //head引用指向node节点
          setHead(node);
      
          /* 这里意思有两种情况是需要执行唤醒操作
           * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
           * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
           */
          if (propagate > 0 || h == null || h.waitStatus < 0 ||
                  (h = head) == null || h.waitStatus < 0) {
              Node s = node.next;
              //node是最后一个节点或者 node的后继节点是共享节点
              if (s == null || s.isShared()){
                  /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
                   * head节点状态为0(第一次添加时是0),
                   * 设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
                   */
                  doReleaseShared();
              }
          }
      }
      
      --doReleaseShared(把当前结点设置为SIGNAL或者PROPAGATE) 
      (资源释放的时候调用到这个,semaphore.acquire()中调用来到这里了 去唤醒其他线程?)
      
      /**
       * 把当前结点设置为SIGNAL或者PROPAGATE
       * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
       * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
       * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
       */
      private void doReleaseShared() {
          for (;;) {
              Node h = head;
              if (h != null && h != tail) {
                  int ws = h.waitStatus;
                  if (ws == Node.SIGNAL) {//head是SIGNAL状态
                  
                      
                      /* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE,
                       * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
                       * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                       */
                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                          continue; //设置失败,重新循环
                      /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
                       * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
                       * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
                       */
                      unparkSuccessor(h);
                      /*
                       * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
                       * 意味着需要将状态向后一个节点传播
                       */
                  }
                  else if (ws == 0 &&
                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                      continue;                // loop on failed CAS
                  }
              }
              if (h == head) //如果head变了,重新循环
              {
                  break;
              }
          }
      }
      
      --shouldParkAfterFailedAcquire (判断当前节点时候要进行阻塞等待)
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          //判断是否应该阻塞
          // 获取前驱节点等待状态
          int ws = pred.waitStatus;
      
          // 此状态是可以被唤醒的 可以去获取锁
          if (ws == Node.SIGNAL)
              /*
               * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
               */
              return true;
          if (ws > 0) {
              /* 状态是 1 被移除,并且继续检查其他节点(这里循环从后往前一个个节点判断是否需要被移除),如果都是取消状态 一并移除
               * 前驱节点状态如果被取消状态,将被移除出队列
               */
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              /* 同步队列不会出现 CONDITION
               * 所以 当前驱节点waitStatus为 0 or PROPAGATE(可传递状态)状态时
               *
               * 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
               */
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }
      
    • semaphore.release() 调用获取资源

      --releaseShared 
      
        public final boolean releaseShared(int arg) {
                
            // 尝试释放资源 释放成功)(更改state成功 才开始去唤醒线程)
            if (tryReleaseShared(arg)) {
                // 去唤醒线程
                doReleaseShared();
                return true;
            }
            return false;
        }
        
      -tryReleaseShared 
        // 尝试释放资源 更改state
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // 0
                int current = getState();
                // 0+1
                int next = current + releases;
                if (next < current) // overflow
                {
                    throw new Error("Maximum permit count exceeded");
                }
                // 设置状态为 1
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
        
      -doReleaseShared
      /**
       * 把当前结点设置为SIGNAL或者PROPAGATE
       * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
       * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
       * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
       */
      private void doReleaseShared() {
          for (;;) {
              Node h = head;
              if (h != null && h != tail) {
      
                  //判断当前状态是不是-1,是的话想要把-1设置成0,没成功的话继续循环
      
                  // 成功的话  去 unparkSuccessor方法判断状态是否<0 (从这个方法进去的已经==0)
                  // unparkSuccessor中获取下一个节点 node s = node.next继续判断状态,从这里进去的方法中s!=null && s.waitStatus!>0
                  // 所以 会执行 LockSupport.unpark(s.thread);//唤醒线程 (有机会去抢占资源)(共享中一个一个唤醒,但是如果状态是被标记为-3(可传递)的话,会把-3的全唤醒)
                  // 这里唤醒后 在方法 doAcquireSharedInterruptibly 挂起的线程继续跑 ,开始继续去尝试获取资源(不一定获取成功:非公平锁)
                  int ws = h.waitStatus;
                  if (ws == Node.SIGNAL) {//head是SIGNAL状态
                      /* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE,
                       * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
                       * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                       */
                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                          continue; //设置失败,重新循环
                      /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
                       * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
                       * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
                       */
                      unparkSuccessor(h);
                      /*
                       * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
                       * 意味着需要将状态向后一个节点传播
                       */
                  }
                  else if (ws == 0 &&
                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                      continue;                // loop on failed CAS
                  }
              }
              if (h == head) //如果head变了,重新循环
              {
                  break;
              }
          }
      }
      
      -unparkSuccessor
       /**
       *唤醒线程
       */
      private void unparkSuccessor(Node node) {
          //获取wait状态
          int ws = node.waitStatus;
          if (ws < 0)
              // 将等待状态waitStatus设置为初始值0
              compareAndSetWaitStatus(node, ws, 0);
      
          /**
           * 若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
           * 进行唤醒
           */
          Node s = node.next;
          if (s == null || s.waitStatus > 0) {
              s = null;
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      s = t;
          }
          if (s != null)
              LockSupport.unpark(s.thread);//唤醒线程
      }
      

    AQS同步器-Semaphore

    2.CountDownLatch使用及应用 (不可重用)

    CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。
    例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行
    
    • 工作方式

       CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。
       每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,
       它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务
       
       CountDownLatch.countDown()
       CountDownLatch.await();
      
    • 应用场景例子

       比如陪媳妇去看病。
       医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。
       现在我们是双核,可以同时做这两个事(多线程)。
       假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。
      
    • 代码演示和解释

       /**
        * CountDown 解释
        * @author njw
        */
       public class CountDownLaunchSample {
           public static void main(String[] args) throws InterruptedException {
               long now = System.currentTimeMillis();
               CountDownLatch countDownLatch = new CountDownLatch(2);
               new Thread(new SeeDoctorTask(countDownLatch)).start();
               new Thread(new QueueTask(countDownLatch)).start();
       
               /**
                * 这里实际调用的也是 acquireSharedInterruptibly
                *   返回 < 0 就去获取资源 就是只有当 CountDownLatch 设置的初始值为0是放心,从这段逻辑看来 state是不会恢复的,
                *   所以 CountDownLatch 不可重用
                *
                *   protected int tryAcquireShared(int acquires) {
                *      return (getState() == 0) ? 1 : -1;
                *   }
                *
                *   如果返回-1 说明state 还没减少到0,调用 doAcquireSharedInterruptibly 把当前线程阻塞挂起
                *   (这个方法这里只调用一次,所以队列里面其实这时候就只有这一个线程等待)
                *
                *   当 countDownLatch.countDown() 执行一次后,释放资源会把这个线程唤醒,
                *   然后线程继续循环(tryAcquireShared)尝试去判断是否已经state变成了0,不是的话再一次挂起当前线程
                *   当其他 线程再一次执行 countDownLatch.countDown() 的时候,这个线程再一次被唤醒,
                *   再去判断,一直知道state=0,然后获取成功 执行 setHeadAndPropagate  最后返回 继续执行后面的逻辑
                */
               countDownLatch.await();
               System.out.println("over,回家 cost:" + (System.currentTimeMillis() - now));
           }
       
           static class SeeDoctorTask implements Runnable {
               private CountDownLatch countDownLatch;
       
               public SeeDoctorTask(CountDownLatch countDownLatch) {
                   this.countDownLatch = countDownLatch;
               }
       
               public void run() {
                   try {
                       System.out.println("开始看医生");
                       Thread.sleep(2000);
                       System.out.println("看医生结束,准备离开病房");
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   } finally {
                       if (countDownLatch != null) {
       
                           /**
                            *  countDownLatch.countDown 实际的调用
                            *  public final boolean releaseShared(int arg) {
                            *      // 判断是否已经变为0了,不是的话一次执行-1操作
                            *      if (tryReleaseShared(arg)) {
                            *          // 释放资源 同时唤醒 countDownLatch.await() 中挂起的线程
                            *          doReleaseShared();
                            *          return true;
                            *      }
                            *      return false;
                            *  }
                            *
                            *
                            *  尝试释放共享资源  如果已经释放到0了,就不再去释放 不然的话一次释放一个
                            *  等这里的 释放到0 的时候
                            * protected boolean tryReleaseShared(int releases) {
                            *     // 当state变成0的时候 唤醒
                            *     for (;;) {
                            *         int c = getState();
                            *         if (c == 0)
                            *             return false;
                            *         int nextc = c-1;
                            *         if (compareAndSetState(c, nextc))
                            *             return nextc == 0;
                            *     }
                            * }
                            */
                           countDownLatch.countDown();
                       }
                   }
               }
           }
           static class QueueTask implements Runnable {
           private CountDownLatch countDownLatch;
           public QueueTask(CountDownLatch countDownLatch) {
               this.countDownLatch = countDownLatch;
           }
           public void run() {
               try {
                   System.out.println("开始在医院药房排队买药....");
                   Thread.sleep(5000);
                   System.out.println("排队成功,可以开始缴费买药");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } finally {
                   if (countDownLatch != null) {
                       countDownLatch.countDown();
                   }
               }
           }
         }
       }
      

    3. CyclicBarrier使用及应用 (可重用)

        栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,
        屏障才会开门,所有被屏障拦截的线程才会继续运行。
        
        CyclicBarrier默认的构造方法是CyclicBarrier(int parties),parties表示屏障拦截的线程数量,
        每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
        
        cyclicBarrier.await()
    
    • 应用场景例子

       可以用于多线程计算数据,最后合并计算结果的场景。
       例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,
       先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,
       最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
      
    • 代码示例

       /**
        * CyclicBarrier 解释
        * @author njw
        */
       public class CyclicBarrierTest implements Runnable {
           private CyclicBarrier cyclicBarrier;
           private int index ;
           public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) {
               this.cyclicBarrier = cyclicBarrier;
               this.index = index;
           }
           public void run() {
               try {
                   System.out.println("index: " + index);
                   index--;
       
                   /**
                    *
                    */
                   cyclicBarrier.await();
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
           public static void main(String[] args) throws Exception {
               /**
                * 设置了等待完毕后的执行动作 barrierAction
                * 并且保存了 等待次数 parties 用于重用
                * public CyclicBarrier(int parties, Runnable barrierAction) {
                *     if (parties <= 0) throw new IllegalArgumentException();
                *     this.parties = parties;
                *     this.count = parties;
                *     this.barrierCommand = barrierAction;
                * }
                */
               CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
                   public void run() {
                       System.out.println("所有特工到达屏障,准备开始执行秘密任务");
                   }
               });
               for (int i = 0; i < 10; i++) {
                   new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start();
               }
               cyclicBarrier.await();
               System.out.println("全部到达屏障....");
           }
       }
      
    • 核心代码解释 cyclicBarrier.await()

       dowait(false, 0L)
           
        /**
        * Main barrier code, covering the various policies.
        */
       private int dowait(boolean timed, long nanos)
               throws InterruptedException, BrokenBarrierException,
               TimeoutException {
           // 借助 ReentrantLock 加锁
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               final Generation g = generation;
               // 初始化的时候是false
               if (g.broken)
                   throw new BrokenBarrierException();
      
               // 判断线程是否已经中断
               if (Thread.interrupted()) {
                   // 唤醒所有 抛出异常
                   breakBarrier();
                   throw new InterruptedException();
               }
      
               int index = --count;
               if (index == 0) {  // tripped
                   // 已经倒数到 0 了
                   boolean ranAction = false;
                   try {
                       // 初始化时如果定了了 这个是外面传进来的任务。barrierAction 到达栅栏后 要执行的任务
                       final Runnable command = barrierCommand;
                       // 任务不为空 执行
                       if (command != null)
                           command.run();
                       // 标记任务已经执行了
                       ranAction = true;
      
                       // 到这里的时候 已经倒数到0了,并且执行了任务 (这里状态重置)
                       // 这里便是它可以重用的原因 这里开启了下一次倒数。
                       // 唤醒这里cyclicBarrier等待的所有线程后,把state设置成了初始值 所以它可以重用 (parties创建的时候设置的值)
                       nextGeneration();
                       return 0;
                   } finally {
                       // 失败的话唤醒,设置中断标记
                       if (!ranAction) {
                           breakBarrier();
                       }
                   }
               }
      
               // loop until tripped, broken, interrupted, or timed out
               for (; ; ) {
                   try {
                       // 当代的时候传进来 false ,!false=true
                       if (!timed)
                           // reentrantLock 条件等待
                           trip.await();
                       else if (nanos > 0L)
                           nanos = trip.awaitNanos(nanos);
                   } catch (InterruptedException ie) {
                       if (g == generation && !g.broken) {
                           breakBarrier();
                           throw ie;
                       } else {
                           // We're about to finish waiting even if we had not
                           // been interrupted, so this interrupt is deemed to
                           // "belong" to subsequent execution.
                           Thread.currentThread().interrupt();
                       }
                   }
      
                   // 中断抛出异常
                   if (g.broken)
                       throw new BrokenBarrierException();
                   if (g != generation)
                       return index;
                   if (timed && nanos <= 0L) {
                       breakBarrier();
                       throw new TimeoutException();
                   }
               }
           } finally {
               lock.unlock();
           }
       }
      

    4. 简单介绍 Executors

        主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
    
    • 重要方法

       newCachedThreadPool     创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
       newFixedThreadPool      建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
       newScheduledThreadPool  创建一个定长线程池,支持定时及周期性任务执行。
       newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
      

    5. Exchanger (使用比较少)

        当一个线程运行到exchange()方法时会阻塞,另一个线程运行到exchange()时,二者交换数据,然后执行后面的程序
    
    • 示例

       /**
        * 线程间数据交换
        * @author njw
        */
       public class ExchangerTest {
           public static void main(String[] args) {
               final Exchanger<Integer> exchanger = new Exchanger<Integer>();
               for (int i = 0; i < 10; i++) {
                   final Integer num = i;
                   new Thread() {
                       public void run() {
                           System.out.println("我是线程:Thread_" + this.getName() + "我的数据是:" + num);
                           try {
                               Integer exchangeNum = exchanger.exchange(num);
                               Thread.sleep(1000);
                               System.out.println("我是线程:Thread_" + this.getName() + "我原先的数据为:" + num + " , 交换后的数据为:" + exchangeNum);
                           } catch (InterruptedException e) {
                               e.printStackTrace();
                           }
                       }
                   }.start();
               }
           }
       }
      

    Exchanger测试

  • 相关阅读:
    如何删除一个CSDN上自己上传的资源
    ubuntu 安装 boost
    C#-提取网页中的超链接
    数组地址详解
    约瑟夫环-源码
    树的基础概念(二)
    二叉树的主要操作
    二叉树的简介及链式结构实现
    树的基础概念
    栈实现数的进制转换
  • 原文地址:https://www.cnblogs.com/ningjianwen/p/14164362.html
Copyright © 2011-2022 走看看