zoukankan      html  css  js  c++  java
  • java并发编程构建块

    java5引入了很多新的并发容器和工具,极大的简化了并发程序的编写。本文先介绍Collections.synchronizedXXX工厂方法创建的同步容器的不足,再介绍ConcurrentHashMap,CopyOnWriterArrayList,BlockingQueue,CountDownLatch,Semaphore,CyclicBarrier和显示锁类。

     

    一、引言

    所有的并发问题都来源于如何协调访问并发状态,可变状态越少,并发控制也就越容易容易。没有可变状态的对象永远是现成安全。对有可变状态的对象的并发访问必须进行加锁,对每个方法都加锁并不能保证并发访问一定处于一致的状态。在构建并发程序的时候,可以把并发的控制代理到已有的并发类,比如类库提供的并发容器或工具。

    二、同步容器的不足

    对Collections.synchronizedXXX工厂方法创建的同步容器,每个方法都是使用该容器的内部锁进行控制的,这会带来一性能问题,因为多个安全的读操作也要等待排它锁。即使每个方法的调用都是线程安全的,但同时调用多个操作室,不一定是线程安全的,比如缺少即添加,相等即修改等炒作,是两步原子操作构成的,和在一起并不是原子的,必须在调用代码里使用容器的锁进行控制。另外在迭代集合的时候,还会由于并发修改而抛出ConcurrentModefiedExceptioin,这往往是调用程序不希望的结果。

    三、ConcurrentHashMap,CopyOnWriterArrayList

    ConcurrentHashMap使用的内部细粒度的分离锁,这个锁机制允许任意数量的读线程并发访问,提高了吞吐率。在迭代时不会抛出ConcurrentModefiedExceptioin,如果在迭代期间有修改发生,返回的是迭代开始时的状态。另外对缺少即添加,相等即修改等二元操作也有相应的方法支持。ConcurrentHashMap实现了ConcurrentMap提供的几个特殊原子操作:

    public V putIfAbsent(K key,  V value)

    如果指定键已经不再与某个值相关联,则将它与给定值关联。

    public boolean remove(Object key, Object value)

    只有目前将键的条目映射到给定值时,才移除该键的条目。

    public boolean replace(K key, V oldValue, V newValue)

    只有目前将键的条目映射到给定值时,才替换该键的条目。

    public V replace(K key, V value)

    只有目前将键的条目映射到某一值时,才替换该键的条目。

     

    CopyOnWriterArrayList 是ArrayList的一个并发替代品,通常情况下,它提供比较好的并发性,允许多个现在并发的对其进行迭代。每次需要修改时,便会创建并重新发布一个新的容器拷贝,以此来实现可变性。因为底层使用数组实现,如果数组元素较多是,复制多付出的代价较大。

    三、BlockingQueue

    阻塞队列提供了可以阻塞的put和take方法,以及与之等价的可以指定超时的offer和poll。如果Queue是空的,那么take方法会一直阻塞,直到有元素可用。如果Queue是有线长度的,队列满的时候put方法也会阻塞。BlockingQueue可以很好的支持生产者和消费者模式,生产者往队列里put,消费者从队列里get,两者能够得好很好的同步。BlockingQueue的实现类LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue是一个按优先级排序的队列。使用BlockingQueue构建的一个生产者与消费例子:

    消费者:

    Java代码  收藏代码
    1. public class Consumer implements Runnable {  
    2.   
    3.     private BlockingQueue<Food> queue;  
    4.     private ExecutorService exec;  
    5.   
    6.     public Consumer(BlockingQueue<Food> queue, ExecutorService exec) {  
    7.         this.queue = queue;  
    8.         this.exec = exec;  
    9.     }  
    10.   
    11.     @Override  
    12.     public void run() {  
    13.         while (!exec.isShutdown()) {  
    14.             try {  
    15.                 Thread.sleep(2000);  
    16.                 Food food = queue.take();  
    17.                 System.out.println("Consumer " + food);  
    18.             } catch (InterruptedException e) {  
    19.                 e.printStackTrace();  
    20.             } catch (RejectedExecutionException e) {  
    21.   
    22.             }  
    23.         }  
    24.     }  
    25. }  

     生产者:

    Java代码  收藏代码
    1. public class Producer implements Runnable {  
    2.   
    3.     private BlockingQueue<Food> queue;  
    4.     private ExecutorService exec;  
    5.   
    6.     public Producer(BlockingQueue<Food> queue, ExecutorService exec) {  
    7.         this.queue = queue;  
    8.         this.exec = exec;  
    9.     }  
    10.   
    11.     @Override  
    12.     public void run() {  
    13.         while (!exec.isShutdown()) {  
    14.             Food food = new Food();  
    15.             try {  
    16.                 Thread.sleep(4000);  
    17.                 queue.put(food);  
    18.                 System.out.println("Produce " + food);  
    19.             } catch (InterruptedException e) {  
    20.                 e.printStackTrace();  
    21.             } catch (RejectedExecutionException e) {  
    22.   
    23.             }  
    24.         }  
    25.     }  
    26. }  

     Main:

    Java代码  收藏代码
    1. BlockingQueue<Food> queue = new ArrayBlockingQueue<Food>(5);  
    2. ExecutorService exec = Executors.newFixedThreadPool(3);  
    3. Producer p1 = new Producer(queue, exec);  
    4. Producer p2 = new Producer(queue, exec);  
    5.   
    6. Consumer c1 = new Consumer(queue, exec);  
    7.   
    8. exec.execute(p1);  
    9. exec.execute(p2);  
    10. exec.execute(c1);  
    11. try {  
    12.     Thread.sleep(10000);  
    13. catch (InterruptedException ignored) {  
    14. }  
    15. exec.shutdown();  

     四、CountDownLatch

    闭锁(Latch),它可以延迟线程的进度知道线程到达终止状态。一个闭锁工作方式就像一道门,直到闭锁到达终点状态之前,门一直关闭着。终点状态到了之后,所有阻塞的线程都可以通过。CountDownLatch 使用一个计数器作为终点状态,知道计数器的值到达0时,闭锁才会打开。调用await 方法,线程会阻塞知道计数器为0,countDown 方法使计数器减一。

    闭锁有两种常见的用法,开始闭锁,结束闭锁。开始闭锁用于等待一个条件到达后所有线程一起执行,结束闭锁可以用来等待所有条件或所有线程结束后再进行后续处理。例子:

    Java代码  收藏代码
    1. final CountDownLatch startLatch = new CountDownLatch(1);  
    2. final CountDownLatch endLatch = new CountDownLatch(3);  
    3. Runnable prepare = new Runnable() {  
    4.     @Override  
    5.     public void run() {  
    6.         try {  
    7.             startLatch.await();//等待开始闭锁,线程同时开始执行  
    8.             System.out.println("收拾东西,准备出门");  
    9.             Random rnd = new Random();  
    10.             Thread.sleep(rnd.nextInt(1000));  
    11.         } catch (InterruptedException ignored) {  
    12.         }  
    13.         endLatch.countDown();  
    14.     }  
    15. };  
    16.   
    17. Thread mum = new Thread(prepare);  
    18. Thread dad = new Thread(prepare);  
    19. Thread me = new Thread(prepare);  
    20. mum.start();  
    21. dad.start();  
    22. me.start();  
    23. startLatch.countDown();  
    24. try {  
    25.     endLatch.await();  
    26. catch (InterruptedException ignored) {  
    27. }  
    28. System.out.println("逛街");  

     五、Semaphore,信号量

    使用信号量进行同步和互斥的控制是最经典的并发模型,java中也提高支持。一个Semaphore管理一个有效的许可 集,许可基的数量通过构造函数传入,通过acquire方法申请一个许可,许可数为0则阻塞线程,否则许可数减一,使用release方法释放一个许个,许可数加一。一个技术量为1的Semaphore为二元信号量,相当于一个互斥锁,表示不可重入的锁。一个使用信号量控制并发容器上届的例子:

    Java代码  收藏代码
    1. public class BoundedHashSet<T> {  
    2.     private final Set<T> set;  
    3.     private final Semaphore sem;  
    4.   
    5.     public BoundedHashSet(int bound) {  
    6.         set = Collections.synchronizedSet(new HashSet<T>());  
    7.         sem = new Semaphore(bound);  
    8.     }  
    9.   
    10.     public boolean add(T o) throws InterruptedException {  
    11.         sem.acquire();  
    12.         boolean wasAdded = false;  
    13.         try {  
    14.             wasAdded = set.add(o);  
    15.             return wasAdded;  
    16.         } finally {  
    17.             if (!wasAdded)  
    18.                 sem.release();  
    19.         }  
    20.     }  
    21.   
    22.     public boolean remove(Object o) {  
    23.         boolean wasRemoved = set.remove(o);  
    24.         if (wasRemoved)  
    25.             sem.release();  
    26.         return wasRemoved;  
    27.     }  
    28. }  

     六、CyclicBarrier

    关卡(Barrier)类似于闭锁,他们都能阻塞一组线程,知道某些事件发生,不同之处在于所有CyclicBarrier等待的是现线程,只有一定数目的线程到达这个点时,才允许同时通过。它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

    Java代码  收藏代码
    1. public class Main {  
    2.   
    3.     public static CyclicBarrier getCyclicBarrier(int count) {  
    4.         if (count <= 0)  
    5.             return null;  
    6.         final CyclicBarrier cyclicBarrier = new CyclicBarrier(count,  
    7.                 new Runnable() {  
    8.                     public void run() {  
    9.                         try {  
    10.                             Thread.sleep(1000);  
    11.                         } catch (InterruptedException e) {  
    12.                             e.printStackTrace();  
    13.                         }  
    14.                         System.out.println("conditon is arrive and CycleBarrier is running");  
    15.                     }  
    16.                 });  
    17.         return cyclicBarrier;  
    18.     }  
    19.   
    20.     public static Thread getThread(String nameOfThread,  
    21.             final CyclicBarrier cyclicBarrier) {  
    22.         Thread thread = new Thread(nameOfThread) {  
    23.             public void run() {  
    24.                 System.out.println(this.getName() +  
    25. "is begin; and count is "+ (++count));  
    26.                 try {  
    27.                     cyclicBarrier.await();  
    28.                 } catch (InterruptedException e) {  
    29.                     e.printStackTrace();  
    30.                 } catch (BrokenBarrierException e) {  
    31.                     e.printStackTrace();  
    32.                 }  
    33.                 System.out.println(this.getName() + "finished");  
    34.             }  
    35.         };  
    36.         return thread;  
    37.   
    38.     }  
    39.   
    40.     static int count = 0;  
    41.   
    42.     public static void main(String[] args) {  
    43.         /** define a cyclicBarrier and number of barrier is 2. */  
    44.         CyclicBarrier cyclicBarrier = getCyclicBarrier(2);  
    45.         Thread threadOne = getThread("threadOne", cyclicBarrier);  
    46.         threadOne.start();  
    47.         Thread threadTwo = getThread("threadTwo", cyclicBarrier);  
    48.         threadTwo.start();  
    49.     }  
    50. }  

     该例子中CyclicBarrier等待两个线程到达后输出conditon is arrive and CycleBarrier is running,两个线程都从await中返回。

    七、显式锁

     

    在java 5之前,用于调节共享对象访问的机制只有synchronized和volatile。java 5提供了新的选择:ReentrantLock。ReentrantLock能够提供更多的高级特性,比如轮询和可定时的加锁,可中断的加锁。以及一个支持读锁和写锁的ReentrantReadWriteLock。使用ReentrantLock必须手动使用lock或其他操作加锁,在finally块中unlock。

    ReentrantLock:一个可重入的互斥锁Lock,它具有与使用synchronized方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。 使用ReentrantLock构建的同步Map:

    Java代码  收藏代码
    1. public class LockedMap<K, V> {  
    2.     private Map<K, V> map;  
    3.     private Lock lock = new ReentrantLock();  
    4.       
    5.     public LockedMap(Map<K, V> map) {  
    6.         this.map = map;  
    7.     }  
    8.   
    9.     public V get(K key) {  
    10.         lock.lock();  
    11.         try {  
    12.             return map.get(key);  
    13.         } finally {  
    14.             lock.unlock();  
    15.         }  
    16.     }  
    17.   
    18.     public void put(K key, V value) {  
    19.         lock.lock();  
    20.         try {  
    21.             map.put(key, value);  
    22.         } finally {  
    23.             lock.unlock();  
    24.         }  
    25.     }  
    26. }  
    Java代码  收藏代码
    1. public class ReentrantLockTest {  
    2.   
    3.     private List<Integer> numbers = new ArrayList<Integer>();  
    4.     private Lock numbersLock = new ReentrantLock();  
    5.   
    6.     public void addNumbers(int num) {  
    7.         try {  
    8.             numbersLock.lock();  
    9.             numbers.add(num);  
    10.         } finally {  
    11.             numbersLock.unlock();  
    12.         }  
    13.     }  
    14.   
    15.     public void outputNumbers() {  
    16.         try {  
    17.             if (numbersLock.tryLock(1, TimeUnit.SECONDS)) {  
    18.                 for (int num : numbers) {  
    19.                     System.out.println(num);  
    20.                 }  
    21.             }  
    22.         } catch (InterruptedException ex) {  
    23.             ex.printStackTrace();  
    24.         } finally {  
    25.             numbersLock.unlock();  
    26.         }  
    27.     }      
    28.   
    29.     public static void main(String[] args) {  
    30.         final ReentrantLockTest test = new ReentrantLockTest();  
    31.         Executor pool = Executors.newFixedThreadPool(3);  
    32.         pool.execute(new Runnable() {  
    33.   
    34.             public void run() {  
    35.                 Random rnd = new Random();  
    36.                 while (true) {  
    37.                     int number = rnd.nextInt();  
    38.                     test.addNumbers(number);  
    39.                     try {  
    40.                         Thread.sleep(100);  
    41.                     } catch (InterruptedException ignored) {  
    42.                     }  
    43.                 }  
    44.             }  
    45.         });  
    46.   
    47.         pool.execute(new Runnable() {  
    48.   
    49.             public void run() {  
    50.                 while (true) {  
    51.                     test.outputNumbers();  
    52.                     try {  
    53.                         Thread.sleep(1000);  
    54.                     } catch (InterruptedException ignored) {  
    55.                     }  
    56.                 }  
    57.             }  
    58.         });  
    59.     }  
    60.   
    61.   
    62. }  
     

    ReentrantReadWriteLock提供了对读锁和写锁的支持,同一时刻,可允许多个读锁,但只允许有一个写锁,读锁的获取和写锁的获取是互斥的。从ReentrantReadWriteLock对象的readLock方法可以获得相应的读锁,writeLock方法可以获得相应的写锁。使用ReentrantReadWriteLock构建的Map,允许多个get操作并发执行:

    Java代码  收藏代码
    1. public class ReadWriteMap<K,V>  {  
    2.     private Map<K,V> map;  
    3.     private ReadWriteLock lock = new ReentrantReadWriteLock();  
    4.     private Lock readLock = lock.readLock();  
    5.     private Lock writeLock  = lock.writeLock();   
    6.       
    7.     public ReadWriteMap(Map<K,V> map){  
    8.         this.map = map;  
    9.     }  
    10.       
    11.     public V get(K key){  
    12.         readLock.lock();  
    13.         try{  
    14.             return map.get(key);  
    15.         }  
    16.         finally{  
    17.             readLock.unlock();  
    18.         }     
    19.     }  
    20.       
    21.     public void put(K key,V value){  
    22.         writeLock.lock();  
    23.         try{  
    24.             map.put(key, value);  
    25.         }  
    26.         finally{  
    27.             writeLock.unlock();  
    28.         }  
    29.     }  
    30.       
    31. }  
     

    所有代码见附件。本文参考《Java并发编程实践 》。

  • 相关阅读:
    3变量
    2python的第一个程序
    1python语言的基本介绍
    6文字的剪切,复制,移动
    讨厌的Permission denied:adb访问手机目录时,怎么处理Permission denied问题
    adb命令整理(持续整理)
    loadrunner12下载、安装、认证、汉化
    操作系统中堆和栈的区别
    mos管选型
    Office 转 PDF
  • 原文地址:https://www.cnblogs.com/chenying99/p/2743756.html
Copyright © 2011-2022 走看看