zoukankan      html  css  js  c++  java
  • JAVA CyclicBarrier类详解

     一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环 的barrier。

    CyclicBarrier 支持一个可选的 Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

    示例用法:下面是一个在并行分解设计中使用 barrier 的例子:

    class Solver {
        final int N;
        final float[][] data;
        final CyclicBarrier barrier;
    
        class Worker implements Runnable {
            int myRow;
            Worker(int row) {
                myRow = row;
            }
            public void run() {
                while (!done()) {
                    processRow(myRow);
                    try {
                        barrier.await();
                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
                }
            }
        }
        public Solver(float[][] matrix) {
            data = matrix;
            N = matrix.length;
            barrier = new CyclicBarrier(N, new Runnable() {
                public void run() {
                    //mergeRows(...);合并结果
                }
            });
            for (int i = 0; i < N; ++i)
                new Thread(new Worker(i)).start();
             waitUntilDone();
        }
    }

    在这个例子中,每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的Runnable屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。

    如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作,例如:

     if (barrier.await() == 0) {
         // log the completion of this iteration
      }

    对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

    内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。

    实现一个矩阵,在矩阵中查找需要查找数字的出现次数。

    public class MatrixMock {
        private int data[][];
        public MatrixMock(int size,int length,int number){
            int counter=0;
            data=new int[size][length];
            Random random=new Random();
            for (int i = 0;i <size;i++) {
                for (int j=0; j<length; j++) {
                    data[i][j]=random.nextInt(10);
                    if(data[i][j]==number){
                        counter++;
                    }
                }
            }
            System.out.println("Mock:There are:"+counter+" number in generated data");
        }
        public int[] getRow(int row){
            if(row>=0&&row<data.length){
                return data[row];
            }
            return null;
        }
    }
    //存放矩阵每行的查找结果
    public class Result {
        private int data[];
        public Result(int size) {
            data=new int[size];
        }
        public void setData(int postion,int value){
            data[postion]=value;
        }
        public int[] getData(){
            return data;
        }
    }
    //查找线程
    public class Searcher implements Runnable {
        private int fristRow;//起始行
        private int lastRow;//终止行
        private MatrixMock matrixMock;//要查找的矩阵
        private Result results;//保存查找结果
        private int number;//需要查找到数字
        private final CyclicBarrier barrier;
        public Searcher(int fristRow, int lastRow, MatrixMock matrixMock,
                Result results, int number, CyclicBarrier barrier) {
            this.fristRow = fristRow;
            this.lastRow = lastRow;
            this.matrixMock = matrixMock;
            this.results = results;
            this.number = number;
            this.barrier = barrier;
        }
        @Override
        public void run() {
            int counter;
            System.out.println(Thread.currentThread().getName()
                    + ": Processing lines from " + fristRow + " to " + lastRow);
            for (int i=fristRow; i<lastRow;i++) {
                int row[]=matrixMock.getRow(i);
                counter=0;
                for (int j = 0; j <row.length; j++) {
                    if(row[j]==number){
                        counter++;
                    }
                }
                results.setData(i, counter);
            }
            System.out.println(Thread.currentThread().getName()+":Lines processed");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }
    //合并查找结果
    public class Grouper implements Runnable {
        private Result result;
        public Grouper(Result result) {
            this.result = result;
        }
        @Override
        public void run() {
            int finalResult=0;
            System.out.println("Grouper: Processing results...");
            int data[]=result.getData();
            for (int i : data) {
                finalResult+=i;
            }
            System.out.println("Grouper: Total result:"+finalResult);
        }
    }
    public class GrouperMain {
        public static void main(String[] args) {
            final int ROWS = 10000;
            final int NUMBRES = 1000;
            final int SEARCH = 5;
            final int PARTICIPANTS = 5;
            final int LINES_PARTICIPANT = 2000;
            MatrixMock mock = new MatrixMock(ROWS, NUMBRES, SEARCH);
            Result result = new Result(ROWS);
            Grouper grouper = new Grouper(result);
            CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
            Searcher searchers[] = new Searcher[PARTICIPANTS];
            for (int i = 0; i < PARTICIPANTS; i++) {
                searchers[i] = new Searcher(i * LINES_PARTICIPANT, i
                        * LINES_PARTICIPANT + LINES_PARTICIPANT, mock, result, 5,
                        barrier);
                Thread thread = new Thread(searchers[i]);
                thread.start();
            }
            System.out.println("Main: The Main Thread has finnished");
        }
    }

    运行结果:

    Mock:There are:1000810 number in generated data
    Thread-0: Processing lines from 0 to 2000
    Thread-1: Processing lines from 2000 to 4000
    Thread-2: Processing lines from 4000 to 6000
    Thread-4: Processing lines from 8000 to 10000
    Main: The Main Thread has finnished
    Thread-3: Processing lines from 6000 to 8000
    Thread-3:Lines processed
    Thread-1:Lines processed
    Thread-2:Lines processed
    Thread-4:Lines processed
    Thread-0:Lines processed
    Grouper: Processing results...
    Grouper: Total result:1000810

    应用场景:在某种需求中,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择CyclicBarrier了

  • 相关阅读:
    common.js
    数据库_day05_查询语句及JDBC
    数据库_day04_基本的增删改查操作
    Java 抽象类、普通类、接口的区别
    java中ArrayList和LinkedList的区别
    数据库_day03_对数据库的基本操作
    java_day18_集合框架map和list
    java_day17_socket,tcp协议传输
    java_day16_读写锁,fork-join框架
    java_day15_线程,匿名内部类,线程安全
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5424913.html
Copyright © 2011-2022 走看看