演示线程的协调:
假设使用缓冲区存储整数。缓冲区的大小是受限的。缓冲区提供write(int)方法将一个int值添加到缓冲区中去,还提供方法read() 从缓冲区充读取和删除一个int 值。
为了同步这个操作,使用具有两个条件的锁:notEmpty(即缓冲区非空) 和 notFull(即缓冲区未满)。
当任务向缓冲区中添加一个int 时,如果缓冲区是满的,那么任务将会等待notFull条件。
当任务从缓冲区中读取一个int 时,如果缓冲区是空的,那么任务将等待notEmpty条件。
程序包括了Buffer类以及重复向缓冲区产生数字和重复从缓冲区消耗数字的两个任务。
package newThread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerProducer { private static Buffer buffer = new Buffer(); public static void main(String[] args) { // TODO Auto-generated method stub //Create a thread pool with two thread ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(new ProducerTask()); executorService.execute(new ConsumerTask()); executorService.shutdown(); } //A task for adding an int to the buffer private static class ProducerTask implements Runnable{ @Override public void run() { // TODO Auto-generated method stub try { int i = 1; while(true){ System.out.println("ProducerTask wirtes " + i); buffer.write(i++);//Add a value to the buffer //Put the thread into sleep Thread.sleep((int) (Math.random()*1000)); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //A task for reading and deleting an int from the buffer private static class ConsumerTask implements Runnable{ @Override public void run() { try { while(true){ System.out.println(" Consumer reads " + buffer.read()); //Put the thread into sleep Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //An inner class for buffer private static class Buffer{ private static final int CAPACITY = 1;//buffer size private java.util.LinkedList<Integer> queue = new java.util.LinkedList<>(); //create a new lock private static Lock lock = new ReentrantLock(); //Create two conditions private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); public void write(int value){ lock.lock();//Acquire the lock try { while(queue.size() == CAPACITY){ System.out.println("Wait for notFull condition"); notFull.await(); } queue.offer(value); notEmpty.signal();//Signal notEmpty conditon } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally { lock.unlock();//Release the lock } } public int read(){ int value = 0; lock.lock();//Acquire the lock try { while(queue.isEmpty()){ System.out.println(" Wait for not Empty condition"); notEmpty.await(); } value = queue.remove(); notFull.signalAll();//Signal notFull condition } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } return value; } } }
运行结果如下: