实际开发中,我们经常会接触到生产消费者模型,如:Android的Looper相应handler处理UI操作,Socket通信的响应过程、数据缓冲区在文件读写应用等。强大的模型框架,鉴于本人水平有限目前水平只能膜拜,本次只能算学习笔记,为了巩固自己对Java多线程常规知识点的理解,路过大神还望能指导指导。下面一段代码是最常规的生产者消费者的例子:
package com.zhanglei.demo; import java.util.ArrayList; import java.util.List; import java.util.Random; public class ResourceBuffer { private final int DEFAULT_BUFFER_SIZE = 100; private int size; private Random rnd; private List<Integer> bufferList = new ArrayList<Integer>(); public ResourceBuffer(int size){ rnd = new Random(); if(size >0) this.size = size; else this.size = DEFAULT_BUFFER_SIZE; } public synchronized void product(){ if(bufferList.size() == size){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int num = rnd.nextInt(100); bufferList.add(num); System.out.println("生产商品编号"+num); notifyAll(); } public synchronized void consumer(){ if(bufferList.size() == 0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int index = bufferList.size() -1; System.out.println("消费商品编号"+bufferList.get(index)); bufferList.remove(index); notifyAll(); } }
package com.zhanglei.demo; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Program { /** * @param args */ public static void main(String[] args) { ResourceBuffer buffer = new ResourceBuffer(10); ExecutorService executor = Executors.newCachedThreadPool(); executor.execute(new ProductTask(buffer)); executor.execute(new ConsumerTask(buffer)); } } class ConsumerTask implements Runnable{ private ResourceBuffer buffer; public ConsumerTask(ResourceBuffer buffer){ this.buffer = buffer; } @Override public void run() { while(true){ buffer.consumer(); } } } class ProductTask implements Runnable{ private ResourceBuffer buffer; public ProductTask(ResourceBuffer buffer){ this.buffer = buffer; } @Override public void run() { while(true){ buffer.product(); } } }
以上代码通过实现对ResourceBuffer类的对象生产和消费来实现同步和协作,实际上就是对资源互斥访问实现同步。我们同样可以用java.util.concurrent包下的Lock接口实现同样的效果,代码如下:
package com.zhanglei.demo; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ResourceBuffer { private final int DEFAULT_BUFFER_SIZE = 100; private int size; private Random rnd; private List<Integer> bufferList = new ArrayList<Integer>(); private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition();//不为空条件 private Condition notFill = lock.newCondition();//不为满条件 public ResourceBuffer(int size){ rnd = new Random(); if(size >0) this.size = size; else this.size = DEFAULT_BUFFER_SIZE; } public void product(){ lock.lock(); try{ if(bufferList.size() == size){ try { notFill.await(); } catch (InterruptedException e) { e.printStackTrace(); } } int num = rnd.nextInt(100); bufferList.add(num); System.out.println("生产商品编号"+num); notEmpty.signalAll(); } finally{ lock.unlock(); } } public void consumer(){ lock.lock(); try{ if(bufferList.size() == 0){ try { notEmpty.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } int index = bufferList.size() -1; System.out.println("消费商品编号"+bufferList.get(index)); bufferList.remove(index); notFill.signalAll(); } finally{ lock.unlock(); } } }
通过以上代码实现的对生产者和消费者模式的同步,也只是实现对资源互斥访问实现同步,这种同步方式的并发并不高。如果说这种方式的生产者和消费者模式有什么优势的话,我个人觉得唯一的优势,即使发生了异常,也能保证锁一定能被释放。这种方式只是解决了同步问题,还有并发还有提高的空间。我们通过同步方法,我们本来目的只是为了保证生产和消费互斥操作,但是我们本来可以多个生产者一起生产的情况也被禁止了,这样让我们的并发度降低不少。
由此,我们可以改进我们的生产者和消费者模式,下面我们通过引入读写锁来解决不能多个生产者同时生产或者多个消费者同时消费的问题。改进后的代码如下:
package com.zhanglei.demo; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ResourceBuffer { private final int DEFAULT_BUFFER_SIZE = 100; private int size; private Random rnd; private List<Integer> bufferList = new ArrayList<Integer>(); private ReadWriteLock rwLock = new ReentrantReadWriteLock(); public ResourceBuffer(int size){ rnd = new Random(); if(size >0) this.size = size; else this.size = DEFAULT_BUFFER_SIZE; } public void product(){ rwLock.writeLock().lock(); try{ if(bufferList.size() == size){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } int num = rnd.nextInt(100); bufferList.add(num); System.out.println("生产商品编号"+num); } finally{ rwLock.writeLock().unlock(); } } public void consumer(){ rwLock.readLock().lock(); try{ if(bufferList.size() == 0){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } int index = bufferList.size() -1; System.out.println("消费商品编号"+bufferList.get(index)); bufferList.remove(index); } finally{ rwLock.readLock().unlock(); } } }
本着不重复造轮子的原则,生产者和消费者模式中的缓存区,在我们java类库已经做了相当好的封装,我们下面引入java.util.concurrent下的ArrayBlockingQueue来实现我们的生产者和消费者模式的代码如下:
package com.zhanglei.demo; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; public class ResourceBuffer { private final int DEFAULT_BUFFER_SIZE = 100; private int size; private Random rnd; private ArrayBlockingQueue<Integer> arrayQueue; public ResourceBuffer(int size){ if(size >0) this.size = size; else this.size = DEFAULT_BUFFER_SIZE; rnd = new Random(); arrayQueue = new ArrayBlockingQueue<Integer>(size); //此处指定数组的队列的初始容量大小 } public void product() { int num = rnd.nextInt(100); System.out.println("生产商品编号"+num); try { arrayQueue.put(num); } catch (InterruptedException e) { e.printStackTrace(); } } public void consumer(){ int num; try { num = arrayQueue.take(); System.out.println("消费商品编号"+num); } catch (InterruptedException e) { e.printStackTrace(); } } }
我们通过查看put和take方法的源码,我们知道ArrayBlockingQueue已经实现我们以上可阻塞的队列。关于offer和poll的源码如下:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
通过ArrayBlockingQueue源码,我们看到ArrayBlockingQueue的读/取的方法跟以上生产者和消费者方法实现基本一致。