package Method; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by joyce on 2019/10/25. */ public class MyBlockingQueue<T> { private volatile MyLink<T> myLink = new MyLink<T>(); private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private volatile int allow = 5; private volatile int length = 0; public static void main(String [] f) { MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(); for(int i=0; i<10; ++i) { final int j= i; new Thread(new Runnable() { @Override public void run() { myBlockingQueue.push(j); } }).start(); }; try { System.out.println("sleep"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new Runnable() { @Override public void run() { while (true) { Integer t = myBlockingQueue.pop(); System.out.println("pop " + t); } } }).start(); } public void push(T t) { try { lock.lock(); // if(length == allow) while (length == allow) { condition.await(); } System.out.println("push " + t); myLink.add(t); ++length; condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public T pop() { try { lock.lock(); // if(length == 0) while (length == 0) { condition.await(); } T t = myLink.get(0); myLink.remove(0); --length; condition.signalAll(); return t; } catch (Exception e) { e.printStackTrace(); return null; } finally { lock.unlock(); } } }
起先输出:
push 0
push 1
push 2
push 3
push 4 至此后面5条线程wait
sleep
pop 0 释放一条并且notifyall
push 5
push 6
push 7
push 8
push 9 这5条生产端的全部被notify,而这个notify本意是通知生产端的,并继续生产至9,尴尬的是还没轮到pop线程
pop 1
pop 2
pop 3
pop 4
pop 5
pop 6
pop 7
pop 8
pop 9
改成while后
push 0
push 1
push 2
push 3
push 4
sleep
pop 0 交替
push 5
pop 1
push 6
pop 2
push 7
pop 3
push 8
pop 4
push 9
pop 5
pop 6
pop 7
pop 8
pop 9
jdk:功能比我多一点,我的push和pop相当于jdk的put和take
对于offer与poll的超时则将代码中的await()换=>
boolean await(long var1, TimeUnit var3)
抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 | remove() |
poll() |
take() |
poll(time, unit) |