生产消费者模式
貌似也是阻塞的问题
花了一些时间终于弄明白这个鸟东东,以前还以为是不复杂的一个东西的,以前一直以为和观察者模式差不多(其实也是差不多的,呵呵),生产消费者模式应该是可以通过观察者模式来实现的,对于在什么环境下使用现在想的还不是特别清楚,主要是在实际中还没使用过这个。
需要使用到同步,以及线程,属于多并发行列,和观察者模式的差异也就在于此吧,所以实现起来也主要在这里的差异。
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据
◇解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
◇支持并发(concurrency)
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
◇支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
用了两种方式实现了一下这个模式,主要参考了网上的一些例子才弄明白,这里对队列的实现有很多种方法,需要和具体的应用相结合吧,队列缓冲区很简单,现在已有大量的实现,缺点是在性能上面(内存分配的开销和同步/互斥的开销),下面的实现都是这种方式;环形缓冲区(减少了内存分配的开销),双缓冲区(减少了同步/互斥的开销)。
第一个例子是使用的信号量的东东,没有执行具体的东西,只是实现了这个例子,要做复杂的业务逻辑的话需要自己在某些方法内去具体实现
代码如下:
消费者:
public class TestConsumer implements Runnable { TestQueue obj; public TestConsumer(TestQueue tq){ this.obj=tq; } public void run() { try { for(int i=0;i<10;i++){ obj.consumer(); } } catch (Exception e) { e.printStackTrace(); } } }
生产者:
public class TestProduct implements Runnable { TestQueue obj; public TestProduct(TestQueue tq){ this.obj=tq; } public void run() { for(int i=0;i<10;i++){ try { obj.product("test"+i); } catch (Exception e) { e.printStackTrace(); } } } }
队列(使用了信号量,采用synchronized进行同步,采用lock进行同步会出错,或许是还不知道实现的方法):
public static Object signal=new Object(); boolean bFull=false; private List thingsList=new ArrayList(); private final ReentrantLock lock = new ReentrantLock(true); BlockingQueue q = new ArrayBlockingQueue(10); /** * 生产 * @param thing * @throws Exception */ public void product(String thing) throws Exception{ synchronized(signal){ if(!bFull){ bFull=true; //产生一些东西,放到 thingsList 共享资源中 System.out.println("product"); System.out.println("仓库已满,正等待消费..."); thingsList.add(thing); signal.notify(); //然后通知消费者 } signal.wait(); // 然后自己进入signal待召队列 } } /** * 消费 * @return * @throws Exception */ public String consumer()throws Exception{ synchronized(signal){ if(!bFull) { signal.wait(); // 进入signal待召队列,等待生产者的通知 } bFull=false; // 读取buf 共享资源里面的东西 System.out.println("consume"); System.out.println("仓库已空,正等待生产..."); signal.notify(); // 然后通知生产者 } String result=""; if(thingsList.size()>0){ result=thingsList.get(thingsList.size()-1).toString(); thingsList.remove(thingsList.size()-1); } return result; }
测试代码:
public class TestMain { public static void main(String[] args) throws Exception{ TestQueue tq=new TestQueue(); TestProduct tp=new TestProduct(tq); TestConsumer tc=new TestConsumer(tq); Thread t1=new Thread(tp); Thread t2=new Thread(tc); t1.start(); t2.start(); } }
运行结果:
这是jdk里面的例子
* class Producer implements Runnable { * private final BlockingQueue queue; * Producer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { queue.put(produce()); } * } catch (InterruptedException ex) { ... handle ...} * } * Object produce() { ... } * } * * class Consumer implements Runnable { * private final BlockingQueue queue; * Consumer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { consume(queue.take()); } * } catch (InterruptedException ex) { ... handle ...} * } * void consume(Object x) { ... } * } * * class Setup { * void main() { * BlockingQueue q = new SomeQueueImplementation(); * Producer p = new Producer(q); * Consumer c1 = new Consumer(q); * Consumer c2 = new Consumer(q); * new Thread(p).start(); * new Thread(c1).start(); * new Thread(c2).start(); * } * }
jdk1.5以上的一个实现,使用了Lock以及条件变量等东西
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
参考1:生产者/消费者模式(阻塞队列)
参考2:生产者/消费者模式(阻塞队列)