zoukankan      html  css  js  c++  java
  • 并发编程之 线程协作工具类

    前言

    在并发编程的时候,Doug Lea 大师为我们准备了很多的工具,都在 JDK 1.5 版本后的java.util.concurrent 包下,今天楼主就和大家分享一些常用的线程协作的工具。

    1. Semaphore 信号量
    2. CountDownLatch 倒计时器
    3. CyclicBarrier 循环栅栏
    4. Exchanger 交换器

    1. Semaphore 信号量

    我们在上一篇文章中说到了3把锁,无论是 synchronized 还是 重入锁还是读写锁,一次只能允许一个线程进行访问。当然这是为了保证线程的安全。但是,如果我们有的时候想一次让多个线程访问同一个代码呢?并且指定线程数量。

    在 JDK 1.5 中,doug lea 大师已经为我们写好了这个工具类,什么呢?就是 Semephore,信号量。信号量为多线程协作提供了更为强大的控制方法。从某种程度上说:信号量是对锁的扩展。信号量可以指定多个线程,同时访问某一个资源。

    该类有2个构造函数:

    
       public Semaphore(int permits) 
    
       public Semaphore(int permits, boolean fair) 
    
    

    permits 表示的是信号量的数量,简单点说就是指定了同时又多少个线程可以同时访问某一个资源。而 fair 参数表示的是否是公平的。那么信号量还有哪些方法呢?

    下面是阻塞方法,也就是会无限等待的方法:

    1. public void acquire() throws InterruptedException { } //获取一个许可
    2. public void acquire(int permits) throws InterruptedException { } //获取permits个许可
    3. public void release() { } //释放一个许可
    4. public void release(int permits) { } //释放permits个许可

    下面是非阻塞方法:

    1. public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    2. public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    3. public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

    如何使用呢?我们还是来个例子吧:

    public class SemaphoreTest implements Runnable {
    
      // 需要指定信号量的准入数,相当于指定了同时有多少个线程可以同时访问某一个资源
      final Semaphore semaphore = new Semaphore(5);
    
      @Override
      public void run() {
        try {
          semaphore.acquire();
          // 模拟耗时操作
          Thread.sleep(2000);
          System.out.println(Thread.currentThread().getId() + ":done!");
          semaphore.release();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    
      public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreTest test = new SemaphoreTest();
        for (int i = 0; i < 20; i++) {
          exec.execute(test);
        }
    
    
      }
    }
    

    在上面的代码中,我们指定了可以有5个信号量的实例,在线程池中被20个线程执行,打印的结果都是5个一组,5个一组,说明,的确是每5个线程同时访问该段代码。

    其实信号量就是一种限制策略,在 web 服务器中,信号量就是一种限流策略,限制多少线程执行,和这个模式差不多。

    2. CountDownLatch 倒计时器

    从名字上来看,可以翻译成倒计时门闩,但我们其实不必管门闩,他其实就是个倒计时器。门闩的含义是什么呢?把们锁起来,不让里面的线程跑出来,因此,这个工具常用来控制线程等待,有点像我们的 join 方法,可以让某一个线程等待知道倒计时结束,再开始执行。

    CountDownLatch 的构造函数:

    public CountDownLatch(int count) 其中 int 类型的参数表示当前这个计时器的计数个数。

    我们还是直接来个例子吧:

    /**
     * 相当于join功能,让调用  await 方法的线程等待 countdownlatch 的线程执行完毕
     */
    public class CountDownLatchTest implements Runnable {
    
      /**
       * 表示需要10个线程完成任务,等待在倒计时上的线程才能继续运行。
       */
      static final CountDownLatch end = new CountDownLatch(10);
      static final CountDownLatchTest test = new CountDownLatchTest();
    
      @Override
      public void run() {
        try {
          // 模拟检查任务
          Thread.sleep(new Random().nextInt(10) * 1000);
          System.out.println("check complete");
          end.countDown();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    
      public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
          exec.submit(test);
        }
    
        end.await();
    
        System.out.println("Fire");
    
        exec.shutdown();
      }
    }
    
    

    我们模拟了火箭发射的场景,火箭发射前,都需要做一些检查任务,等到所有的检查任务完成了才能反射。那我们这里怎么实现的呢?开启20个线程执行 检查任务,注意,任务中,调用了 CountdownLatch 的 countDown 方法,就是倒计时方法,每个线程执行到这里,都会让该倒计时减一,直到为0.

    而再完美的main 线程中,有一行则是 await 方法,该方法让 main 线程等待 countdown 的线程都执行完毕。当20个线程全都成功调用了 run 方法,并且调用了 countdown 的 countdown 方法,countdown 此时为0,main 线程就可以执行 “发射” 了。所以该方法的使用就是让 调用 await 方法的线程等待 调用 countdown 方法的线程,和 join 很相似,join 是调用方等待被调用方。

    3. CyclicBarrier 循环栅栏

    循环栅栏也是控制多线程并发的工具。和 CountDownLatch 非常类似,但是比 CountdDownLatch 复杂,强大。

    看名字也很奇怪,CyclicBarrier,循环栅栏。栅栏是用来拦住别人不要进来的,在我们这里,其实就是拦住线程不要进来,而且可以循环使用。

    加入有一个场景:司令下达命令,要求10个士兵一起去完成一项任务,这是,就会要求10个士兵先集合报道,接着,再一起去执行任务,当10个士兵的任务都完成了,司令对外宣布,任务完成。

    我们是不是想到了使用 coundownLatch 来完成,注意,我们这里需要两次计数,而 countdown 是无法实现的,这就是循环栅栏比倒计时器强大的地方-----可以循环。

    如何使用呢?

    CyclicBarrier 循环栅栏提供了2 个构造方法:

    public CyclicBarrier(int parties) int 类型表示计数的数量

    public CyclicBarrier(int parties, Runnable barrierAction) Runnable 表示每次计数结束需要执行的任务(执行一次)

    我们就各个司令士兵的例子写一段代码:

    public class CyclicBarrierDemo {
    
      static class Soldier implements Runnable {
    
        // 军人
        String soldier;
        // 循环栅栏
        final CyclicBarrier cyclic;
    
        public Soldier(CyclicBarrier cyclic, String soldier) {
          this.cyclic = cyclic;
          this.soldier = soldier;
        }
    
        @Override
        public void run() {
          // 等待所有士兵到齐
          try {
            System.out.println("准备");
            cyclic.await();// 到了 10 才开始走,否则线程等待
    
            doWork();
            // 等待所有士兵完成工作
            cyclic.await();
          } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
          }
    
        }
    
        private void doWork() {
          try {
            Thread.sleep(Math.abs(new Random().nextInt() % 10000));
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println(soldier + ": 任务完成");
        }
      }
    
      /**
       * 当计数器一次计数完成后,系统会派一个线程执行的这个线程的run方法。
       */
      static class BarrierRun implements Runnable {
    
        boolean flag;
        int N;
    
        public BarrierRun(boolean flag, int n) {
          this.flag = flag;
          N = n;
        }
    
        @Override
        public void run() {
          if (flag) {
            System.out.println("司令:【士兵 " + N + "个, 任务完成!】");
          } else {
            System.out.println("司令:【士兵 " + N + "个, 集合完毕!】");
            flag = true;
          }
        }
      }
    
    
      public static void main(String[] args) {
        final int n = 10;
    
        Thread[] allSoldier = new Thread[n];
        boolean flag = false;
        // parties 表示计数总数,也就是参与的线程总数, barrierAction 就是当计数器一次计数完成后,系统会执行的动作
        CyclicBarrier cyclic = new CyclicBarrier(n, new BarrierRun(false, n));
    
        // 设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍");
        for (int i = 0; i < n; i++) {
          System.out.println("士兵" + i + "报道");
          allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
          allSoldier[i].start();
    //      if (i== 5){ // 会导致所有的线程全部停止 BrokenBarrierException * 9 + InterruptedException * 1
    //        allSoldier[i].interrupt();
    //
    //      }
        }
      }
    
    }
    
    

    代码不少,我们场景10个士兵线程,每个士兵线程都含有同一个循环栅栏,再士兵调用run方法的时候,需要调用循环栅栏的 await 方法,此时,线程就开始等待,直到栅栏的数字变成了0,因为我们在创建的时候设置的是10,因此,需要10个线程触发此方法,当10个线程全部都触发了该方法,也就是计数器归零了,注意,此时循环栅栏会随机调用一个线程执行栅栏处发生的行动。就是我们的 BarrierRun 任务。在执行完该任务后,所有的士兵执行 doWork 方法,下面又开始 执行栅栏的 await 方法,所有的士兵又开始等待,等待所有的士兵都执行完毕,可以看到,该栅栏被循环使用了,而 countdown 是做不到的。等到所有的士兵都是调用了 await 方法,循环栅栏再次随机抽取一个线程调用 BarrierRun 的 run 方法。最后完成了所有的任务。

    可以说,CyclicBarrier 和 CountDownLatch 还有 Samephore 类都是协作多个线程同时工作的工具。什么时候使用什么工具,各位可以自己思考。

    只需要记住:countDown 和 CyclicBarrier 很相似,但不能循环,而 Samephore 可以控制每次又多少个线程进入某个代码块。相当于多线程的锁。具体使用场景自己看。

    4. Exchanger 交换器

    Exchanger 是要给交换器,也是用于线程间协作的工具类。什么用处呢?假如现在有一个需求,需要你将两个线程的数据进行狡猾,你该怎么做?

    我猜测大家肯定使用类似 wait notify 之类的方法进行线程之间的通信,或者使用消息机制。但 Doug Lea 为我们提供另外一种选择:交换器,直接交换两个线程的数据。6不6?

    两个线程可以通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,他会一直等待第二个线程也执行 exchange 方法, 当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

    写个例子大家看看:

    public class ExchangerDemo {
    
      static final Exchanger<String> exgr = new Exchanger<>();
    
      static ExecutorService threadPool = Executors.newFixedThreadPool(2);
    
      public static void main(String[] args) {
        threadPool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              String a = "银行流水A";
              String b = exgr.exchange(a);
              System.err.println("A 和 B 数据是否一致: " + a.equals(b) + ", A 录入的是:" + a + ", B 录入的是:" + b);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });
    
        threadPool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              String b = "银行流水B";
              String a = exgr.exchange(b);
              System.out.println("A 和 B 数据是否一致: " + a.equals(b) + ", A 录入的是:" + a + ", B 录入的是:" + b);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });
    
        threadPool.shutdown();
      }
    

    执行结果

    执行结果

    可以看到,两个线程都得到了对方的数据,可以说非常的牛逼。如果两个线程有一个没有执行 exchange 方法,另一个则会一直等待,如果担心 exchanger 时间过长,可以设置过长时间 exchange(V x, long timeout, TimeUnit unit)。

    总结

    好了,我们今天介绍了 java.util.concurrent 包下的4个多线程协作工具类,让我们在今后并发编程中可以有更顺手的工具,有些业务场景完全可以使用这些现成的工具。比如 Samephore,CountDownLatch, CyclicBarrier,Exchanger,每个工具都有自己的应用场景。

    好了,今天的并发工具使用就介绍到这里。

    good luck !!!!

  • 相关阅读:
    赫尔维茨公式
    从解析几何的角度分析二次型
    Struts 1 Struts 2
    记一次服务器被入侵的调查取证
    契约式设计 契约式编程 Design by contract
    lsblk df
    Linux Find Out Last System Reboot Time and Date Command 登录安全 开关机 记录 帐号审计 历史记录命令条数
    Infrastructure for container projects.
    更新文档 版本控制 多版本并发控制
    Building Microservices: Using an API Gateway
  • 原文地址:https://www.cnblogs.com/stateis0/p/9061633.html
Copyright © 2011-2022 走看看