zoukankan      html  css  js  c++  java
  • 并发(三)

    本篇讲述新类库中的工具类。

    参考博客资料:

    http://developer.51cto.com/art/201403/432095.htm

    http://blog.csdn.net/flyingdon/article/details/5110582

    http://blog.csdn.net/shihuacai/article/details/8856526

    《JAVA思想编程》

    一、CountDownLatch
     CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。

    例子:

    a,b,c,d四个盘,分别统计出大小,最后要求得四个盘总的大小的值。

    首先想到的是CountDownLatch这个类,在所有单个盘的大小统计完成之后,再进行总和的计算。之后,我们需要一个DiskMemory类,抽象盘的大小的统计。有个size和totalSize,大概就这些。启动线程时,启动4次,每次分别计算单个的size,最后汇总。

    代码如下:

     1 import java.util.Random;
     2 
     3 public class DiskMemory {
     4 
     5     private static int totalSize;
     6     
     7     public int getSize() {
     8         return new Random().nextInt(5) + 1;
     9     }
    10     
    11     public void setSize(int size) {
    12         // synchronized method is must
    13         synchronized (this) {
    14             System.out.println("-------first totalSize: " + totalSize);
    15             totalSize = totalSize + size;
    16             System.out.println("-------end totalSize: " + totalSize);
    17         }
    18         
    19     }
    20     
    21     public int getTotalSize() {
    22         return totalSize;
    23     }
    24 }
    View Code
     1 import java.util.concurrent.CountDownLatch;
     2 import java.util.concurrent.ExecutorService;
     3 import java.util.concurrent.Executors;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class CountSizeDemo {
     7 
     8     public static void main(String[] args) {
     9 
    10         CountDownLatch countDownLatch = new CountDownLatch(4);
    11         DiskMemory diskMemory = new DiskMemory();
    12         ExecutorService executorService = Executors.newCachedThreadPool();
    13         for (int i = 0; i < 4; i++) {
    14 
    15             executorService.execute(() -> {
    16                 try {
    17                     TimeUnit.MICROSECONDS.sleep(1000);
    18                 } catch (InterruptedException ex) {
    19                     ex.printStackTrace();
    20                 }
    21                 int size = diskMemory.getSize();
    22                 System.out.println("get size: " + size);
    23                 diskMemory.setSize(size);
    24                 countDownLatch.countDown();
    25             });
    26         }
    27         try {
    28             countDownLatch.await();
    29         } catch (InterruptedException e) {
    30             // TODO Auto-generated catch block
    31             e.printStackTrace();
    32         }
    33         System.out.println(diskMemory.getTotalSize());
    34         executorService.shutdownNow();
    35     }
    36 
    37 }
    View Code

    执行结果:

     1 get size: 3
     2 get size: 4
     3 get size: 5
     4 get size: 5
     5 -------first totalSize: 0
     6 -------end totalSize: 3
     7 -------first totalSize: 3
     8 -------end totalSize: 8
     9 -------first totalSize: 8
    10 -------end totalSize: 12
    11 -------first totalSize: 12
    12 -------end totalSize: 17
    13 17
    View Code

     

    二、CyclicBarrier

    CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的(还是有点别扭的。所以还是别翻译了,只可意会不可言传啊)。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。

    给个情景:

    公司带领大家出外郊游,到达地方后,大家去A地游览,肯定是路上有快有满,都到了A之后,大家再集体去B地。这个工具类CyclicBarrier用于这个类情景。

    它与CountDownLatch不同之处在于,它可以重复计数,当计数减一减一最后到零时,之后重新计数。这个与CountDownLatch的一个区别。

    下面给出个例子:

     1 import java.util.Random;
     2 import java.util.concurrent.BrokenBarrierException;
     3 import java.util.concurrent.CyclicBarrier;
     4 
     5 public class Horse implements Runnable {
     6 
     7     private static int counter = 0;
     8     private final int id = counter++;
     9     private int strides = 0;
    10     private static Random rand = new Random(47);
    11     private static CyclicBarrier barrier;
    12     public Horse(CyclicBarrier b) {
    13         barrier = b;
    14     }
    15     public synchronized int getStrides() {
    16         return strides;
    17     }
    18     @Override
    19     public void run() {
    20 
    21         try {
    22             while (!Thread.interrupted()) {
    23                 synchronized (this) {
    24                     strides += rand.nextInt(3);
    25                 }
    26                 barrier.await();
    27             }
    28         } catch (InterruptedException e) {
    29             e.printStackTrace();
    30         } catch (BrokenBarrierException e) {
    31             e.printStackTrace();
    32         }
    33     }
    34     
    35     public String toString() {
    36         return "Horse " + id + " ";
    37     }
    38     
    39     public String tracks () {
    40         StringBuilder sBuilder = new StringBuilder();
    41         for (int i = 0; i < getStrides(); i++) {
    42             sBuilder.append("*");
    43         }
    44         sBuilder.append(id);
    45         return sBuilder.toString();
    46     }
    47 
    48 }
    View Code
     1 import java.util.ArrayList;
     2 import java.util.List;
     3 import java.util.concurrent.CyclicBarrier;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class HorseRace {
     9 
    10     static final int FINIASH_LINE = 39;
    11     private List<Horse> horses = new ArrayList<Horse>();
    12     private ExecutorService executorService = Executors.newCachedThreadPool();
    13     private CyclicBarrier barrier;
    14     
    15     public HorseRace (int nHorse, final int pause) {
    16         barrier = new CyclicBarrier(nHorse, () -> {
    17             StringBuilder s = new StringBuilder();
    18             System.out.println("============================");
    19             for (Horse horse: horses) {
    20                 System.out.println(horse.getStrides());
    21             }
    22             for (Horse horse: horses) {
    23                 if (horse.getStrides() >= FINIASH_LINE) {
    24                     System.out.println(horse + "won!");
    25                     executorService.shutdownNow();
    26                     return;
    27                 }
    28             }
    29             
    30             try {
    31                 TimeUnit.MICROSECONDS.sleep(pause);
    32             } catch (InterruptedException e) {
    33                 System.out.println(s);
    34             }
    35          });
    36         
    37         for (int i=0; i<nHorse; i++) {
    38             Horse horse = new Horse(barrier);
    39             horses.add(horse);
    40             executorService.execute(horse);
    41         }
    42     }
    43     public static void main(String[] args) {
    44 
    45         new HorseRace(3, 200);
    46     }
    47 
    48 }
    View Code

    执行结果:

      1 ============================
      2 2
      3 2
      4 1
      5 ============================
      6 3
      7 4
      8 3
      9 ============================
     10 3
     11 5
     12 5
     13 ============================
     14 4
     15 5
     16 5
     17 ============================
     18 5
     19 7
     20 5
     21 ============================
     22 6
     23 8
     24 6
     25 ============================
     26 8
     27 9
     28 7
     29 ============================
     30 9
     31 9
     32 7
     33 ============================
     34 11
     35 11
     36 7
     37 ============================
     38 12
     39 12
     40 9
     41 ============================
     42 13
     43 13
     44 9
     45 ============================
     46 13
     47 13
     48 10
     49 ============================
     50 14
     51 13
     52 11
     53 ============================
     54 14
     55 15
     56 11
     57 ============================
     58 14
     59 15
     60 12
     61 ============================
     62 15
     63 16
     64 14
     65 ============================
     66 16
     67 16
     68 15
     69 ============================
     70 18
     71 17
     72 16
     73 ============================
     74 19
     75 19
     76 16
     77 ============================
     78 19
     79 20
     80 16
     81 ============================
     82 19
     83 20
     84 18
     85 ============================
     86 21
     87 20
     88 20
     89 ============================
     90 22
     91 21
     92 21
     93 ============================
     94 24
     95 21
     96 21
     97 ============================
     98 24
     99 21
    100 21
    101 ============================
    102 25
    103 23
    104 23
    105 ============================
    106 27
    107 24
    108 25
    109 ============================
    110 29
    111 24
    112 26
    113 ============================
    114 31
    115 25
    116 26
    117 ============================
    118 31
    119 25
    120 28
    121 ============================
    122 32
    123 25
    124 28
    125 ============================
    126 34
    127 26
    128 28
    129 ============================
    130 35
    131 27
    132 30
    133 ============================
    134 35
    135 27
    136 32
    137 ============================
    138 35
    139 27
    140 34
    141 ============================
    142 36
    143 29
    144 36
    145 ============================
    146 36
    147 29
    148 38
    149 ============================
    150 38
    151 31
    152 39
    153 Horse 2 won!
    View Code

    三、Semaphore

    Semaphore 作用是只允许一定数量的线程同时执行一段任务。

    以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这是如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

    代码例子如下:

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3 import java.util.concurrent.Semaphore;
     4 
     5 public class SemaphoreTest {
     6     public static void main(String[] args) {  
     7         // 停车场
     8         ExecutorService exec = Executors.newCachedThreadPool();  
     9         // 只有五个车位
    10         final Semaphore semp = new Semaphore(5);  
    11         // 模拟20辆车进入 
    12         for (int index = 0; index < 20; index++) {
    13             final int NO = index; 
    14             exec.execute(() -> {
    15                 try {  
    16                     // 获取进入许可 
    17                     semp.acquire();  
    18                     System.out.println("Car No: " + NO);  
    19                     Thread.sleep((long) (Math.random() * 10000));  
    20                     // 出去后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞
    21                     semp.release();  
    22                 } catch (InterruptedException e) {  
    23                 }  
    24             });  
    25         }  
    26         // 退出线程池 
    27         exec.shutdown();  
    28     }  
    29 }
    View Code
     1 Car No: 1
     2 Car No: 2
     3 Car No: 3
     4 Car No: 0
     5 Car No: 4
     6 Car No: 5
     7 Car No: 6
     8 Car No: 7
     9 Car No: 8
    10 Car No: 9
    11 Car No: 10
    12 Car No: 11
    13 Car No: 12
    14 Car No: 13
    15 Car No: 14
    16 Car No: 15
    17 Car No: 16
    18 Car No: 17
    19 Car No: 18
    20 Car No: 19
    View Code

    四、PriorityBlockingQueue

    名字起的非常的简洁,同时又把所有的重点描述清楚了。首先是Priority,优先级的意思,表明他可以排序,之后是Blocking,因为在没有数据被取出的时候,会发生阻塞,最后,这个一个队列,我们不需要显式的控制它的并发,一切都在它的内部自己完成。

    上例子:

     1 import java.util.ArrayList;
     2 import java.util.List;
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 import java.util.concurrent.PriorityBlockingQueue;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class PriorityBlockingQueueDemo {
     9 
    10     public static void main(String[] args) throws InterruptedException {
    11 
    12         PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
    13         ExecutorService service = Executors.newCachedThreadPool();
    14         service.execute(new Thread(() -> {
    15             System.out.println("Polling...");
    16             while (true) {
    17                 Integer poll;
    18                 try {
    19                     poll = queue.take();
    20                     System.out.println("Pooled: " + poll);
    21                 } catch (Exception e) {
    22                     e.printStackTrace();
    23                 }
    24             }
    25         }));
    26         
    27         TimeUnit.MICROSECONDS.sleep(5);
    28         System.out.println("Adding to queue");
    29         List<Integer> list = new ArrayList<>();
    30         list.add(1);
    31         list.add(5);
    32         list.add(1);
    33         list.add(4);
    34         list.add(3);
    35         list.add(1);
    36         queue.addAll(list);
    37         TimeUnit.MICROSECONDS.sleep(1);
    38     }
    39 }
    View Code
    1 Polling...
    2 Adding to queue
    3 Pooled: 1
    4 Pooled: 1
    5 Pooled: 1
    6 Pooled: 3
    7 Pooled: 4
    8 Pooled: 5
    View Code

    PriorityBlockingQueue里面存储的对象一定要实现Comparable接口。队列通过这个接口的compareTo()方法来实现优先级排序的。如果我们自己不重写compareTo()方法,那么会按照类中默认的排序方法,进行排序。

  • 相关阅读:
    POJ3233]Matrix Power Series && [HDU1588]Gauss Fibonacci
    [codeforces 508E]Maximum Matching
    [SDOI2011]染色
    [CSU1806]Toll
    [HDU4969]Just a Joke
    [HDU1071]The area
    [HDU1724]Ellipse
    [VIJOS1889]天真的因数分解
    [BZOJ3379] Turning in Homework
    [BZOJ1572] WorkScheduling
  • 原文地址:https://www.cnblogs.com/lihao007/p/7630090.html
Copyright © 2011-2022 走看看