zoukankan      html  css  js  c++  java
  • 并发编程之基础( 四)

    新类库

    前面已经把并发编程的基础知识讲的差不多了,这章主要介绍一下JAVA中其它一些关于并发编程的类库,主要有一下几个类库。

    • CountDownLatch
    • CyclicBarrier
    • BlockingQueue
    • ScheduleExecutor
    • Semaphore
    • Exchanger

    1. CountDownLatch

      该类主要是同步一个或多个任务,强制一个或多个任务等待其它任务执行的一组操作完成。可以给该对象设置一个初始计数值,当计数值不为0时,调用该对象的await()方法就会阻塞,调用counDown()方法会让计数值减1,当计数值为0时阻塞任务会被唤醒。其典型用法就是将一个程序分成多个独立的任务,并给CountDownLatch设定一个初始值,该初始值应该为首先需要执行的线程的个数(比如赛跑,5个运动员都做好准备之后,裁判才能打枪,这时初始值应该设置为5)。一些任务需要等待其它任务先完成或者其它任务的一部分完成,那么可以待用await()将自己挂起。而另一些任务的某些操作完成时调用countDown()方法来减小计数值,等待计数值为0时,挂起的任务则则认为当前所有的条件以满足继续执行的需要了,则可以继续运行。注意:计数值只能被设置一次且在new的时候就要指定初值,而且该对象只能使用一次,如果想重复使用,请考虑CyclicBarrier

     1 package com.dy.xidian;
     2 
     3 import java.util.Random;
     4 import java.util.concurrent.CountDownLatch;
     5 import java.util.concurrent.ExecutorService;
     6 import java.util.concurrent.Executors;
     7 import java.util.concurrent.TimeUnit;
     8 
     9 class TaskPortion implements Runnable {
    10     private static int counter = 0;
    11     private final int id = counter++;
    12     private static Random rand = new Random(47);
    13     private final CountDownLatch latch;
    14 
    15     public TaskPortion(CountDownLatch latch) {
    16         super();
    17         this.latch = latch;
    18     }
    19 
    20     @Override
    21     public void run() {
    22         try {
    23             doWork();
    24             latch.countDown();
    25         } catch (InterruptedException e) {
    26         }
    27 
    28     }
    29 
    30     public void doWork() throws InterruptedException {
    31         TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
    32         System.out.println(this + "completed");
    33     }
    34 
    35     public String toString() {
    36         return String.format("%1$-3d", id);
    37     }
    38 }
    39 
    40 class WaitingTask implements Runnable {
    41     private static int counter = 0;
    42     private final int id = counter++;
    43     private final CountDownLatch latch;
    44 
    45     public WaitingTask(CountDownLatch latch) {
    46         super();
    47         this.latch = latch;
    48     }
    49 
    50     @Override
    51     public void run() {
    52         try {
    53             latch.await();
    54             System.out.println("Latch barrier passed for " + this);
    55         } catch (InterruptedException e) {
    56             System.out.println(this + " interrupted");
    57         }
    58     }
    59 
    60     public String toString() {
    61         return String.format("WaitingTask %1$-3d ", id);
    62     }
    63 }
    64 public class CountDownLatchDemo {
    65     static final int SIZE = 100;
    66     public static void main(String[] args) {
    67         ExecutorService exec = Executors.newCachedThreadPool();
    68         CountDownLatch latch = new CountDownLatch(SIZE);
    69         for (int i = 0; i < 10; i++)
    70             exec.execute(new WaitingTask(latch));
    71         for (int i = 0; i < SIZE; i++)
    72             exec.execute(new TaskPortion(latch));
    73         System.out.println("Launched all tasks");
    74         exec.shutdownNow();
    75     }
    76 }
    View Code

    2. CyclicBarrier

      CyclicBarrier与CountDownLatch功能差不多,不同之处就是可以多次使用,等到计数值变为0时,它会自动重置。而且不需要每个线程都去调用类似countDown()这样的方法,因为每调用一个await(),它就会自动将计数值减1。它使用于这种情况:多个线程并行执行工作,大家一致向前推进,所有线程在这个阶段的工作都完成了(所有的线程都调用了await方法),才能进入下一阶段,而对于那些早完成的线程只能先等待了。下面是一个赛马比赛,每个马可以看作一个线程,等所有的马都达到栅栏后,才能开始新一轮的比赛。

      1 package com.dy.xidian;
      2 
      3 import java.util.ArrayList;
      4 import java.util.List;
      5 import java.util.Random;
      6 import java.util.concurrent.BrokenBarrierException;
      7 import java.util.concurrent.CyclicBarrier;
      8 import java.util.concurrent.ExecutorService;
      9 import java.util.concurrent.Executors;
     10 import java.util.concurrent.TimeUnit;
     11 
     12 class Horse implements Runnable {
     13     private static int counter = 0;
     14     private final int id = counter++;
     15     private int strides = 0;
     16     private static Random rand = new Random(47);
     17     private static CyclicBarrier barrier;
     18 
     19     public Horse(CyclicBarrier b) {
     20         barrier = b;
     21     }
     22 
     23     public synchronized int getStriders() {
     24         return strides;
     25     }
     26 
     27     @Override
     28     public void run() {
     29         try {
     30             while (!Thread.interrupted()) {
     31                 synchronized (this) {
     32                     strides += rand.nextInt(3);
     33                 }
     34                 barrier.await();
     35             }
     36         } catch (InterruptedException e) {
     37             // TODO
     38         } catch (BrokenBarrierException e) {
     39             throw new RuntimeException(e);
     40         }
     41     }
     42 
     43     public String toString() {
     44         return "Horse " + id + " ";
     45     }
     46 
     47     public String tracks() {
     48         StringBuilder s = new StringBuilder();
     49         for (int i = 0; i < getStriders(); i++)
     50             s.append("*");
     51         s.append(id);
     52         return s.toString();
     53     }
     54 }
     55 
     56 public class HorseRace {
     57     static final int FINISH_LINE = 75;
     58     private List<Horse> horses = new ArrayList<Horse>();
     59     private ExecutorService exec = Executors.newCachedThreadPool();
     60     private CyclicBarrier barrier;
     61 
     62     public HorseRace(int nHorses, final int pause) {
     63         barrier = new CyclicBarrier(nHorses, new Runnable() {
     64 
     65             @Override
     66             public void run() {
     67                 StringBuilder s = new StringBuilder();
     68                 for (int i = 0; i < FINISH_LINE; i++) {
     69                     s.append("=");
     70                     System.out.println(s);
     71                     for (Horse horse : horses)
     72                         System.out.println(horse.tracks());
     73                     for (Horse horse : horses)
     74                         if (horse.getStriders() >= FINISH_LINE) {
     75                             System.out.println(horse + "won!");
     76                             exec.shutdownNow();
     77                             return;
     78                         }
     79                     try {
     80                         TimeUnit.MILLISECONDS.sleep(pause);
     81                     } catch (InterruptedException e) {
     82                         System.out.println("barrier-action sleep interrupted");
     83                     }
     84                 }
     85             }
     86         });
     87 
     88         for (int i = 0; i < nHorses; i++) {
     89             Horse horse = new Horse(barrier);
     90             horses.add(horse);
     91             exec.execute(horse);
     92         }
     93     }
     94 
     95     public static void main(String[] args) {
     96         int nHorses = 7;
     97         int pause = 200;
     98         if (args.length > 0) {
     99             int n = new Integer(args[0]);
    100             nHorses = n > 0 ? n : nHorses;
    101         }
    102         if (args.length > 1) {
    103             int p = new Integer(args[1]);
    104             pause = p > -1 ? p : pause;
    105         }
    106         new HorseRace(nHorses, pause);
    107     }
    108 }
    View Code

    运行结果:

    =
    **0
    ***1
    *2
    **3
    *4
    ***5
    ***6
    ==
    **0
    ***1
    *2
    **3
    *4
    ***5
    ***6
    ===
    **0
    ***1
    *2
    **3
    *4
    ***5
    ***6
    ====
    

      运行结果中的==表示栅栏,数字为每个马的编号,*的个数代表每个马目前跑了多少步。在代码我,我们可以看到在创建CyclicBarrier对象时,我们还给他传递了一个复写了Runnable后的对象,这是我CounDownLatch不同的地方。每当计数器的值为0的是时候,里面的该对象中的run方法会被调用。可能有这样一种情况,当计数值再次变为0时,上次的run方法还没执行完,它会不会创建新的线程重新执行run方法呢?通过测试,这种情况是不会发生的,只有等run执行完,才会去创建新的线程。

     3 BlockingQueue

    BlockingQueue是一个接口,用于生产者-消费者模型,是一个线程安全的容器。它的实现类有LinkedBlockingQueue(空间无限,FIFO), ArrayBlockingQueue(空间有限,FIFO),PriorityBlockingQueue(元素等级高的在队头),SynchronousQueue(内部没有缓冲区,生产者线程需要将产品直接交给一个空闲的消费者线程,否则将一直处于阻塞状态)

    3.1 DelayQueue

      DelayQueue是一个无界的阻塞队列,用于存放实现了Delayed接口的对象,其中的对象只能在其延迟期满才能从队列中取走。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果没有任何延迟期满的对象,那就不会有任何头元素,这时如果使用take()方法从队列获取对象时会发生阻塞,使用poll时会直接返回null。

      1 package com.dy.xidian;
      2 
      3 import java.util.ArrayList;
      4 import java.util.List;
      5 import java.util.Random;
      6 import java.util.concurrent.DelayQueue;
      7 import java.util.concurrent.Delayed;
      8 import java.util.concurrent.ExecutorService;
      9 import java.util.concurrent.Executors;
     10 import java.util.concurrent.TimeUnit;
     11 
     12 class DelayedTask implements Runnable, Delayed {
     13     private static int counter = 0;
     14     private final int id = counter++;
     15     private final int delayTime;
     16     private final long trigger;
     17 
     18     protected static List<DelayedTask> sequeue = new ArrayList<DelayedTask>();
     19 
     20     // System.nanoTime()获取当前时间,结果是纳秒级
     21     // TimeUnit.MILLSECONDS.convert(time, TimeUnit.SECONDS)
     22     // 时间转换(一般是大单位转小单位),比如计算1s=多少ms之类的
     23     // time是时间,TimeUnit.SECONDS是原始单位(s),MILLISECONDS是转换后的单位(ms)
     24     public DelayedTask(int delayInMilliseconds) {
     25         delayTime = delayInMilliseconds;
     26         trigger = System.nanoTime()
     27                 + TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS);
     28         sequeue.add(this);
     29     }
     30 
     31     // 重载Delayed接口的getDelay方法,该示例代码给出的是重载的标准形式
     32     @Override
     33     public long getDelay(TimeUnit unit) {
     34         return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
     35 
     36     }
     37 
     38     //比较每个对象的触发时间,以确定在队列中的位置
     39     @Override
     40     public int compareTo(Delayed arg) {
     41         DelayedTask that = (DelayedTask) arg;
     42         if (trigger < that.trigger)
     43             return -1;
     44         if (trigger > that.trigger)
     45             return 1;
     46         return 0;
     47     }
     48 
     49     @Override
     50     public void run() {
     51         System.out.println(this + " ");
     52     }
     53 
     54     public String toString() {
     55         return String.format("[%1$-4d]", counter) + "Task " + id;
     56     }
     57 
     58     public String summary() {
     59         return "(" + id + ":" + counter + ")";
     60     }
     61 
     62     public static class EndSentinel extends DelayedTask {
     63         private ExecutorService exec;
     64 
     65         public EndSentinel(int delay, ExecutorService e) {
     66             super(delay);
     67             exec = e;
     68         }
     69 
     70         public void run() {
     71             for (DelayedTask pt : sequeue) {
     72                 System.out.println(pt.summary() + " ");
     73             }
     74             System.out.println(this + " Calling shutdownNow()");
     75             exec.shutdownNow();
     76         }
     77     }
     78 }
     79 
     80 class DelayTaskConsumer implements Runnable {
     81     private DelayQueue<DelayedTask> q;
     82     public DelayTaskConsumer(DelayQueue<DelayedTask> q) {
     83         this.q = q;
     84     }
     85 
     86     @Override
     87     public void run() {
     88         try {
     89             while (!Thread.interrupted())
     90                 q.take().run();
     91         } catch (InterruptedException e) {
     92         }
     93         System.out.println("Finised DelayedTaskConsumer!");
     94     }
     95 
     96 }
     97 public class DelayQueueDemo {
     98     public static void main(String[] args) {
     99         Random rand = new Random(47);
    100         ExecutorService exec = Executors.newCachedThreadPool();
    101         DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
    102         for (int i = 0; i < 20; i++)
    103             queue.put(new DelayedTask(rand.nextInt(5000)));
    104         queue.add(new DelayedTask.EndSentinel(5000, exec));
    105         exec.execute(new DelayTaskConsumer(queue));
    106     }
    107 }
    View Code

    运行结果:

     1 [128 ]Task 11 
     2 [200 ]Task 7 
     3 [429 ]Task 5 
     4 [520 ]Task 18 
     5 [555 ]Task 1 
     6 [961 ]Task 4 
     7 [998 ]Task 16 
     8 [1207]Task 9 
     9 [1693]Task 2 
    10 [1809]Task 14 
    11 [1861]Task 3 
    12 [2278]Task 15 
    13 [3288]Task 10 
    14 [3551]Task 12 
    15 [4258]Task 0 
    16 [4258]Task 19 
    17 [4522]Task 8 
    18 [4589]Task 13 
    19 [4861]Task 17 
    20 [4868]Task 6 
    21 (0:21) 
    22 (1:21) 
    23 (2:21) 
    24 (3:21) 
    25 (4:21) 
    26 (5:21) 
    27 (6:21) 
    28 (7:21) 
    29 (8:21) 
    30 (9:21) 
    31 (10:21) 
    32 (11:21) 
    33 (12:21) 
    34 (13:21) 
    35 (14:21) 
    36 (15:21) 
    37 (16:21) 
    38 (17:21) 
    39 (18:21) 
    40 (19:21) 
    41 (20:21) 
    42 [5000]Task 20 Calling shutdownNow()
    43 Finised DelayedTaskConsumer!
    View Code

      该程序创建了20个delayedTask对象,这20对象其实是线程对象,然后将这20对象放入DelayedQueue中,同时将这20个对象加入到list中以表明创建的先后顺序。每个线程的延迟期是通过随机数指定的。在DelayedTask中有一个内部类,该类的作用就是遍历list,输出每个线程的信息(id + 延迟期),最后关闭整个线程。DelayedTaskConsumer就是不断从DelayedQueue中取线程对象,然后让其执行。

      关于Delayed接口的实现这里要强调一下,代码中写的是标准形式,也是策略模式的一种简单实现。delayTime是延迟期,需要我们指定。trigger表示这个对象的激活时间(比如到11点整时,其延迟期满),其计算方法就是获取当前时间+延迟期。而getDelay(TimeUnit unit)这个函数是个关键,这个函数会被调用两次:第一次查看延期满的时间点和当前时间之差(比如当前时间9点,延迟期满是在11点),发现是正值,对象需要继续等待;第二次查看时发现是负值(比如当前时间已经到了12点了),返回值为负数,说明对象的延迟期已经到了,可以使用了。 

    3.2 PriorityBlockingQueue

    队列是按照优先级级顺序排序的,优先级大的在队头。队列中的对象应该实现Comparable接口。在compareTo中,当和其他对象比较时,如果该方法返回负数,那么在队列里面的优先级就比较高。

      1 package com.dy.xidian;
      2 
      3 import java.util.ArrayList;
      4 import java.util.List;
      5 import java.util.Queue;
      6 import java.util.Random;
      7 import java.util.concurrent.ExecutorService;
      8 import java.util.concurrent.Executors;
      9 import java.util.concurrent.PriorityBlockingQueue;
     10 import java.util.concurrent.TimeUnit;
     11 
     12 class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
     13     private Random rand = new Random(47);
     14     private static int counter = 0;
     15     private final int id = counter++;
     16     private final int priority;
     17 
     18     protected static List<PrioritizedTask> sequeue = new ArrayList<PrioritizedTask>();
     19 
     20     public PrioritizedTask(int priority) {
     21         super();
     22         this.priority = priority;
     23         sequeue.add(this);
     24     }
     25 
     26     @Override
     27     public int compareTo(PrioritizedTask that) {
     28         if (this.priority > that.priority)
     29             return -1;
     30         if (this.priority < that.priority)
     31             return 1;
     32         return 0;
     33     }
     34 
     35     @Override
     36     public void run() {
     37         try {
     38             TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
     39             System.out.println(this);
     40         } catch (InterruptedException e) {
     41         }
     42 
     43     }
     44 
     45     @Override
     46     public String toString() {
     47         return String.format("[%1$-3d]", priority) + "Task" + id;
     48     }
     49 
     50     public String summary() {
     51         return "(" + id + " : " + priority + ")";
     52     }
     53 
     54     public static class EndSentinel extends PrioritizedTask {
     55         private ExecutorService exec;
     56 
     57         public EndSentinel(ExecutorService e) {
     58             super(-1);
     59             exec = e;
     60         }
     61 
     62         public void run() {
     63             int count = 0;
     64             for (PrioritizedTask pt : sequeue) {
     65                 System.out.println(pt.summary());
     66                 if (++count % 5 == 0)
     67                     System.out.println("");
     68             }
     69             System.out.println("");
     70             System.out.println(this + " Calling shutdownNow()");
     71             exec.shutdownNow();
     72         }
     73     }
     74 }
     75 
     76 class PrioritizedTaskProducer implements Runnable {
     77     private Random rand = new Random(47);
     78     private Queue<Runnable> queue;
     79     private ExecutorService exec;
     80 
     81     public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
     82         super();
     83         this.queue = queue;
     84         this.exec = exec;
     85     }
     86 
     87     @Override
     88     public void run() {
     89         for (int i = 0; i < 10; i++) {
     90             queue.add(new PrioritizedTask(rand.nextInt(10)));
     91             Thread.yield();
     92         }
     93         try {
     94             for (int i = 0; i < 10; i++) {
     95                 TimeUnit.MILLISECONDS.sleep(250);
     96                 queue.add(new PrioritizedTask(10));
     97             }
     98 
     99         } catch (InterruptedException e) {
    100         }
    101         for (int i = 0; i < 10; i++)
    102             queue.add(new PrioritizedTask(i));
    103         queue.add(new PrioritizedTask.EndSentinel(exec));
    104         System.out.println("Finished PrioritizedTaskProducer");
    105     }
    106 }
    107 
    108 class PrioritizedTaskConsumer implements Runnable {
    109     private PriorityBlockingQueue<Runnable> q;
    110     public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
    111         super();
    112         this.q = q;
    113     }
    114 
    115     @Override
    116     public void run() {
    117         try {
    118             TimeUnit.SECONDS.sleep(1);
    119         } catch (InterruptedException e1) {
    120         }
    121         try {
    122             while (!Thread.interrupted()) {
    123                 q.take().run();
    124             }
    125         } catch (InterruptedException e) {
    126             System.out.println("Interrupted Execption!");
    127         }
    128         System.out.println("Finished PrioritizedTaskConsumer");
    129     }
    130 }
    131 
    132 public class PriorityBlockingQueueDemo {
    133     public static void main(String[] args) throws InterruptedException {
    134         ExecutorService exec = Executors.newCachedThreadPool();
    135         PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
    136         exec.execute(new PrioritizedTaskProducer(queue, exec));
    137         exec.execute(new PrioritizedTaskConsumer(queue));
    138     }
    139 }
    View Code

    4.ScheduledThreadPoolExecutor

    制定任务计划表,指定主线程运行多少秒(毫秒)后,开启子线程来运行别的任务。

     1 package com.dy.xidian;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 import java.util.concurrent.ScheduledThreadPoolExecutor;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class TaskTest {
     9     static ScheduledThreadPoolExecutor scheduler = null;
    10     static int index = 0;
    11 
    12     public static void main(String[] args) {
    13 
    14         // 构造一个ScheduledThreadPoolExecutor对象,并且设置它的容量为5个
    15         scheduler = new ScheduledThreadPoolExecutor(5);
    16         MyTask task = new MyTask();
    17         // 隔2秒后开始执行任务,并且在上一次任务开始后隔一秒再执行一次;
    18         // stpe.scheduleWithFixedDelay(task, 2, 1, TimeUnit.SECONDS);
    19         // 隔6秒后执行一次,但只会执行一次。
    20         for (int i = 0; i < 10; i++)
    21             scheduler.schedule(task, i + 1, TimeUnit.SECONDS);
    22     }
    23 
    24     private static String getTimes() {
    25         SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss E");
    26         Date date = new Date();
    27         date.setTime(System.currentTimeMillis());
    28         return format.format(date);
    29     }
    30 
    31     private static class MyTask implements Runnable {
    32 
    33         @Override
    34         public void run() {
    35             index++;
    36             System.out.println(getTimes() + " " + index);
    37             if (index >= 10) {
    38                 scheduler.shutdownNow();
    39             }
    40         }
    41     }
    42 }

    5.Semaphore

      同类资源只有一个的话,我们可以用Lock或者是synchronized来对它进行互斥访问。当同类资源数量有多个,能够满足多个线程同时操作时,可以考虑到使用信号量来实现互斥访问。

    对象池代码

     1 package com.dy.xidian;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.Semaphore;
     6 
     7 public class ObjectPool<T> {
     8     private int size;
     9     private List<T> items = new ArrayList<T>();
    10     private volatile boolean[] checkOut;
    11     private Semaphore available;
    12 
    13     public ObjectPool(Class<T> classObject, int size) {
    14         this.size = size;
    15         checkOut = new boolean[size];
    16         //size表示初始资源数,true表示对请求进行先来先服务操作
    17         available = new Semaphore(size, true);
    18         for (int i = 0; i < size; i++) {
    19             try {
    20                 items.add(classObject.newInstance());
    21             } catch (Exception e) {
    22                 throw new RuntimeException(e);
    23             }
    24         }
    25     }
    26 
    27     public T checkOut() throws InterruptedException {
    28         //获取信号量,如果没有资源请等待,信号量计数减1
    29         available.acquire();
    30         return getItem();
    31     }
    32 
    33     private synchronized T getItem() {
    34         for (int i = 0; i < size; i++) {
    35             if (!checkOut[i]) {
    36                 checkOut[i] = true;
    37                 return items.get(i);
    38             }
    39         }
    40         return null;
    41     }
    42 
    43     public void checkIn(T x) {
    44         //归还资源,释放信号量,信号量计数加1
    45         if (releaseItem(x))
    46             available.release();
    47     }
    48 
    49     private synchronized boolean releaseItem(T item) {
    50         int index = items.indexOf(item);
    51         if (index == -1)
    52             return false;
    53         if (checkOut[index]) {
    54             checkOut[index] = false;
    55             return true;
    56         }
    57         return false;
    58     }
    59 }
    View Code

    信号量Demo:

     1 package com.dy.xidian;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ExecutorService;
     6 import java.util.concurrent.Executors;
     7 import java.util.concurrent.Future;
     8 import java.util.concurrent.TimeUnit;
     9 
    10 class Fat {
    11     private volatile double d = 0;
    12     private static int counter = 0;
    13     private final int id = counter++;
    14     public Fat() {
    15         for (int i = 0; i < 1000; i++) {
    16             d += (Math.PI + Math.E) / (double) i;
    17         }
    18     }
    19 
    20     public void operation() {
    21         System.out.println(this);
    22     }
    23 
    24     public String toString() {
    25         return "Fat id: " + id;
    26     }
    27 }
    28 class CheckOutTask<T> implements Runnable {
    29     private static int counter = 0;
    30     private final int id = counter++;
    31     private ObjectPool<T> pool;
    32 
    33     public CheckOutTask(ObjectPool<T> pool) {
    34         this.pool = pool;
    35     }
    36 
    37     @Override
    38     public void run() {
    39         try {
    40             T item = pool.checkOut();
    41             System.out.println(this + "checked out " + item);
    42             TimeUnit.SECONDS.sleep(1);
    43             System.out.println(this + "checked in " + item);
    44             pool.checkIn(item);
    45         } catch (InterruptedException e) {
    46         }
    47     }
    48 
    49     public String toString() {
    50         return "CheckoutTask" + id + " ";
    51     }
    52 }
    53 
    54 public class SemaphoreDemo {
    55     final static int SIZE = 10;
    56 
    57     public static void main(String[] args) throws InterruptedException {
    58         final ObjectPool<Fat> pool = new ObjectPool<Fat>(Fat.class, SIZE);
    59         ExecutorService exec = Executors.newCachedThreadPool();
    60         List<Fat> list = new ArrayList<Fat>();
    61         // 创建10个子线程进行签入、签出操作
    62         for (int i = 0; i < SIZE; i++)
    63             exec.execute(new CheckOutTask<Fat>(pool));
    64         System.out.println("All checkout Task created");
    65 
    66         // 主线程只签出
    67         for (int i = 0; i < SIZE; i++) {
    68             Fat f = pool.checkOut();
    69             System.out.println(i + " : main() thread checked out");
    70             f.operation();
    71             list.add(f);
    72         }
    73 
    74         Future<?> blocked = exec.submit(new Runnable() {
    75 
    76             @Override
    77             public void run() {
    78                 try {
    79                     pool.checkOut();
    80                 } catch (InterruptedException e) {
    81                     System.out.println("checkOut() interrupted");
    82                 }
    83             }
    84         });
    85         TimeUnit.SECONDS.sleep(2);
    86         blocked.cancel(true);
    87         System.out.println("Checking in objects in " + list);
    88         for (Fat fat : list)
    89             pool.checkIn(fat);
    90         for (Fat fat : list)
    91             pool.checkIn(fat);
    92         exec.shutdown();
    93     }
    94 }

      代码中创建了一个对象池,每次从池中获取对象时都要先获取信号量,如果信号量计数小于或等于0,则等待。信号量在创建时需要指定资源数,对象池中最开始有10个对象,则信号量的初始值应为10。在使用完对象后应该归还该对象并释放信号量。对于信号量的使用比较简答些。关于对象池代码则可以作为今后编程中示例代码。

    6.Exchanger

      Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

      1 package com.dy.xidian;
      2 
      3 import java.util.List;
      4 import java.util.concurrent.CopyOnWriteArrayList;
      5 import java.util.concurrent.Exchanger;
      6 import java.util.concurrent.ExecutorService;
      7 import java.util.concurrent.Executors;
      8 import java.util.concurrent.TimeUnit;
      9 
     10 import net.mindview.util.BasicGenerator;
     11 import net.mindview.util.Generator;
     12 
     13 class ExchangerProducer<T> implements Runnable {
     14     private Generator<T> generator;
     15     private Exchanger<List<T>> exchanger;
     16     private List<T> holder;
     17 
     18     /**
     19      * 
     20      * @param exchanger交换器
     21      *            , 用于交换对象
     22      * @param generator产生器
     23      *            , 产生要交换的数据
     24      * @param holder数据容器
     25      *            , 用来存储产生的数据
     26      */
     27     public ExchangerProducer(Exchanger<List<T>> exchanger,
     28             Generator<T> generator, List<T> holder) {
     29         super();
     30         this.generator = generator;
     31         this.exchanger = exchanger;
     32         this.holder = holder;
     33     }
     34 
     35     /**
     36      * 生产者线程会生成一个满的List,用于交换对象
     37      */
     38     @Override
     39     public void run() {
     40         try {
     41             while (!Thread.interrupted()) {
     42                 for (int i = 0; i < ExchangerDemo.size; i++)
     43                     holder.add(generator.next());
     44                 // 返回值是从消费者那里拿到的数据(其实就是一个空表)
     45                 holder = exchanger.exchange(holder);
     46             }
     47         } catch (InterruptedException e) {
     48         }
     49     }
     50 }
     51 
     52 class ExchangerConsumer<T> implements Runnable {
     53     private Exchanger<List<T>> exchanger;
     54     private List<T> holder;
     55     private volatile T value;
     56 
     57     /**
     58      * 
     59      * @param exchanger交换器
     60      *            ,用于交换数据
     61      * @param holder
     62      *            欲交换的对象
     63      */
     64     public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder) {
     65         super();
     66         this.exchanger = exchanger;
     67         this.holder = holder;
     68     }
     69 
     70     /**
     71      * 消费者不断将表中的元素移除,给生产者一个空表
     72      */
     73     @Override
     74     public void run() {
     75         try {
     76             while (!Thread.interrupted()) {
     77                 holder = exchanger.exchange(holder);
     78                 for (T x : holder) {
     79                     value = x;
     80                     holder.remove(x);
     81                 }
     82             }
     83         } catch (InterruptedException e) {
     84         }
     85         System.out.println("Final value: " + value);
     86     }
     87 }
     88 
     89 public class ExchangerDemo {
     90     static int size = 10;
     91     static int delay = 5;
     92 
     93     public static void main(String[] args) throws Exception {
     94         if (args.length > 0)
     95             size = new Integer(args[0]);
     96         if (args.length > 1)
     97             delay = new Integer(args[1]);
     98         ExecutorService exec = Executors.newCachedThreadPool();
     99         Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
    100         List<Fat> producerList = new CopyOnWriteArrayList<Fat>(), consumerList = new CopyOnWriteArrayList<Fat>();
    101         exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator
    102                 .create(Fat.class), producerList));
    103         exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
    104         TimeUnit.SECONDS.sleep(delay);
    105         exec.shutdownNow();
    106     }
    107 }
    View Code

    代码中ExchangeProducer不断填充List,然后将这个满表交换为ExchangerConsumer传递给它的空表。

    代码中用到的写时拷贝技术可以查考下面链接

    http://www.cnblogs.com/dolphin0520/p/3938914.html

  • 相关阅读:
    Scala控制抽象
    【转】ZooKeeper详细介绍和使用第一节
    zookeeper入门系列讲解
    最全面的 Spring 学习笔记
    MySQL 字符串拼接详解
    细说Python2.x与3​​.x版本区别
    【转】微信公众开发URL和token填写详解
    【转】Java代码操作Redis的sentinel和Redis的集群Cluster操作
    Java正则表达式的使用和详解(下)
    Java正则表达式的使用和详解(上)
  • 原文地址:https://www.cnblogs.com/xidongyu/p/5319902.html
Copyright © 2011-2022 走看看