MessageQueue 和blockQueue起到的作用是一样的
public class Message { private String data; public Message(String data) { this.data = data; } public String getData() { return data; } }
public class MessageQueue { private final LinkedList<Message> queue; private final int limit; public MessageQueue() { limit = 10; this.queue = new LinkedList<>(); } public void put(final Message message) throws InterruptedException { synchronized (queue) { while (queue.size() > limit) { queue.wait(); } queue.addLast(message); queue.notifyAll(); } } public Message take() throws InterruptedException { synchronized (queue) { while (queue.isEmpty()) { queue.wait(); } Message message = queue.removeFirst(); queue.notifyAll(); return message; } } public int getMaxLimit() { return this.limit; } public int getMessageSize() { synchronized (queue) { return queue.size(); } } }
生产者
public class ProducerThread extends Thread { private final MessageQueue messageQueue; private final static Random random = new Random(System.currentTimeMillis()); private final static AtomicInteger counter = new AtomicInteger(0); public ProducerThread(MessageQueue messageQueue, String seq) { super("【生产者" + seq+"】"); this.messageQueue = messageQueue; } @Override public void run() { while (true) { try { Message message = new Message("Message-" + counter.getAndIncrement()); messageQueue.put(message); System.out.println(Thread.currentThread().getName() + " 生产消息 " + message.getData()); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { break; } } } }
消费者
public class ConsumerThread extends Thread { private final MessageQueue messageQueue; private final static Random random = new Random(System.currentTimeMillis()); public ConsumerThread(MessageQueue messageQueue, String seq) { super("【消费者" + seq+"】"); this.messageQueue = messageQueue; } @Override public void run() { while (true) { try { Message message = messageQueue.take(); System.out.println(Thread.currentThread().getName() + " 消费消息 " + message.getData()); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { break; } } } }
测试
public class ProducerAndConsumerClient { public static void main(String[] args) { final MessageQueue messageQueue = new MessageQueue(); new ProducerThread(messageQueue, "一").start(); new ProducerThread(messageQueue, "二").start(); new ProducerThread(messageQueue, "三").start(); new ConsumerThread(messageQueue, "一").start(); new ConsumerThread(messageQueue, "二").start(); } }