生产者-消费者问题是经典的并发问题, 非常适合并发入门的编程练习。
生产者-消费者问题是指, 有若干个生产者和若干个消费者并发地读写一个或多个共享存储空间;生产者创建对象并放入到共享存储空间,消费者从共享存储空间取出对象进行消费处理。当共享存储空间为满时,生产者被阻塞;当共享存储空间为空时,消费者被阻塞。本文先使用一个自定义的有限长度字符序列缓冲区来作为共享存储空间,并使用原生的 wait 和 notify 机制来实现并发读写; 接着使用 Java 并发库提供的 BlockQueue 来实现同样的目的。
一、 使用自定义的有限长度字符序列缓冲区来作为共享存储空间:
(1) 该缓冲区必须是线程安全的, 比较简单可行的是在方法上加入 synchronized 关键字;
(2) 在并发读写该缓冲区的线程 Producer 和 Comsumer 中,要小心地将 wait 方法放在 while 循环中, 使用 notifyAll 来通知;
(3) 即使缓冲区是线程安全的,要确保操作及其展示结果一致时需要使用同步确保一起执行;
(4) 使用了可见性变量 volatile boolean endflag 来取消线程的执行, 更好的方式是通过线程中断。
/** * PCProblem : * 模拟生产者-消费者问题, 生产者产生字符并写入字符序列缓冲区, 消费者从缓冲区取走字符 * * @author shuqin1984 2011-08-05 * */ package threadprogramming.basic.simulation; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class PCProblem { public static void main(String[] args) { System.out.println(" ---- Thread main starts up ---- "); // 模拟 生产者 - 消费者 任务 SharedCharBuffer sharedBuffer = new SharedCharBuffer(10); ExecutorService es = Executors.newCachedThreadPool(); for (int i=1; i <= 10; i++) { es.execute(new ProducerThread(i, sharedBuffer)); es.execute(new ConsumerThread(i, sharedBuffer)); } es.shutdown(); // 运行 5 秒后终止模拟 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } ProducerThread.cancel(); ConsumerThread.cancel(); es.shutdownNow(); System.out.println("Time to be over."); } }
生产者: Producer.java
/** * ProducerThread: 生产者线程 */ package threadprogramming.basic.simulation; import java.util.Random; import java.util.concurrent.TimeUnit; public class ProducerThread extends Thread { private static String str = "abc1defg2hijk3lmno4pqrs5tuvwx6yz" + "AB7CDEF8GHIJK9LMNO0PQR_STU*VWXYZ"; private static volatile boolean endflag = false; private final int id; private SharedCharBuffer buffer; public ProducerThread(int id, SharedCharBuffer buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } /** * 生产者任务: 只要任务不取消,且缓冲区不满,就往缓冲区中字符 */ public void run() { while (!isCanceled() && !Thread.interrupted()) { synchronized (buffer) { while (buffer.isFull()) { // 缓冲区已满,生产者必须等待 try { buffer.wait(); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } char ch = produce(); System.out.println(TimeIndicator.getcurrTime() + ": " + this + " 准备写缓冲区:" + ch); buffer.write(ch); System.out.println(TimeIndicator.getcurrTime() + ": " + this + " : " + buffer); buffer.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println("Exit from: " + this); } public char produce() { Random rand = new Random(); return str.charAt(rand.nextInt(64)); } public String toString() { return "P[" + id + "]"; } }
消费者:
/** * ConsumerThread: 消费者线程 * */ package threadprogramming.basic.simulation; import java.util.concurrent.TimeUnit; public class ConsumerThread implements Runnable { private static volatile boolean endflag = false; private final int id; private SharedCharBuffer buffer; public ConsumerThread(int id, SharedCharBuffer buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } /** * consume: * 当缓冲区buffer中有字符时,就取出字符显示【相当于消费者】。 * */ public char consume() { return buffer.fetch(); } /** * 消费者任务: 只要任务不取消,且缓冲区不被置空,就从缓冲区中取出字符消费。 */ public void run() { while (!isCanceled() && !Thread.interrupted()) { synchronized (buffer) { while (buffer.isEmpty()) { try { buffer.wait(); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println(TimeIndicator.getcurrTime() + ": " + this + " 取出字符: " + consume()); System.out.println(TimeIndicator.getcurrTime() + ": " + this + " : " + buffer); buffer.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println("Exit from: " + this); } public String toString() { return "C[" + id + "]"; } }
有限字符缓冲区: SharedCharBuffer.java
/** * CharBuffer: * 实现有限长度字符缓冲区的互斥读写。 * */ package zzz.study.threadprogramming.basic.simulation; public class CharBuffer { private final int capacity; // 指定字符缓冲区能容纳的字符数 private char[] charBuffer; // 用来生产和消费的有限长度字符缓冲区 private int index; private int count; // 该缓冲区被读写的次数,可衡量性能 public CharBuffer(int capacity) { if (charBuffer == null) { charBuffer = new char[capacity]; } this.capacity = capacity; index = 0; } /** * 判断缓冲区是否已满,满则生产者等待 */ public boolean isFull() { return index == capacity; } /** * 判断缓冲区是否为空,空则消费者等待 */ public boolean isEmpty() { return index == 0; } /** * write: 将给定字符写入缓冲区中【改变了缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * @param ch character that will be written into the buffer. * */ public synchronized void write(char ch) { charBuffer[index] = ch; index++; count++; } /** * read: 读取缓冲区中给定位置的字符【不改变缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * @param index integer representation of the position * */ public synchronized char read(int index) { return charBuffer[index]; } /** * fetch: 取出缓冲区给定位置的字符【改变了缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * */ public synchronized char fetch() { index--; count++; return charBuffer[index]; } /** * getStringOfBuffer: 缓冲区内容的字符串表示 * @return string representation of the buffer's contents * */ public synchronized String toString() { if (isEmpty()) { return "缓冲区为空!"; } else { StringBuilder bufferstr = new StringBuilder("缓冲区内容: "); for (int i=0; i < index; i++) { bufferstr.append(charBuffer[i]); } return bufferstr.toString(); } } public int getCount() { return count; } }
二、 使用阻塞队列 BlockQueue 来实现生产者-消费者问题求解
可以看到, 客户端代码简化了不少, 错误风险也降低了。 只要在主线程创建一个 BlockQueue, 传给生产者 ProducerUsingQueue 和 消费者 ConsumerUsingQueue , 然后直接使用 BlockQueue 提供的同步机制。BlockQueue 在内部分别使用了Condition notFull 和 notEmpty 分别来通知 生产者和消费者, 在方法实现中使用了可重入锁 ReentrantLock 来确保并发互斥的操作。
package zzz.study.threadprogramming.basic.simulation.usequeue; import org.apache.commons.logging.Log; import org.apache.log4j.Logger; import zzz.study.threadprogramming.basic.simulation.TimeIndicator; import java.util.Random; import java.util.concurrent.BlockingQueue; public class ProducerUsingQueue extends Thread { private static String str = "abc1defg2hijk3lmno4pqrs5tuvwx6yz" + "AB7CDEF8GHIJK9LMNO0PQR_STU*VWXYZ"; private static volatile boolean endflag = false; private final int id; BlockingQueue<Character> buffer; private Logger log = Logger.getLogger("appInfo"); public ProducerUsingQueue(int id, BlockingQueue<Character> buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } public void run() { while (!isCanceled()) { try { char ch = produce(); log.info(TimeIndicator.getcurrTime() + ": " + this + " 准备写缓冲区:" + ch); buffer.put(ch); log.info(TimeIndicator.getcurrTime() + ": " + this + " : " + buffer); } catch (InterruptedException e) { log.error(this + " Interrupted: " + e.getMessage()); } } } public char produce() { Random rand = new Random(); return str.charAt(rand.nextInt(64)); } public String toString() { return "P[" + id + "]"; } }
/** * CharOutputThread: * 通过创建线程,并使用CharBuffer来实现并发地读和写字符缓冲区的仿真 * */ package zzz.study.threadprogramming.basic.simulation.usequeue; import org.apache.log4j.Logger; import zzz.study.threadprogramming.basic.simulation.TimeIndicator; import java.util.concurrent.BlockingQueue; public class ConsumerUsingQueue extends Thread { private static volatile boolean endflag = false; private final int id; private BlockingQueue<Character> buffer; private Logger log = Logger.getLogger("appInfo"); public ConsumerUsingQueue(int id, BlockingQueue<Character> buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } /** * consume: * 当缓冲区buffer中有字符时,就取出字符显示【相当于消费者】。 * */ public Character consume() throws InterruptedException { return buffer.take(); } /** * run: * 一个被创建的任务,只要缓冲区不被置空,就从缓冲区中取出字符消费。 */ public void run() { while (!isCanceled()) { try { log.info(TimeIndicator.getcurrTime() + ": " + this + " 取出字符: " + consume()); log.info(TimeIndicator.getcurrTime() + ": " + this + " : " + buffer); } catch (InterruptedException e) { log.error(this + " Interrupted: " + e.getMessage()); } } } public String toString() { return "C[" + id + "]"; } }
/** * TestThread : * * 使用主线程不断从键盘缓冲区获取输入,写入自创建的字符缓冲区,并显示缓冲区内容; * 使用一个子线程不断从自创建的字符缓冲区取出字符输出,并显示缓冲区内容; * */ package zzz.study.threadprogramming.basic.simulation.usequeue; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class ProducerConsumerProblem { public static void main(String[] args) { int num = 10; System.out.println(" ---- Thread main starts up ---- "); BlockingQueue<Character> queue = new ArrayBlockingQueue<Character>(15); ExecutorService es = Executors.newCachedThreadPool(); List<ProducerUsingQueue> producers = new ArrayList<ProducerUsingQueue>(); List<ConsumerUsingQueue> comsumers = new ArrayList<ConsumerUsingQueue>(); for (int i=0; i < num; i++) { producers.add(new ProducerUsingQueue(i, queue)); comsumers.add(new ConsumerUsingQueue(i, queue)); } for (int i=0; i < num; i++) { es.execute(producers.get(i)); es.execute(comsumers.get(i)); } es.shutdown(); try { TimeUnit.SECONDS.sleep(15); } catch (InterruptedException e) { e.printStackTrace(); } ProducerUsingQueue.cancel(); ConsumerUsingQueue.cancel(); es.shutdownNow(); System.out.println("Time to be over."); } }
这里使用了 log4j 来打印日志,相关配置如下:
pom.xml 加入以下依赖:
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
log4j.properties 配置:
log4j.rootLogger = INFO,stdout log4j.logger.appInfo = INFO,appInfoBrefLog log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%c] - [%m]%n log4j.appender.appInfoBrefLog=org.apache.log4j.DailyRollingFileAppender log4j.appender.appInfoBrefLog.File=./app/info_bref.log log4j.appender.appInfoBrefLog.DatePattern ='.'yyyy-MM-dd log4j.appender.appInfoBrefLog.Threshold=INFO log4j.appender.appInfoBrefLog.Append=true log4j.appender.appInfoBrefLog.layout=org.apache.log4j.PatternLayout log4j.appender.appInfoBrefLog.layout.ConversionPattern=%r %m%n