生产者-消费者问题(Producer-consumer problem),也称有界缓冲区问题(Bounded-buffer problem),是多线程同步问题的一个经典案例。该案例描述的是两类角色(生产者与消费者)如何协同访问同一个共享资源(缓冲区)。生产者的主要作用是生成一定量的数据并放入缓冲区,然后重复此过程;与此同时,消费者从缓冲区中取出并消耗这些数据。该问题的关键在于保证生产者不会在缓冲区已满时继续加入数据,消费者也不会在缓冲区为空时消耗数据。
package com.gdufe.thread.consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demonstration built on an explicit {@link Lock} with two
 * {@link Condition}s.
 *
 * <p>One producer task and one consumer task share a single bounded
 * {@link Buffer}. The producer blocks while the buffer is full and the
 * consumer blocks while it is empty.
 */
public class ConsumerProducer {
    /** The buffer shared by the producer and the consumer task. */
    private static final Buffer buffer = new Buffer();

    /**
     * Starts one producer and one consumer on a two-thread pool.
     *
     * <p>{@code shutdown()} only stops the pool from accepting new tasks; both
     * submitted tasks loop forever, so the program keeps running until it is
     * terminated externally.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create a thread pool with two threads
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

    /** A task that keeps adding increasing ints (1, 2, 3, ...) to the buffer. */
    private static class ProducerTask implements Runnable {
        @Override
        public void run() {
            try {
                int i = 1;
                while (true) {
                    System.out.println("Producer writes " + i);
                    buffer.write(i++); // Blocks while the buffer is full

                    // Pause for a random time of up to 10 seconds
                    Thread.sleep((int) (Math.random() * 10000));
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the owning executor can see it,
                // then let the task end instead of continuing with lost data.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        }
    }

    /** A task that keeps reading and removing ints from the buffer. */
    private static class ConsumerTask implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(" Consumer reads " + buffer.read());

                    // Pause for a random time of up to 10 seconds
                    Thread.sleep((int) (Math.random() * 10000));
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the owning executor can see it.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        }
    }

    /**
     * A bounded FIFO buffer of ints guarded by a single lock.
     *
     * <p>The lock and its conditions are per-instance, matching the
     * per-instance queue they protect.
     */
    private static class Buffer {
        /** Maximum number of elements the buffer may hold. */
        private static final int CAPACITY = 1;

        /** Queue storing the buffered values; accessed only while holding {@code lock}. */
        private final java.util.LinkedList<Integer> queue =
                new java.util.LinkedList<Integer>();

        /** Lock guarding {@code queue}. */
        private final Lock lock = new ReentrantLock();

        /** Signalled after an element has been added. */
        private final Condition notEmpty = lock.newCondition();

        /** Signalled after an element has been removed. */
        private final Condition notFull = lock.newCondition();

        /**
         * Appends {@code value} to the buffer, waiting while the buffer is full.
         *
         * @param value the value to store
         * @throws InterruptedException if the thread is interrupted while
         *     waiting; in that case the value is not stored
         */
        public void write(int value) throws InterruptedException {
            lock.lock(); // Acquire the lock
            try {
                // Re-check in a loop: await() may return spuriously.
                while (queue.size() == CAPACITY) {
                    System.out.println("Wait for notFull condition");
                    notFull.await();
                }

                queue.offer(value);
                notEmpty.signal(); // Wake a reader waiting for data
            } finally {
                lock.unlock(); // Release the lock
            }
        }

        /**
         * Removes and returns the oldest value, waiting while the buffer is empty.
         *
         * @return the value removed from the head of the buffer
         * @throws InterruptedException if the thread is interrupted while
         *     waiting; no value is removed and no placeholder is returned
         */
        public int read() throws InterruptedException {
            lock.lock(); // Acquire the lock
            try {
                // Re-check in a loop: await() may return spuriously.
                while (queue.isEmpty()) {
                    System.out.println(" Wait for notEmpty condition");
                    notEmpty.await();
                }

                int value = queue.remove();
                notFull.signal(); // Wake a writer waiting for space
                return value;
            } finally {
                // Only release the lock here; returning from finally would
                // discard any exception thrown above.
                lock.unlock();
            }
        }
    }
}
package com.gdufe.thread.consumer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Producer/consumer demonstration that delegates all blocking and
 * synchronization to an {@link ArrayBlockingQueue}.
 *
 * <p>{@code put} blocks while the queue is full and {@code take} blocks while
 * it is empty, so no explicit lock or condition is needed.
 */
public class ConsumerProducerUsingBlockingQueue {
    /** Shared bounded buffer with room for two elements. */
    private static final ArrayBlockingQueue<Integer> buffer =
            new ArrayBlockingQueue<Integer>(2);

    /**
     * Starts one producer and one consumer on a two-thread pool.
     *
     * <p>{@code shutdown()} only stops the pool from accepting new tasks; both
     * submitted tasks loop forever, so the program keeps running until it is
     * terminated externally.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create a thread pool with two threads
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

    /** A task that keeps adding increasing ints (1, 2, 3, ...) to the buffer. */
    private static class ProducerTask implements Runnable {
        @Override
        public void run() {
            try {
                int i = 1;
                while (true) {
                    System.out.println("Producer writes " + i);
                    buffer.put(i++); // Blocks while the buffer is full

                    // Pause for a random time of up to 10 seconds
                    Thread.sleep((int) (Math.random() * 10000));
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the owning executor can see it.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        }
    }

    /** A task that keeps reading and removing ints from the buffer. */
    private static class ConsumerTask implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    // take() blocks while the buffer is empty
                    System.out.println(" Consumer reads " + buffer.take());

                    // Pause for a random time of up to 10 seconds
                    Thread.sleep((int) (Math.random() * 10000));
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the owning executor can see it.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        }
    }
}