zoukankan      html  css  js  c++  java
  • java中等待所有线程都执行结束

    转自:http://blog.csdn.net/liweisnake/article/details/12966761 

    今天看到一篇文章,是关于java中如何等待所有线程都执行结束,文章总结得很好,原文如下http://software.intel.com/zh-cn/blogs/2013/10/15/java-countdownlatchcyclicbarrier/?utm_campaign=CSDN&utm_source=intel.csdn.net&utm_medium=Link&utm_content=others-%20Java

      看过之后在想java中有很大的灵活性,应该有更多的方式可以做这件事。

      这个事情的场景是这样的:许多线程并行的计算一堆问题,然后每个计算存在一个队列,在主线程要等待所有计算结果完成后排序并展示出来。这样的问题其实很常见。

      1. 使用join。这种方式其实并不是那么的优雅,将所有线程启动完之后还需要将所有线程都join,但是每次join都会阻塞,直到被join线程完成,很可能所有被阻塞线程已经完事了,主线程还在不断地join,貌似有点浪费,而且两个循环也不太好看。

    1. 复制代码
       1 public void testThreadSync1() {  
       2   
       3     final Vector<Integer> list = new Vector<Integer>();  
       4     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
       5     try {  
       6         for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
       7             final int num = i;  
       8             threads[i] = new Thread(new Runnable() {  
       9                 public void run() {  
      10                     try {  
      11                         Thread.sleep(random.nextInt(100));  
      12                     } catch (InterruptedException e) {  
      13                         e.printStackTrace();  
      14                     }  
      15                     list.add(num);  
      16                     System.out.print(num + " add.	");  
      17                 }  
      18             });  
      19             threads[i].start();  
      20         }  
      21         for (int i = 0; i < threads.length; i++) {  
      22             threads[i].join();  
      23             System.out.print(i + " end.	");  
      24         }  
      25     } catch (InterruptedException ie) {  
      26         ie.printStackTrace();  
      27     }  
      28     printSortedResult(list);  
      29 }  
      复制代码
    1. 1 9 add.  7 add.  3 add.  5 add.  4 add.  1 add.  0 add.  0 end.  1 end.  8 add.  2 add.  2 end.  3 end.  4 end.  5 end.  6 add.  6 end.  7 end.  8 end.  9 end.    
      2 before sort  
      3 9   7   3   5   4   1   0   8   2   6     
      4 after sort  
      5 0   1   2   3   4   5   6   7   8   9  

      2. 使用wait/notifyAll,这个方式其实跟上面是类似的,只是比较底层些吧(join实际上也是wait)。

     
    1. 复制代码
       1 @Test  
       2 public void testThreadSync2() throws IOException, InterruptedException {  
       3     final Object waitObject = new Object();  
       4     final AtomicInteger count = new AtomicInteger(TEST_THREAD_COUNT);  
       5     final Vector<Integer> list = new Vector<Integer>();  
       6     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
       7     for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
       8         final int num = i;  
       9         threads[i] = new Thread(new Runnable() {  
      10             public void run() {  
      11                 try {  
      12                     Thread.sleep(random.nextInt(100));  
      13                 } catch (InterruptedException e) {  
      14                     e.printStackTrace();  
      15                 }  
      16                 list.add(num);  
      17                 System.out.print(num + " add.	");  
      18                 synchronized (waitObject) {  
      19                     int cnt = count.decrementAndGet();  
      20                     if (cnt == 0) {  
      21                         waitObject.notifyAll();  
      22                     }  
      23                 }  
      24             }  
      25         });  
      26         threads[i].start();  
      27     }  
      28     synchronized (waitObject) {  
      29         while (count.get() != 0) {  
      30             waitObject.wait();  
      31         }  
      32     }  
      33     printSortedResult(list);  
      34 }  
      复制代码

      3. 使用CountDownLatch,这其实是最优雅的写法了,每个线程完成后都去将计数器减一,最后完成时再来唤醒。

    例1

     
    1. 复制代码
       1 @Test  
       2 public void testThreadSync3() {  
       3     final Vector<Integer> list = new Vector<Integer>();  
       4     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
       5     final CountDownLatch latch = new CountDownLatch(TEST_THREAD_COUNT);  
       6     for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
       7         final int num = i;  
       8         threads[i] = new Thread(new Runnable() {  
       9             public void run() {  
      10                 try {  
      11                     Thread.sleep(random.nextInt(100));  
      12                 } catch (InterruptedException e) {  
      13                     e.printStackTrace();  
      14                 }  
      15                 list.add(num);  
      16                 System.out.print(num + " add.	");  
      17                 latch.countDown();  
      18             }  
      19         });  
      20         threads[i].start();  
      21     }  
      22     try {  
      23         latch.await();  
      24     } catch (InterruptedException e) {  
      25         e.printStackTrace();  
      26     }  
      27     printSortedResult(list);  
      28 }  
      复制代码

    例2

    CountDownLatch 初始化设置count,即等待(await)count个线程或一个线程count次计数,通过工作线程来countDown计数减一,直到计数为0,await阻塞结束。

    设置的count不可更改,如需要动态设置计数的线程数,可以使用CyclicBarrier.

    下面的例子,所有的工作线程中准备就绪以后,并不是直接运行,而是等待主线程的信号后再执行具体的操作。

    package com.example.multithread;  
      
    import java.util.concurrent.CountDownLatch;  
      
    class Driver  
    {  
        private static final int TOTAL_THREADS = 10;  
        private final CountDownLatch mStartSignal = new CountDownLatch(1);  
        private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
      
        void main()  
        {  
            for (int i = 0; i < TOTAL_THREADS; i++)  
            {  
                new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();  
            }  
            System.out.println("Main Thread Now:" + System.currentTimeMillis());  
            doPrepareWork();// 准备工作   
            mStartSignal.countDown();// 计数减一为0,工作线程真正启动具体操作   
            doSomethingElse();//做点自己的事情   
            try  
            {  
                mDoneSignal.await();// 等待所有工作线程结束   
            }  
            catch (InterruptedException e)  
            {  
                // TODO Auto-generated catch block   
                e.printStackTrace();  
            }  
            System.out.println("All workers have finished now.");  
            System.out.println("Main Thread Now:" + System.currentTimeMillis());  
        }  
      
        void doPrepareWork()  
        {  
            System.out.println("Ready,GO!");  
        }  
      
        void doSomethingElse()  
        {  
            for (int i = 0; i < 100000; i++)  
            {  
                ;// delay   
            }  
            System.out.println("Main Thread Do something else.");  
        }  
    }  
      
    class Worker implements Runnable  
    {  
        private final CountDownLatch mStartSignal;  
        private final CountDownLatch mDoneSignal;  
        private final int mThreadIndex;  
      
        Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,  
                final int threadIndex)  
        {  
            this.mDoneSignal = doneSignal;  
            this.mStartSignal = startSignal;  
            this.mThreadIndex = threadIndex;  
        }  
      
        @Override  
        public void run()  
        {  
            // TODO Auto-generated method stub   
            try  
            {  
                mStartSignal.await();// 阻塞,等待mStartSignal计数为0运行后面的代码   
                                        // 所有的工作线程都在等待同一个启动的命令   
                doWork();// 具体操作   
                System.out.println("Thread " + mThreadIndex + " Done Now:"  
                        + System.currentTimeMillis());  
                mDoneSignal.countDown();// 完成以后计数减一   
            }  
            catch (InterruptedException e)  
            {  
                // TODO Auto-generated catch block   
                e.printStackTrace();  
            }  
        }  
      
        public void doWork()  
        {  
            for (int i = 0; i < 1000000; i++)  
            {  
                ;// 耗时操作   
            }  
            System.out.println("Thread " + mThreadIndex + ":do work");  
        }  
    }  
      
    public class CountDownLatchTest  
    {  
        public static void main(String[] args)  
        {  
            // TODO Auto-generated method stub   
            new Driver().main();  
        }  
      
    }
    

      

    1. 通过Executor启动线程:

      class CountDownLatchDriver2  
      {  
          private static final int TOTAL_THREADS = 10;  
          private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
       
      
        
          void main()  
          {  
              System.out.println("Main Thread Now:" + System.currentTimeMillis());  
              doPrepareWork();// 准备工作   
        
              Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  
              for (int i = 0; i < TOTAL_THREADS; i++)  
              {  
                  // 通过内建的线程池维护创建的线程   
                  executor.execute(new RunnableWorker(mDoneSignal, i));  
              }  
              doSomethingElse();// 做点自己的事情   
              try  
              {  
                  mDoneSignal.await();// 等待所有工作线程结束   
              }  
              catch (InterruptedException e)  
              {  
                  // TODO Auto-generated catch block   
                  e.printStackTrace();  
              }  
              System.out.println("All workers have finished now.");  
              System.out.println("Main Thread Now:" + System.currentTimeMillis());  
          }  
        
          void doPrepareWork()  
          {  
              System.out.println("Ready,GO!");  
          }  
        
          void doSomethingElse()  
          {  
              for (int i = 0; i < 100000; i++)  
              {  
                  ;// delay   
              }  
              System.out.println("Main Thread Do something else.");  
          }  
      }  
        
      class RunnableWorker implements Runnable  
      {  
        
          private final CountDownLatch mDoneSignal;  
          private final int mThreadIndex;  
        
          RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  
          {  
              this.mDoneSignal = doneSignal;  
              this.mThreadIndex = threadIndex;  
          }  
        
          @Override  
          public void run()  
          {  
              // TODO Auto-generated method stub   
        
              doWork();// 具体操作   
              System.out.println("Thread " + mThreadIndex + " Done Now:"  
                      + System.currentTimeMillis());  
              mDoneSignal.countDown();// 完成以后计数减一   
                                      // 计数为0时,主线程接触阻塞,继续执行其他任务   
              try  
              {  
                  // 可以继续做点其他的事情,与主线程无关了   
                  Thread.sleep(5000);  
                  System.out.println("Thread " + mThreadIndex  
                          + " Do something else after notifing main thread");  
        
              }  
              catch (InterruptedException e)  
              {  
                  // TODO Auto-generated catch block   
                  e.printStackTrace();  
              }  
        
          }  
        
          public void doWork()  
          {  
              for (int i = 0; i < 1000000; i++)  
              {  
                  ;// 耗时操作   
              }  
              System.out.println("Thread " + mThreadIndex + ":do work");  
          }  
      }
      

        

      class CountDownLatchDriver2  
      {  
          private static final int TOTAL_THREADS = 10;  
          private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
       
      
        
          void main()  
          {  
              System.out.println("Main Thread Now:" + System.currentTimeMillis());  
              doPrepareWork();// 准备工作   
        
              Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  
              for (int i = 0; i < TOTAL_THREADS; i++)  
              {  
                  // 通过内建的线程池维护创建的线程   
                  executor.execute(new RunnableWorker(mDoneSignal, i));  
              }  
              doSomethingElse();// 做点自己的事情   
              try  
              {  
                  mDoneSignal.await();// 等待所有工作线程结束   
              }  
              catch (InterruptedException e)  
              {  
                  // TODO Auto-generated catch block   
                  e.printStackTrace();  
              }  
              System.out.println("All workers have finished now.");  
              System.out.println("Main Thread Now:" + System.currentTimeMillis());  
          }  
        
          void doPrepareWork()  
          {  
              System.out.println("Ready,GO!");  
          }  
        
          void doSomethingElse()  
          {  
              for (int i = 0; i < 100000; i++)  
              {  
                  ;// delay   
              }  
              System.out.println("Main Thread Do something else.");  
          }  
      }  
        
      class RunnableWorker implements Runnable  
      {  
        
          private final CountDownLatch mDoneSignal;  
          private final int mThreadIndex;  
        
          RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  
          {  
              this.mDoneSignal = doneSignal;  
              this.mThreadIndex = threadIndex;  
          }  
        
          @Override  
          public void run()  
          {  
              // TODO Auto-generated method stub   
        
              doWork();// 具体操作   
              System.out.println("Thread " + mThreadIndex + " Done Now:"  
                      + System.currentTimeMillis());  
              mDoneSignal.countDown();// 完成以后计数减一   
                                      // 计数为0时,主线程接触阻塞,继续执行其他任务   
              try  
              {  
                  // 可以继续做点其他的事情,与主线程无关了   
                  Thread.sleep(5000);  
                  System.out.println("Thread " + mThreadIndex  
                          + " Do something else after notifing main thread");  
        
              }  
              catch (InterruptedException e)  
              {  
                  // TODO Auto-generated catch block   
                  e.printStackTrace();  
              }  
        
          }  
        
          public void doWork()  
          {  
              for (int i = 0; i < 1000000; i++)  
              {  
                  ;// 耗时操作   
              }  
              System.out.println("Thread " + mThreadIndex + ":do work");  
          }  
      }
      

        

      1. 输出:

        Main Thread Now:1359959480786
        Ready,GO!
        Thread 0:do work
        Thread 0 Done Now:1359959480808
        Thread 1:do work
        Thread 1 Done Now:1359959480811
        Thread 2:do work
        Thread 2 Done Now:1359959480813
        Main Thread Do something else.
        Thread 3:do work
        Thread 3 Done Now:1359959480825
        Thread 5:do work
        Thread 5 Done Now:1359959480827
        Thread 7:do work
        Thread 7 Done Now:1359959480829
        Thread 9:do work
        Thread 9 Done Now:1359959480831
        Thread 4:do work
        Thread 4 Done Now:1359959480833
        Thread 6:do work
        Thread 6 Done Now:1359959480835
        Thread 8:do work
        Thread 8 Done Now:1359959480837
        All workers have finished now.
        Main Thread Now:1359959480838
        Thread 0 Do something else after notifing main thread
        Thread 1 Do something else after notifing main thread
        Thread 2 Do something else after notifing main thread
        Thread 3 Do something else after notifing main thread
        Thread 9 Do something else after notifing main thread
        Thread 7 Do something else after notifing main thread
        Thread 5 Do something else after notifing main thread
        Thread 4 Do something else after notifing main thread
        Thread 6 Do something else after notifing main thread
        Thread 8 Do something else after notifing main thread
        

          

      4. 使用CyclicBarrier。这里其实类似上面,这个berrier只是在等待完成后自动调用传入CyclicBarrier的Runnable。

    例1

     
    @Test  
    public void testThreadSync4() throws IOException {  
        final Vector<Integer> list = new Vector<Integer>();  
        Thread[] threads = new Thread[TEST_THREAD_COUNT];  
        final CyclicBarrier barrier = new CyclicBarrier(TEST_THREAD_COUNT,  
                new Runnable() {  
                    public void run() {  
                        printSortedResult(list);  
                    }  
                });  
        for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
            final int num = i;  
            threads[i] = new Thread(new Runnable() {  
                public void run() {  
                    try {  
                        Thread.sleep(random.nextInt(100));  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    list.add(num);  
                    System.out.print(num + " add.	");  
                    try {  
                        barrier.await();  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    } catch (BrokenBarrierException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
            threads[i].start();  
        }  
        System.in.read();  
    }
    

      

    1. 例2

      1. 复制代码
         1 class WalkTarget  
         2 {  
         3     private final int mCount = 5;  
         4     private final CyclicBarrier mBarrier;  
         5     ExecutorService mExecutor;  
         6   
         7     class BarrierAction implements Runnable  
         8     {  
         9         @Override  
        10         public void run()  
        11         {  
        12             // TODO Auto-generated method stub   
        13             System.out.println("所有线程都已经完成任务,计数达到预设值");  
        14             //mBarrier.reset();//恢复到初始化状态          
        15               
        16         }  
        17     }  
        18   
        19     WalkTarget()  
        20     {  
        21         //初始化CyclicBarrier   
        22         mBarrier = new CyclicBarrier(mCount, new BarrierAction());  
        23         mExecutor = Executors.newFixedThreadPool(mCount);  
        24   
        25         for (int i = 0; i < mCount; i++)  
        26         {  
        27             //启动工作线程   
        28             mExecutor.execute(new Walker(mBarrier, i));  
        29         }  
        30     }  
        31 }  
        32   
        33 //工作线程   
        34 class Walker implements Runnable  
        35 {  
        36     private final CyclicBarrier mBarrier;  
        37     private final int mThreadIndex;  
        38   
        39     Walker(final CyclicBarrier barrier, final int threadIndex)  
        40  
        41 
        42     {  
        43         mBarrier = barrier;  
        44         mThreadIndex = threadIndex;  
        45     }  
        46   
        47     @Override  
        48     public void run()  
        49     {  
        50         // TODO Auto-generated method stub   
        51         System.out.println("Thread " + mThreadIndex + " is running...");  
        52         // 执行任务   
        53         try  
        54         {  
        55             TimeUnit.MILLISECONDS.sleep(5000);  
        56             // do task   
        57         }  
        58         catch (InterruptedException e)  
        59         {  
        60             // TODO Auto-generated catch block   
        61             e.printStackTrace();  
        62         }  
        63   
        64         // 完成任务以后,等待其他线程完成任务   
        65         try  
        66         {  
        67             mBarrier.await();  
        68         }  
        69         catch (InterruptedException e)  
        70         {  
        71             // TODO Auto-generated catch block   
        72             e.printStackTrace();  
        73         }  
        74         catch (BrokenBarrierException e)  
        75         {  
        76             // TODO Auto-generated catch block   
        77             e.printStackTrace();  
        78         }  
        79         // 其他线程任务都完成以后,阻塞解除,可以继续接下来的任务   
        80         System.out.println("Thread " + mThreadIndex + " do something else");  
        81     }  
        82   
        83 }  
        84   
        85 public class CountDownLatchTest  
        86 {  
        87     public static void main(String[] args)  
        88     {  
        89         // TODO Auto-generated method stub   
        90         //new CountDownLatchDriver2().main();   
        91         new WalkTarget();  
        92     }  
        93   
        94 }  
        复制代码

        输出(注意,只有所有的线程barrier.await之后才能继续执行其他的操作):

        Thread 0 is running... Thread 2 is running... Thread 3 is running... Thread 1 is running... Thread 4 is running... 所有线程都已经完成任务,计数达到预设值 Thread 4 do something else Thread 0 do something else Thread 2 do something else Thread 3 do something else Thread 1 do something else

    5、

    CountDownLatch和CyclicBarrier简单比较:

     

    CountDownLatch

    CyclicBarrier

    软件包

    java.util.concurrent

    java.util.concurrent

    适用情景

    主线程等待多个工作线程结束

    多个线程之间互相等待,直到所有线程达到一个障碍点(Barrier point)

    主要方法

    CountDownLatch(int count) (主线程调用)

    初始化计数

    CountDownLatch.await (主线程调用)

    阻塞,直到等待计数为0解除阻塞

    CountDownLatch.countDown

    计数减一(工作线程调用)

    CyclicBarrier(int parties, Runnable barrierAction) //初始化参与者数量和障碍点执行Action,Action可选。由主线程初始化

    CyclicBarrier.await() //由参与者调用

    阻塞,直到所有线程达到屏障点

    等待结束

    各线程之间不再互相影响,可以继续做自己的事情。不再执行下一个目标工作。

    在屏障点达到后,允许所有线程继续执行,达到下一个目标。可以重复使用CyclicBarrier

    异常

    如果其中一个线程由于中断,错误,或超时导致永久离开屏障点,其他线程也将抛出异常。

    其他

    如果BarrierAction不依赖于任何Party中的所有线程,那么在任何party中的一个线程被释放的时候,可以直接运行这个Action。

    If(barrier.await()==2)

    {

    //do action

    }

     
  • 相关阅读:
    vue-quill-editor富文本编辑器使用
    vue中this.$router.push()路由传值和获取的两种常见方法
    在 Vue.js项目中如何定义全局变量&全局函数
    vue中利用provide和inject实现页面刷新(无白屏)重载组件
    linux 发送get post命令
    java 反射结合hibernate-validator 注解校验对象数据合法性
    开发过程工具集
    java 创建文件夹及文件写入数据到excel
    java文件不存在就创建_Java中创建并写文件的5种方式【转载】
    java 字符串左补齐
  • 原文地址:https://www.cnblogs.com/itzyz/p/11077993.html
Copyright © 2011-2022 走看看