zoukankan      html  css  js  c++  java
  • java并发之同步辅助类(Semphore、CountDownLatch、CyclicBarrier、Phaser)

    线程同步辅助类,主要学习两点:

    1、上述几种同步辅助类的作用以及常用的方法
    2、适用场景,如果有适当的场景可以用到,那无疑是最好的

    semaphore(seməˌfôr)

    含义
    信号量就是可以声明多把锁(包括一把锁:此时为互斥信号量)。
    举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。
    常用方法
    acquire():获取信号量,信号量内部计数器减1
    release():释放信号量,信号量内部计数器加1
    tryAcquire():这个方法试图获取信号量,如果能够获取返回true,否则返回false
    信号量控制的线程数量在声明时确定。例如:
    Semphore s = new Semphore(2);
     
    一个例子
    实现一个功能:一个打印队列,被三台打印机打印
    public class PrintQueue {
        private Semaphore semaphore;
        private boolean freePrinters[];
        private Lock lockPrinters;
        
        public PrintQueue(){
            semaphore=new Semaphore(3);
            freePrinters=new boolean[3];
            for (int i=0; i<3; i++){
                freePrinters[i]=true;
            }
            lockPrinters=new ReentrantLock();
        }
        
        public void printJob (Object document){
            try {
                semaphore.acquire();
                
                int assignedPrinter=getPrinter();
                
                Long duration=(long)(Math.random()*10);
                System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds
    ",Thread.currentThread().getName(),assignedPrinter,duration);
                TimeUnit.SECONDS.sleep(duration);
                
                freePrinters[assignedPrinter]=true;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Free the semaphore
                semaphore.release();            
            }
        }
        private int getPrinter() {
            int ret=-1;
            
            try {
                lockPrinters.lock();
                for (int i=0; i<freePrinters.length; i++) {
                    if (freePrinters[i]){
                        ret=i;
                        freePrinters[i]=false;
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lockPrinters.unlock();
            }
            return ret;
        }
    }
    声明一个Job类,使用打印队列
     1 public class Job implements Runnable {
     2     private PrintQueue printQueue;
     3     
     4     public Job(PrintQueue printQueue){
     5         this.printQueue=printQueue;
     6     }
     7     
     8     @Override
     9     public void run() {
    10         System.out.printf("%s: Going to print a job
    ",Thread.currentThread().getName());
    11         printQueue.printJob(new Object());
    12         System.out.printf("%s: The document has been printed
    ",Thread.currentThread().getName());        
    13     }
    14 }
    Main方法
    public static void main (String args[]){
            PrintQueue printQueue=new PrintQueue();
            Thread thread[]=new Thread[12];
            for (int i=0; i<12; i++){
                thread[i]=new Thread(new Job(printQueue),"Thread "+i);
            }
            for (int i=0; i<12; i++){
                thread[i].start();
            }
        }
     
    需要注意的地方
    1、对于信号量声明的临界区,虽然可以控制线程访问的数量,但是不能保证代码块之间是线程安全的。所以上面的例子在方法printJob()方法里面使用了锁保证数据安全性。
    2、信号量也涉及到公平性问题。和锁公平性一样,这里默认是非公平的。可以通过构造器显示声明锁的公平性。
    public Semaphore(int permits, boolean fair)
     
    应用场景
    流量控制,即控制能够访问的最大线程数。

    CountDownLatch

    含义
    CountDownLatch可以理解为一个计数器在初始化时设置初始值,当一个线程需要等待某些操作先完成时,需要调用await()方法。这个方法让线程进入休眠状态直到等待的所有线程都执行完成。每调用一次countDown()方法内部计数器减1,直到计数器为0时唤醒。这个可以理解为特殊的CyclicBarrier。线程同步点比较特殊,为内部计数器值为0时开始。
     
    方法
    核心方法两个:countDown()和await()
    countDown():使CountDownLatch维护的内部计数器减1,每个被等待的线程完成的时候调用
    await():线程在执行到CountDownLatch的时候会将此线程置于休眠
     
    例子
    开会的例子:会议室里等与会人员到齐了会议才能开始。
     1 public class VideoConference implements Runnable{
     2     private final CountDownLatch controller;
     3     
     4     public VideoConference(int number) {
     5         controller=new CountDownLatch(number);
     6     }
     7     public void arrive(String name){
     8         System.out.printf("%s has arrived.
    ",name);
     9 
    10         controller.countDown();//调用countDown()方法,使内部计数器减1
    11         System.out.printf("VideoConference: Waiting for %d participants.
    ",controller.getCount());
    12     }
    13     
    14     @Override
    15     public void run() {
    16         System.out.printf("VideoConference: Initialization: %d participants.
    ",controller.getCount());
    17         try {
    18 
    19             controller.await();//等待,直到CoutDownLatch计数器为0
    20 
    21             System.out.printf("VideoConference: All the participants have come
    ");
    22             System.out.printf("VideoConference: Let's start...
    ");
    23         } catch (InterruptedException e) {
    24             e.printStackTrace();
    25         }
    26     }
    27 }
    参加会议人员类
     1 public class Participant implements Runnable {
     2     private VideoConference conference;
     3     
     4     private String name;
     5 
     6     public Participant(VideoConference conference, String name) {
     7         this.conference=conference;
     8         this.name=name;
     9     }
    10     @Override
    11     public void run() {
    12         Long duration=(long)(Math.random()*10);
    13         try {
    14             TimeUnit.SECONDS.sleep(duration);
    15         } catch (InterruptedException e) {
    16             e.printStackTrace();
    17         }    
    18         conference.arrive(name);//每到一个人员,CountDownLatch计数器就减少1
    19     }
    20 }
    主函数
     1 public static void main(String[] args) {
     2         VideoConference conference = new VideoConference(10);
     3         Thread threadConference = new Thread(conference);
     4         threadConference.start();//开启await()方法,在内部计数器为0之前线程处于等待状态
     5         for (int i = 0; i < 10; i++) {
     6             Participant p = new Participant(conference, "Participant " + i);
     7             Thread t = new Thread(p);
     8             t.start();
     9         }
    10     }
     
    需要注意的地方
    CountDownLatch比较容易记忆的是他的功能,是一个线程计数器。等计数器为0时那些先前因调用await()方法休眠的线程被唤醒。
    CountDownLatch能够控制的线程是哪些?是那些调用了CountDownLatch的await()方法的线程
    具体使用方式,容易忘记:先运行await()方法的线程,例子中是视频会议的线程。然后是执行与会者 线程,这里的处理是每到一位(每创建一个线程并运行run()方法时就使计数器减1)就让计数器减1,等计数器减为0时唤醒因调用await()方法进入休眠的线程。这里的这些与会者就是要等待的线程。
     
    应用场景
    等人到齐了才能开始开会;

    CyclicBarrier

    含义
    栅栏允许两个或者多个线程在某个集合点同步。当一个线程到达集合点时,它将调用await()方法等待其它的线程。线程调用await()方法后,CyclicBarrier将阻塞这个线程并将它置入休眠状态等待其它线程的到来。等最后一个线程调用await()方法时,CyclicBarrier将唤醒所有等待的线程然后这些线程将继续执行CyclicBarrier可以传入另一个Runnable对象作为初始化参数。当所有的线程都到达集合点后,CyclicBarrier类将Runnable对象作为线程执行。
     
    方法
    await():使线程置入休眠直到最后一个线程的到来之后唤醒所有休眠的线程
     
    例子
    在矩阵(二维数组)中查找一个指定的数字。矩阵将被分为多个子集,每个子集交给一个线程去查找。当所有线程查找完毕后交给最后的线程汇总结果。
    查找类:在一个子集中查找指定数字,找到之后把结果存储后调用await()方法置入休眠等待最后一个线程的到来唤醒
     1 public class Searcher implements Runnable {
     2     private final CyclicBarrier barrier;
     3     @Override
     4     public void run() {
     5         int counter;
     6         System.out.printf("%s: Processing lines from %d to %d.
    ",Thread.currentThread().getName(),firstRow,lastRow);
     7         for (int i=firstRow; i<lastRow; i++){
     8             int row[]=mock.getRow(i);
     9             counter=0;
    10             for (int j=0; j<row.length; j++){
    11                 if (row[j]==number){
    12                     counter++;
    13                 }
    14             }
    15             results.setData(i, counter);
    16         }
    17         System.out.printf("%s: Lines processed.
    ",Thread.currentThread().getName());        
    18         try {
    19             barrier.await();
    20         } catch (InterruptedException e) {
    21             e.printStackTrace();
    22         } catch (BrokenBarrierException e) {
    23             e.printStackTrace();
    24         }
    25     }
    26 }
    汇总类:汇总每个Searcher找到的结果
     1 public class Grouper implements Runnable {
     2     private Results results;
     3     
     4     public Grouper(Results results){
     5         this.results=results;
     6     }
     7     @Override
     8     public void run() {
     9         int finalResult=0;
    10         System.out.printf("Grouper: Processing results...
    ");
    11         int data[]=results.getData();
    12         for (int number:data){
    13             finalResult+=number;
    14         }
    15         System.out.printf("Grouper: Total result: %d.
    ",finalResult);
    16     }
    17 }
    主函数,如何把Searcher和Grouper类配合起来呢??
     1 public static void main(String[] args) {
     2         final int ROWS=10000;
     3         final int NUMBERS=1000;
     4         final int SEARCH=5; 
     5         final int PARTICIPANTS=5;
     6         final int LINES_PARTICIPANT=2000;
     7         MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);//矩阵的声明
     8         
     9         Results results=new Results(ROWS);//结果集
    10         
    11         Grouper grouper=new Grouper(results);//汇总线程
    12         
    13         CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);//栅栏,传入参数含义:线程同步个数,汇总线程
    14         
    15         Searcher searchers[]=new Searcher[PARTICIPANTS];
    16         for (int i=0; i<PARTICIPANTS; i++){
    17             searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
    18             Thread thread=new Thread(searchers[i]);
    19             thread.start();
    20         }
    21         System.out.printf("Main: The main thread has finished.
    ");
    22     }
    运行结果:
    Mock: There are 999286 ocurrences of number in generated data.
    Thread-0: Processing lines from 0 to 2000.
    Main: The main thread has finished.
    Thread-0: Lines processed.
    Thread-1: Processing lines from 2000 to 4000.
    Thread-1: Lines processed.
    Thread-3: Processing lines from 6000 to 8000.
    Thread-3: Lines processed.
    Thread-2: Processing lines from 4000 to 6000.
    Thread-2: Lines processed.
    Thread-4: Processing lines from 8000 to 10000.
    Thread-4: Lines processed.
    Grouper: Processing results...
    Grouper: Total result: 999286.
     
    需要注意的地方
    线程完成任务后调用CyclicBarrier的await()方法休眠等待。在所有线程在集合点均到达时,栅栏调用传入的Runnable对象进行最后的执行。
    与CountDownLatch的区别:
    • 在所有线程到达集合点后接受一个Runnable类型的对象作为后续的执行
    • 没有显示调用CountDown()方法
    • CountDownLatch一般只能使用一次,CyclicBarrier可以多次使用
    应用场景
    多个线程做任务,等到达集合点同步后交给后面的线程做汇总

    Phaser

    含义
    更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
    跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。
     
    函数
    arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
    arriveAndDeregister():把执行到此的线程从Phaser中注销掉
    isTerminated():判断Phaser是否终止
    register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
    forceTermination():强制Phaser进入终止态
    ... ...
     
    例子
    使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
    1、在执行的文件夹及其子文件夹中获取扩展名为.log的文件
    2、对每一步的结果进行过滤,删除修改时间超过24小时的文件
    3、将结果打印到控制台
    在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)
    文件查找类
      1 public class FileSearch implements Runnable {
      2     private String initPath;
      3 
      4     private String end;
      5     
      6     private List<String> results;
      7 
      8     private Phaser phaser;
      9 
     10     public FileSearch(String initPath, String end, Phaser phaser) {
     11         this.initPath = initPath;
     12         this.end = end;
     13         this.phaser=phaser;
     14         results=new ArrayList<>();
     15     }
     16     @Override
     17     public void run() {
     18 
     19         phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
     20         
     21         System.out.printf("%s: Starting.
    ",Thread.currentThread().getName());
     22         
     23         // 1st Phase: 查找文件
     24         File file = new File(initPath);
     25         if (file.isDirectory()) {
     26             directoryProcess(file);
     27         }
     28         
     29         // 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
     30         if (!checkResults()){
     31             return;
     32         }
     33         
     34         // 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
     35         filterResults();
     36         
     37         // 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
     38         if (!checkResults()){
     39             return;
     40         }
     41         
     42         // 3rd Phase: 显示结果
     43         showInfo();
     44         phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
     45         System.out.printf("%s: Work completed.
    ",Thread.currentThread().getName());
     46     }
     47     private void showInfo() {
     48         for (int i=0; i<results.size(); i++){
     49             File file=new File(results.get(i));
     50             System.out.printf("%s: %s
    ",Thread.currentThread().getName(),file.getAbsolutePath());
     51         }
     52         // Waits for the end of all the FileSearch threads that are registered in the phaser
     53         phaser.arriveAndAwaitAdvance();
     54     }
     55     private boolean checkResults() {
     56         if (results.isEmpty()) {
     57             System.out.printf("%s: Phase %d: 0 results.
    ",Thread.currentThread().getName(),phaser.getPhase());
     58             System.out.printf("%s: Phase %d: End.
    ",Thread.currentThread().getName(),phaser.getPhase());
     59             //结果为空,Phaser完成并把该线程从Phaser中移除掉
     60             phaser.arriveAndDeregister();
     61             return false;
     62         } else {
     63             // 等待所有线程查找完成
     64             System.out.printf("%s: Phase %d: %d results.
    ",Thread.currentThread().getName(),phaser.getPhase(),results.size());
     65             phaser.arriveAndAwaitAdvance();
     66             return true;
     67         }        
     68     }
     69     private void filterResults() {
     70         List<String> newResults=new ArrayList<>();
     71         long actualDate=new Date().getTime();
     72         for (int i=0; i<results.size(); i++){
     73             File file=new File(results.get(i));
     74             long fileDate=file.lastModified();
     75             
     76             if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
     77                 newResults.add(results.get(i));
     78             }
     79         }
     80         results=newResults;
     81     }
     82     private void directoryProcess(File file) {
     83         // Get the content of the directory
     84         File list[] = file.listFiles();
     85         if (list != null) {
     86             for (int i = 0; i < list.length; i++) {
     87                 if (list[i].isDirectory()) {
     88                     // If is a directory, process it
     89                     directoryProcess(list[i]);
     90                 } else {
     91                     // If is a file, process it
     92                     fileProcess(list[i]);
     93                 }
     94             }
     95         }
     96     }
     97     private void fileProcess(File file) {
     98         if (file.getName().endsWith(end)) {
     99             results.add(file.getAbsolutePath());
    100         }
    101     }
    102 }
    主函数:
     1 public static void main(String[] args) {
     2         Phaser phaser = new Phaser(3);
     3 
     4         FileSearch system = new FileSearch("C:\Windows", "log", phaser);
     5         FileSearch apps = new FileSearch("C:\Program Files", "log", phaser);
     6         FileSearch documents = new FileSearch("C:\Documents And Settings", "log", phaser);
     7 
     8         Thread systemThread = new Thread(system, "System");
     9         systemThread.start();
    10         Thread appsThread = new Thread(apps, "Apps");
    11         appsThread.start();        
    12         Thread documentsThread = new Thread(documents, "Documents");
    13         documentsThread.start();
    14         try {
    15             systemThread.join();
    16             appsThread.join();
    17             documentsThread.join();
    18         } catch (InterruptedException e) {
    19             e.printStackTrace();
    20         }
    21         System.out.printf("Terminated: %s
    ", phaser.isTerminated());
    22     }
     
    注意的地方
    例子中Phaser分了三个步骤:查找文件、过滤文件、打印结果。并且在查找文件和过滤文件结束后对结果进行分析,如果是空的,将此线程从Phaser中注销掉。也就是说,下一阶段,该线程将不参与运行。
    在run()方法中,开头调用了phaser的arriveAndAwaitAdvance()方法来保证所有线程都启动了之后再开始查找文件。在查找文件和过滤文件阶段结束之后,都对结果进行了处理。即:如果结果是空的,那么就把该条线程移除,如果不空,那么等待该阶段所有线程都执行完该步骤之后在统一执行下一步。最后,任务执行完后,把Phaser中的线程均注销掉。
    Phaser其实有两个状态:活跃态和终止态。当存在参与同步的线程时,Phaser就是活跃的。并且在每个阶段结束的时候同步。当所有参与同步的线程都取消注册的时候,Phase就处于终止状态。在这种状态下,Phaser没有任务参与者。
    Phaser主要功能就是执行多阶段任务,并保证每个阶段点的线程同步。在每个阶段点还可以条件或者移除参与者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()
     
    使用场景
    多阶段任务
  • 相关阅读:
    家庭记账本APP开发准备(二)
    使用花生壳5做内网穿透
    课堂练习之可视化的强化版
    第五周总结
    课堂练习之疫情可视化
    第四周总结
    第三周总结
    第二周总结
    课堂练习之最大子数组
    软工第二学期开课博客
  • 原文地址:https://www.cnblogs.com/uodut/p/6830939.html
Copyright © 2011-2022 走看看