zoukankan      html  css  js  c++  java
  • 并发(三)

    本篇讲述新类库中的工具类。

    参考博客资料:

    http://developer.51cto.com/art/201403/432095.htm

    http://blog.csdn.net/flyingdon/article/details/5110582

    http://blog.csdn.net/shihuacai/article/details/8856526

    《JAVA思想编程》

    一、CountDownLatch
     CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。

    例子:

    a,b,c,d四个盘,分别统计出大小,最后要求得四个盘总的大小的值。

    首先想到的是CountDownLatch这个类,在所有单个盘的大小统计完成之后,再进行总和的计算。之后,我们需要一个DiskMemory类,抽象盘的大小的统计。有个size和totalSize,大概就这些。启动线程时,启动4次,每次分别计算单个的size,最后汇总。

    代码如下:

     1 import java.util.Random;
     2 
     3 public class DiskMemory {
     4 
     5     private static int totalSize;
     6     
     7     public int getSize() {
     8         return new Random().nextInt(5) + 1;
     9     }
    10     
    11     public void setSize(int size) {
    12         // synchronized method is must
    13         synchronized (this) {
    14             System.out.println("-------first totalSize: " + totalSize);
    15             totalSize = totalSize + size;
    16             System.out.println("-------end totalSize: " + totalSize);
    17         }
    18         
    19     }
    20     
    21     public int getTotalSize() {
    22         return totalSize;
    23     }
    24 }
    View Code
     1 import java.util.concurrent.CountDownLatch;
     2 import java.util.concurrent.ExecutorService;
     3 import java.util.concurrent.Executors;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class CountSizeDemo {
     7 
     8     public static void main(String[] args) {
     9 
    10         CountDownLatch countDownLatch = new CountDownLatch(4);
    11         DiskMemory diskMemory = new DiskMemory();
    12         ExecutorService executorService = Executors.newCachedThreadPool();
    13         for (int i = 0; i < 4; i++) {
    14 
    15             executorService.execute(() -> {
    16                 try {
    17                     TimeUnit.MICROSECONDS.sleep(1000);
    18                 } catch (InterruptedException ex) {
    19                     ex.printStackTrace();
    20                 }
    21                 int size = diskMemory.getSize();
    22                 System.out.println("get size: " + size);
    23                 diskMemory.setSize(size);
    24                 countDownLatch.countDown();
    25             });
    26         }
    27         try {
    28             countDownLatch.await();
    29         } catch (InterruptedException e) {
    30             // TODO Auto-generated catch block
    31             e.printStackTrace();
    32         }
    33         System.out.println(diskMemory.getTotalSize());
    34         executorService.shutdownNow();
    35     }
    36 
    37 }
    View Code

    执行结果:

     1 get size: 3
     2 get size: 4
     3 get size: 5
     4 get size: 5
     5 -------first totalSize: 0
     6 -------end totalSize: 3
     7 -------first totalSize: 3
     8 -------end totalSize: 8
     9 -------first totalSize: 8
    10 -------end totalSize: 12
    11 -------first totalSize: 12
    12 -------end totalSize: 17
    13 17
    View Code

     

    二、CyclicBarrier

    CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的(还是有点别扭的。所以还是别翻译了,只可意会不可言传啊)。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。

    给个情景:

    公司带领大家出外郊游,到达地方后,大家去A地游览,肯定是路上有快有满,都到了A之后,大家再集体去B地。这个工具类CyclicBarrier用于这个类情景。

    它与CountDownLatch不同之处在于,它可以重复计数,当计数减一减一最后到零时,之后重新计数。这个与CountDownLatch的一个区别。

    下面给出个例子:

     1 import java.util.Random;
     2 import java.util.concurrent.BrokenBarrierException;
     3 import java.util.concurrent.CyclicBarrier;
     4 
     5 public class Horse implements Runnable {
     6 
     7     private static int counter = 0;
     8     private final int id = counter++;
     9     private int strides = 0;
    10     private static Random rand = new Random(47);
    11     private static CyclicBarrier barrier;
    12     public Horse(CyclicBarrier b) {
    13         barrier = b;
    14     }
    15     public synchronized int getStrides() {
    16         return strides;
    17     }
    18     @Override
    19     public void run() {
    20 
    21         try {
    22             while (!Thread.interrupted()) {
    23                 synchronized (this) {
    24                     strides += rand.nextInt(3);
    25                 }
    26                 barrier.await();
    27             }
    28         } catch (InterruptedException e) {
    29             e.printStackTrace();
    30         } catch (BrokenBarrierException e) {
    31             e.printStackTrace();
    32         }
    33     }
    34     
    35     public String toString() {
    36         return "Horse " + id + " ";
    37     }
    38     
    39     public String tracks () {
    40         StringBuilder sBuilder = new StringBuilder();
    41         for (int i = 0; i < getStrides(); i++) {
    42             sBuilder.append("*");
    43         }
    44         sBuilder.append(id);
    45         return sBuilder.toString();
    46     }
    47 
    48 }
    View Code
     1 import java.util.ArrayList;
     2 import java.util.List;
     3 import java.util.concurrent.CyclicBarrier;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class HorseRace {
     9 
    10     static final int FINIASH_LINE = 39;
    11     private List<Horse> horses = new ArrayList<Horse>();
    12     private ExecutorService executorService = Executors.newCachedThreadPool();
    13     private CyclicBarrier barrier;
    14     
    15     public HorseRace (int nHorse, final int pause) {
    16         barrier = new CyclicBarrier(nHorse, () -> {
    17             StringBuilder s = new StringBuilder();
    18             System.out.println("============================");
    19             for (Horse horse: horses) {
    20                 System.out.println(horse.getStrides());
    21             }
    22             for (Horse horse: horses) {
    23                 if (horse.getStrides() >= FINIASH_LINE) {
    24                     System.out.println(horse + "won!");
    25                     executorService.shutdownNow();
    26                     return;
    27                 }
    28             }
    29             
    30             try {
    31                 TimeUnit.MICROSECONDS.sleep(pause);
    32             } catch (InterruptedException e) {
    33                 System.out.println(s);
    34             }
    35          });
    36         
    37         for (int i=0; i<nHorse; i++) {
    38             Horse horse = new Horse(barrier);
    39             horses.add(horse);
    40             executorService.execute(horse);
    41         }
    42     }
    43     public static void main(String[] args) {
    44 
    45         new HorseRace(3, 200);
    46     }
    47 
    48 }
    View Code

    执行结果:

      1 ============================
      2 2
      3 2
      4 1
      5 ============================
      6 3
      7 4
      8 3
      9 ============================
     10 3
     11 5
     12 5
     13 ============================
     14 4
     15 5
     16 5
     17 ============================
     18 5
     19 7
     20 5
     21 ============================
     22 6
     23 8
     24 6
     25 ============================
     26 8
     27 9
     28 7
     29 ============================
     30 9
     31 9
     32 7
     33 ============================
     34 11
     35 11
     36 7
     37 ============================
     38 12
     39 12
     40 9
     41 ============================
     42 13
     43 13
     44 9
     45 ============================
     46 13
     47 13
     48 10
     49 ============================
     50 14
     51 13
     52 11
     53 ============================
     54 14
     55 15
     56 11
     57 ============================
     58 14
     59 15
     60 12
     61 ============================
     62 15
     63 16
     64 14
     65 ============================
     66 16
     67 16
     68 15
     69 ============================
     70 18
     71 17
     72 16
     73 ============================
     74 19
     75 19
     76 16
     77 ============================
     78 19
     79 20
     80 16
     81 ============================
     82 19
     83 20
     84 18
     85 ============================
     86 21
     87 20
     88 20
     89 ============================
     90 22
     91 21
     92 21
     93 ============================
     94 24
     95 21
     96 21
     97 ============================
     98 24
     99 21
    100 21
    101 ============================
    102 25
    103 23
    104 23
    105 ============================
    106 27
    107 24
    108 25
    109 ============================
    110 29
    111 24
    112 26
    113 ============================
    114 31
    115 25
    116 26
    117 ============================
    118 31
    119 25
    120 28
    121 ============================
    122 32
    123 25
    124 28
    125 ============================
    126 34
    127 26
    128 28
    129 ============================
    130 35
    131 27
    132 30
    133 ============================
    134 35
    135 27
    136 32
    137 ============================
    138 35
    139 27
    140 34
    141 ============================
    142 36
    143 29
    144 36
    145 ============================
    146 36
    147 29
    148 38
    149 ============================
    150 38
    151 31
    152 39
    153 Horse 2 won!
    View Code

    三、Semaphore

    Semaphore 作用是只允许一定数量的线程同时执行一段任务。

    以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这是如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

    代码例子如下:

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3 import java.util.concurrent.Semaphore;
     4 
     5 public class SemaphoreTest {
     6     public static void main(String[] args) {  
     7         // 停车场
     8         ExecutorService exec = Executors.newCachedThreadPool();  
     9         // 只有五个车位
    10         final Semaphore semp = new Semaphore(5);  
    11         // 模拟20辆车进入 
    12         for (int index = 0; index < 20; index++) {
    13             final int NO = index; 
    14             exec.execute(() -> {
    15                 try {  
    16                     // 获取进入许可 
    17                     semp.acquire();  
    18                     System.out.println("Car No: " + NO);  
    19                     Thread.sleep((long) (Math.random() * 10000));  
    20                     // 出去后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞
    21                     semp.release();  
    22                 } catch (InterruptedException e) {  
    23                 }  
    24             });  
    25         }  
    26         // 退出线程池 
    27         exec.shutdown();  
    28     }  
    29 }
    View Code
     1 Car No: 1
     2 Car No: 2
     3 Car No: 3
     4 Car No: 0
     5 Car No: 4
     6 Car No: 5
     7 Car No: 6
     8 Car No: 7
     9 Car No: 8
    10 Car No: 9
    11 Car No: 10
    12 Car No: 11
    13 Car No: 12
    14 Car No: 13
    15 Car No: 14
    16 Car No: 15
    17 Car No: 16
    18 Car No: 17
    19 Car No: 18
    20 Car No: 19
    View Code

    四、PriorityBlockingQueue

    名字起的非常的简洁,同时又把所有的重点描述清楚了。首先是Priority,优先级的意思,表明他可以排序,之后是Blocking,因为在没有数据被取出的时候,会发生阻塞,最后,这个一个队列,我们不需要显式的控制它的并发,一切都在它的内部自己完成。

    上例子:

     1 import java.util.ArrayList;
     2 import java.util.List;
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 import java.util.concurrent.PriorityBlockingQueue;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class PriorityBlockingQueueDemo {
     9 
    10     public static void main(String[] args) throws InterruptedException {
    11 
    12         PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
    13         ExecutorService service = Executors.newCachedThreadPool();
    14         service.execute(new Thread(() -> {
    15             System.out.println("Polling...");
    16             while (true) {
    17                 Integer poll;
    18                 try {
    19                     poll = queue.take();
    20                     System.out.println("Pooled: " + poll);
    21                 } catch (Exception e) {
    22                     e.printStackTrace();
    23                 }
    24             }
    25         }));
    26         
    27         TimeUnit.MICROSECONDS.sleep(5);
    28         System.out.println("Adding to queue");
    29         List<Integer> list = new ArrayList<>();
    30         list.add(1);
    31         list.add(5);
    32         list.add(1);
    33         list.add(4);
    34         list.add(3);
    35         list.add(1);
    36         queue.addAll(list);
    37         TimeUnit.MICROSECONDS.sleep(1);
    38     }
    39 }
    View Code
    1 Polling...
    2 Adding to queue
    3 Pooled: 1
    4 Pooled: 1
    5 Pooled: 1
    6 Pooled: 3
    7 Pooled: 4
    8 Pooled: 5
    View Code

    PriorityBlockingQueue里面存储的对象一定要实现Comparable接口。队列通过这个接口的compareTo()方法来实现优先级排序的。如果我们自己不重写compareTo()方法,那么会按照类中默认的排序方法,进行排序。

  • 相关阅读:
    Educational Codeforces Round 85 D. Minimum Euler Cycle(模拟/数学/图)
    Educational Codeforces Round 85 C. Circle of Monsters(贪心)
    NOIP 2017 提高组 DAY1 T1小凯的疑惑(二元一次不定方程)
    Educational Codeforces Round 85 B. Middle Class(排序/贪心/水题)
    Educational Codeforces Round 85 A. Level Statistics(水题)
    IOS中的三大事件
    用Quartz 2D画小黄人
    strong、weak、copy、assign 在命名属性时候怎么用
    用代码生成UINavigationController 与UITabBarController相结合的简单QQ框架(部分)
    Attempting to badge the application icon but haven't received permission from the user to badge the application错误解决办法
  • 原文地址:https://www.cnblogs.com/lihao007/p/7630090.html
Copyright © 2011-2022 走看看