基本概念
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
阻塞队列代码
直接使用 LinkedBlockingQueue,它的 put() 和 take() 方法都是阻塞的:队列已满时 put() 阻塞,队列为空时 take() 阻塞。
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Producer/consumer demo that delegates all blocking to a bounded
 * {@link LinkedBlockingQueue}: {@code put()} blocks while the queue is full
 * and {@code take()} blocks while it is empty.
 *
 * <p>Both workers stop when their thread is interrupted.
 */
public class ProdConsum2 {

    /** Capacity of the shared queue. */
    private static final int NUM = 10;

    /** Bounded buffer shared by the producer and the consumer. */
    private static final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

    /**
     * Starts one producer and one consumer, lets them run for 100 seconds,
     * then interrupts both and waits for them to finish.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws Exception {
        Thread putThread = new Thread(new PutClass());
        Thread getThread = new Thread(new GetClass());
        putThread.start();
        getThread.start();

        Thread.sleep(100000);

        // The workers are non-daemon threads; without an explicit stop request
        // the JVM would never exit once the demo window is over.
        putThread.interrupt();
        getThread.interrupt();
        putThread.join();
        getThread.join();
    }

    /** Consumer: takes one element per second and logs it. */
    public static class GetClass implements Runnable {

        /** Consumes until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    long curTime = queue.take();
                    System.out.println("取出:" + curTime + " 待取元素:" + queue.size());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Producer: inserts the current timestamp every 500 ms and logs it. */
    public static class PutClass implements Runnable {

        /** Produces until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    long curTime = System.currentTimeMillis();
                    queue.put(curTime);
                    // Log the value that was actually enqueued so that the
                    // producer and consumer log lines can be matched up.
                    System.out.println("插入:" + curTime + " 剩余空间:" + (NUM - queue.size()));
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }
    }
}
1个Lock 2个Condition
相当于用一个 lock 同时控制生产和消费流程,因此生产与消费不能并行。
package A_newStart.listNode.test;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo using one {@link ReentrantLock} with two
 * {@link Condition}s ({@code notEmpty} and {@code notFull}).
 *
 * <p>Every access to the queue happens while holding {@code lock}, so
 * producing and consuming are mutually exclusive. Workers stop when their
 * thread is interrupted.
 */
public class ProdConsum3 {

    /** Maximum number of elements the buffer may hold. */
    private static final int NUM = 10;

    /** Shared buffer; only touched while {@link #lock} is held. */
    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

    /** Single lock guarding both the put and the take path. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled after an element has been added. */
    private final Condition notEmpty = lock.newCondition();

    /** Signalled after an element has been removed. */
    private final Condition notFull = lock.newCondition();

    /**
     * Starts two producers and two consumers, lets them run for 100 seconds,
     * then interrupts all of them and waits for them to finish.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws Exception {
        ProdConsum3 aa = new ProdConsum3();
        Thread[] workers = {
            new Thread(aa.new Producer("produce111")),
            new Thread(aa.new Producer("produce222")),
            new Thread(aa.new Consumer("consumer111")),
            new Thread(aa.new Consumer("consumer222"))
        };
        for (Thread worker : workers) {
            worker.start();
        }

        Thread.sleep(100000);

        // Non-daemon workers would otherwise keep the JVM alive forever.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /** Inserts the current timestamp into the buffer in a loop. */
    public class Producer implements Runnable {
        private final String name;

        /**
         * @param name label printed in front of every log line
         */
        public Producer(String name) {
            this.name = name;
        }

        /** Produces until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    produceOne();
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }

        /** Waits for free space, inserts one element and wakes one consumer. */
        private void produceOne() throws InterruptedException {
            // acquire the lock
            lock.lock();
            try {
                // Re-check in a loop: another producer may have refilled the
                // buffer between the signal and this thread re-acquiring the lock.
                while (queue.size() >= NUM) {
                    notFull.await();
                }
                long curTime = System.currentTimeMillis();
                queue.put(curTime);
                System.out.println(name + " 插入:" + curTime + " size:" + queue.size());
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Removes elements from the buffer in a loop, pausing 100 ms between takes. */
    public class Consumer implements Runnable {
        private final String name;

        /**
         * @param name label printed in front of every log line
         */
        public Consumer(String name) {
            this.name = name;
        }

        /** Consumes until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    consumeOne();
                    // Throttle outside the critical section: sleeping while
                    // holding the lock would stall every other worker.
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }

        /** Waits for an element, removes it and wakes one producer. */
        private void consumeOne() throws InterruptedException {
            // acquire the lock
            lock.lock();
            try {
                // Re-check in a loop: another consumer may have emptied the
                // buffer between the signal and this thread re-acquiring the lock.
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                long curTime = queue.take();
                System.out.println(name + " 获取:" + curTime + " size:" + queue.size());
                notFull.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}
2个Lock 2个Condition
与上面1个Lock 2个Condition相比,生产和消费各用一把锁,因此生产、消费的过程可以并行。
package A_newStart.listNode.test;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo using two locks: {@code putLock} (with condition
 * {@code notFull}) serialises producers and {@code getLock} (with condition
 * {@code notEmpty}) serialises consumers, so a put and a take can proceed at
 * the same time. An {@link AtomicInteger} carries the element count between
 * the two sides.
 *
 * <p>Workers stop when their thread is interrupted.
 */
public class ProdConsum4 {

    /** Maximum number of elements the buffer may hold. */
    private static final int NUM = 10;

    /** Shared buffer. */
    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

    /** Held by a producer while it inserts. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Producers wait here while {@code count >= NUM}. */
    private final Condition notFull = putLock.newCondition();

    /** Held by a consumer while it removes. */
    private final ReentrantLock getLock = new ReentrantLock();

    /** Consumers wait here while {@code count <= 0}. */
    private final Condition notEmpty = getLock.newCondition();

    /** Number of products currently in the queue. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Starts two producers and two consumers, lets them run for 100 seconds,
     * then interrupts all of them and waits for them to finish.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws Exception {
        ProdConsum4 aa = new ProdConsum4();
        Thread[] workers = {
            new Thread(aa.new Producer("produce111")),
            new Thread(aa.new Producer("produce222")),
            new Thread(aa.new Consumer("consumer111")),
            new Thread(aa.new Consumer("consumer222"))
        };
        for (Thread worker : workers) {
            worker.start();
        }

        Thread.sleep(100000);

        // Non-daemon workers would otherwise keep the JVM alive forever.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /** Wakes waiting consumers; a condition may only be signalled under its own lock. */
    private void signalNotEmpty() {
        getLock.lock();
        try {
            notEmpty.signalAll();
        } finally {
            getLock.unlock();
        }
    }

    /** Wakes waiting producers; a condition may only be signalled under its own lock. */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signalAll();
        } finally {
            putLock.unlock();
        }
    }

    /** Inserts the current timestamp into the buffer in a loop. */
    public class Producer implements Runnable {
        private final String name;

        /**
         * @param name label printed in front of every log line
         */
        public Producer(String name) {
            this.name = name;
        }

        /** Produces until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int newCount;
                    // acquire the lock
                    putLock.lock();
                    try {
                        while (count.get() >= NUM) {
                            notFull.await();
                        }
                        long curTime = System.currentTimeMillis();
                        queue.put(curTime);
                        System.out.println(name + " 插入:" + curTime + " size:" + queue.size());
                        newCount = count.incrementAndGet();
                        // Still room left: let other waiting producers continue.
                        if (newCount < NUM) {
                            notFull.signalAll();
                        }
                    } finally {
                        putLock.unlock();
                    }
                    // Consumers can only be waiting if the queue was empty before
                    // this put. Later wake-ups are cascaded by the consumers
                    // themselves, so getLock is not taken on every put.
                    if (newCount == 1) {
                        signalNotEmpty();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Removes elements from the buffer in a loop, pausing 100 ms between takes. */
    public class Consumer implements Runnable {
        private final String name;

        /**
         * @param name label printed in front of every log line
         */
        public Consumer(String name) {
            this.name = name;
        }

        /** Consumes until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int newCount;
                    // acquire the lock
                    getLock.lock();
                    try {
                        while (count.get() <= 0) {
                            notEmpty.await();
                        }
                        long curTime = queue.take();
                        System.out.println(name + " 获取:" + curTime + " size:" + queue.size());
                        newCount = count.decrementAndGet();
                        // Still elements left: let other waiting consumers continue.
                        if (newCount > 0) {
                            notEmpty.signalAll();
                        }
                    } finally {
                        getLock.unlock();
                    }
                    // Producers can only be waiting if the queue was full before
                    // this take, so wake them as soon as one slot is free rather
                    // than only once the queue has been drained completely.
                    if (newCount == NUM - 1) {
                        signalNotFull();
                    }
                    // Throttle outside the critical section so the other
                    // consumer is not blocked while this one sleeps.
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the request.
                Thread.currentThread().interrupt();
            }
        }
    }
}