1 package com.lilei.pack09; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 6 public class MySyncQueue<T> { 7 8 private Object[] ts; 9 10 int pos = -1; 11 12 public MySyncQueue(int size) { 13 ts = new Object[size]; 14 } 15 16 public synchronized void push(T t) throws InterruptedException { 17 while (true) { 18 if (pos + 1 < ts.length) { 19 ts[++pos] = t; 20 notifyAll(); 21 System.out.println(Thread.currentThread().getName() + " push,currentSize=" + (pos + 1)); 22 return; 23 } else { 24 wait(); 25 } 26 } 27 } 28 29 public synchronized T pop() throws InterruptedException { 30 31 while (true) { 32 if (pos >= 0) { 33 @SuppressWarnings("unchecked") 34 T t = (T) ts[pos--]; 35 36 notifyAll(); 37 38 System.out.println(Thread.currentThread().getName() + " pop,currentSize=" + (pos + 1)); 39 40 return t; 41 } else { 42 wait(); 43 } 44 } 45 46 } 47 48 public static class Inner { 49 50 } 51 52 public static void main(String[] args) { 53 ExecutorService es = Executors.newFixedThreadPool(30); 54 55 final MySyncQueue<Inner> queue = new MySyncQueue<Inner>(15); 56 57 int repeat = 1000; 58 59 while (repeat-->0) { 60 61 for (int i = 0; i < 15; i++) { 62 es.execute(new Runnable() { 63 public void run() { 64 65 try { 66 queue.pop(); 67 } catch (InterruptedException e) { 68 e.printStackTrace(); 69 } 70 } 71 }); 72 } 73 74 for (int i = 0; i < 15; i++) { 75 es.execute(new Runnable() { 76 public void run() { 77 78 try { 79 queue.push(new Inner()); 80 } catch (InterruptedException e) { 81 e.printStackTrace(); 82 } 83 } 84 }); 85 } 86 87 } 88 89 es.shutdown(); 90 } 91 92 }