zoukankan      html  css  js  c++  java
  • 线程工具类

                                                      同步并发工具类

    同步屏障CyclicBarrier

    简介:CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

     1 package com.cattsoft.sync;
     2 
     3 import java.util.concurrent.CyclicBarrier;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 
     7 /**
     8  * 表示大家彼此等待,集合后才开始出发(张家界旅游,早上等车,大峡谷进去集合(范乾),大峡谷出来集合)
     9  * 工作中没准就有这种应用,当以后的工作中做什么项目,可能用到这个技术,就要联想起来
    10  * 理论上是一定有这种需求,jdk开发者才会搞这种技术,只是目前还没碰到。
    11  */
    12 public class CyclicBarrierTest {
    13 
    14     public static void main(String[] args) {
    15         ExecutorService executor = Executors.newCachedThreadPool();
    16         final CyclicBarrier cb = new CyclicBarrier(15);
    17         for (int i = 0; i <15; i++) {
    18             Runnable runnable = new Runnable() {
    19                 @Override
    20                 public void run() {
    21                     try {
    22                         // 获取一个信号灯
    23                         Thread.sleep((long) (Math.random() * 10000));
    24                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址1,当前已有"
    25                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
    26                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
    27                         cb.await(); // 在这里集合
    28                         
    29                         Thread.sleep((long) (Math.random() * 10000));
    30                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址2,当前已有"
    31                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
    32                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
    33                         cb.await(); // 在这里集合
    34                         
    35                         Thread.sleep((long) (Math.random() * 10000));
    36                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址3,当前已有"
    37                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
    38                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
    39                         cb.await(); // 在这里集合
    40                     } catch (Exception e) {
    41                         e.printStackTrace();
    42                     }
    43                 }
    44             };
    45             executor.execute(runnable);
    46         }
    47         executor.shutdown();
    48     }
    49 }

    如果把new CyclicBarrier(15)修改成new CyclicBarrier(16)则主线程和子线程会永远等待,因为没有第16个线程执行await方法,即没有第16个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

     1 public class CyclicBarrierTest2 {
     2 
     3     static CyclicBarrier c = new CyclicBarrier(2, new A());
     4 
     5     public static void main(String[] args) {
     6         new Thread(new Runnable() {
     7 
     8             @Override
     9             public void run() {
    10                 try {
    11                     c.await();
    12                 } catch (Exception e) {
    13 
    14                 }
    15                 System.out.println(1);
    16             }
    17         }).start();
    18 
    19         try {
    20             c.await();
    21         } catch (Exception e) {
    22 
    23         }
    24         System.out.println(2);
    25     }
    26 
    27     static class A implements Runnable {
    28 
    29         @Override
    30         public void run() {
    31             System.out.println(3);
    32         }
    33 
    34     }
    35 
    36 }

    输出:

    1 3
    2 1
    3 2

    CyclicBarrier的应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

    同步信号量Semaphore

    控制并发线程数

    Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

    以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。
    在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。
    更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。
    在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。
     1 package com.cattsoft.sync;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 import java.util.concurrent.Semaphore;
     6 
     7 /**
     8  * 信号灯同步工具
     9  * 与互斥锁对比,信号灯可以由其它线程释放,互斥锁只能由自己释放
    10  * 模拟场景:现在有3个厕所坑,有5个人要进去,先进去3个人,其它2个等待,有人出来,等待的人再进去
    11  * <p>
    12  * TTP:
    13  * 知识很容易就学会,最大的困难是在以后的工作中遇到困难了,正好能想到用这个工具
    14  * 每个药能治什么病,很容易学会,难就难在当看到一个病人以后,能想起来就这个药,能想到这一步你就是华佗。
    15  */
    16 public class SemaphoreTest {
    17     public static void main(String[] args) {
    18         ExecutorService executor = Executors.newFixedThreadPool(5);
    19 
    20         //可使用的公共资源数量
    21         final Semaphore sp = new Semaphore(3);
    22 
    23         for (int i = 0; i < 5; i++) {
    24             Runnable runnable = new Runnable() {
    25                 @Override
    26                 public void run() {
    27                     try {
    28                         // 获取一个信号灯
    29                         sp.acquire();
    30                     } catch (InterruptedException e) {
    31                         e.printStackTrace();
    32                     }
    33                     System.out.println("线程: " + Thread.currentThread().getName() + "进入,当前已有"
    34                             + (3 - sp.availablePermits()) + "个并发。");
    35                     try {
    36                         Thread.sleep((long) (Math.random() * 10000));
    37                     } catch (InterruptedException e) {
    38                         e.printStackTrace();
    39                     }
    40                     System.out.println("线程" + Thread.currentThread().getName() + "即将离开。");
    41                     sp.release();
    42                     System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有"
    43                             + (3 - sp.availablePermits()) + "个并发");
    44                 }
    45             };
    46             executor.execute(runnable);
    47         }
    48         executor.shutdown();
    49     }
    50 }

    CountDownLatch

    等待多线程完成,CountDownLatch 允许一个或多个线程等待其他线程完成操作。

    应用场景

    假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join。代码如下:

     1 public class JoinCountDownLatchTest {
     2 
     3     public static void main(String[] args) throws InterruptedException {
     4         Thread parser1 = new Thread(new Runnable() {
     5             @Override
     6             public void run() {
     7             }
     8         });
     9 
    10         Thread parser2 = new Thread(new Runnable() {
    11             @Override
    12             public void run() {
    13                 System.out.println("parser2 finish");
    14             }
    15         });
    16 
    17         parser1.start();
    18         parser2.start();
    19         parser1.join();
    20         parser2.join();
    21         System.out.println("all parser finish");
    22     }
    23 
    24 }
     1 package com.cattsoft.sync;
     2 
     3 import java.util.concurrent.CountDownLatch;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 
     7 /**
     8  * 倒计时计数器
     9  * 模拟场景:
    10  *  三个运动员跑步比赛,听到一听哨响,开始跑,全部跑到了,一起公布成绩。
    11  */
    12 public class CoutDownLatchTest {
    13     public static void main(String[] args) {
    14         ExecutorService executor = Executors.newCachedThreadPool();
    15         final CountDownLatch cdOrder = new CountDownLatch(1);
    16         final CountDownLatch cdAnswer = new CountDownLatch(3);
    17 
    18         for (int i = 0; i < 3; i++) {
    19             Runnable runnable = new Runnable() {
    20                 @Override
    21                 public void run() {
    22                     try {
    23                         System.out.println("线程"+ Thread.currentThread().getName() + "正在准备接受命令");
    24                         cdOrder.await();
    25                         System.out.println("线程" + Thread.currentThread().getName() + "已经接受命令");
    26 
    27                         Thread.sleep((long) (Math.random() * 10000));
    28                         System.out.println("线程"+ Thread.currentThread().getName() + "回应此命令的处理结果");
    29                         cdAnswer.countDown();
    30 
    31                     } catch (Exception e) {
    32                         e.printStackTrace();
    33                     }
    34                 }
    35             };
    36             executor.execute(runnable);
    37         }
    38 
    39         try {
    40             Thread.sleep((long) (Math.random() * 10000));
    41             System.out.println("线程"+ Thread.currentThread().getName() + "即将发布命令");
    42             cdOrder.countDown();
    43             System.out.println("线程"+ Thread.currentThread().getName() + "已经发送命令,正在等待结果");
    44             cdAnswer.await();
    45             System.out.println("线程"+ Thread.currentThread().getName() + "已收到所有响应结果");
    46         } catch (Exception e) {
    47             e.printStackTrace();
    48         }
    49         executor.shutdown();
    50     }
    51 }
    52 
    53 
    54   Map<String,Integer> linkMap = new LinkedHashMap<>();
    55              for(Entry<String,Integer> newEntry :result){
    56                  linkMap.put(newEntry.getKey(), newEntry.getValue());            
    57              }
    58              for(Map.Entry<String, Integer> mapEntry : linkMap.entrySet()){
    59                  System.out.println("key:"+mapEntry.getKey()+"  value:"+mapEntry.getValue());
    60              }
    61          } 

    CyclicBarrier和CountDownLatch的区别

    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

    isBroken的使用代码如下:

     1 import java.util.concurrent.BrokenBarrierException;
     2 import java.util.concurrent.CyclicBarrier;
     3 
     4 public class CyclicBarrierTest3 {
     5 
     6     static CyclicBarrier c = new CyclicBarrier(2);
     7 
     8     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
     9         Thread thread = new Thread(new Runnable() {
    10 
    11             @Override
    12             public void run() {
    13                 try {
    14                     c.await();
    15                 } catch (Exception e) {
    16                 }
    17             }
    18         });
    19         thread.start();
    20         thread.interrupt();
    21         try {
    22             c.await();
    23         } catch (Exception e) {
    24             System.out.println(c.isBroken());
    25         }
    26     }
    27 }
  • 相关阅读:
    [C# 基础知识系列]专题六:泛型基础篇——为什么引入泛型
    [C#基础知识系列]专题十七:深入理解动态类型
    [C# 基础知识系列]专题九: 深入理解泛型可变性
    C#网络编程系列文章索引
    [C#基础知识系列]专题十:全面解析可空类型
    [C# 基础知识系列]专题十一:匿名方法解析
    [C# 基础知识系列]专题十六:Linq介绍
    VSTO之旅系列(一):VSTO入门
    [C# 网络编程系列]专题七:UDP编程补充——UDP广播程序的实现
    [C# 网络编程系列]专题四:自定义Web浏览器
  • 原文地址:https://www.cnblogs.com/tangjiang-code/p/9867512.html
Copyright © 2011-2022 走看看