在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!
ConcurrentLinkedQueue:
是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。
该队列的元素遵循先进先出的原则,头是最先加入的,尾是最近加入的,该队列不允许null元素。
ConcurrentLinkedQueue重要方法:
add() 和 offer() 都是加入元素的方法 (在ConcurrentLinkedQueue中,这俩个方法没有任何区别)
poll() 和 peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。
BlockingQueue:
BlockingQueue接口的重要方法
offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
MyQueueTest
1 package com.bfxy.thread.cord.collection; 2 3 public class MyQueueTest { 4 5 public static void main(String[] args) throws InterruptedException { 6 MyQueue mq = new MyQueue(5); 7 mq.put("a"); 8 mq.put("b"); 9 mq.put("c"); 10 mq.put("d"); 11 mq.put("e"); 12 13 System.err.println("当前元素的个数"+mq.size()); 14 15 Thread t1 = new Thread(new Runnable() { 16 17 @Override 18 public void run() { 19 mq.put("f"); 20 mq.put("g"); 21 22 } 23 },"t1"); 24 25 Thread.sleep(1000); 26 27 Thread t2 = new Thread(new Runnable() { 28 29 @Override 30 public void run() { 31 try { 32 Thread.sleep(1000); 33 Object o1=mq.take(); 34 Thread.sleep(1000); 35 Object o2=mq.take(); 36 } catch (InterruptedException e) { 37 // TODO Auto-generated catch block 38 e.printStackTrace(); 39 } 40 41 } 42 },"t2"); 43 44 t1.start(); 45 Thread.sleep(1000); 46 t2.start(); 47 Thread.sleep(5000); 48 49 System.err.println(mq.getQueueList().toString()); 50 } 51 52 53 }
MyQueue
1 package com.bfxy.thread.cord.collection; 2 3 import java.util.LinkedList; 4 import java.util.List; 5 import java.util.concurrent.atomic.AtomicInteger; 6 7 public class MyQueue { 8 //1 就是我们整个队列的容器 9 private final LinkedList<Object> list =new LinkedList<>(); 10 11 //2计数器 12 private final AtomicInteger count = new AtomicInteger(); 13 14 private final int maxSize; //最大容量限制 15 16 17 private final int minSize = 0; 18 19 private final Object lock = new Object(); //锁 20 21 public MyQueue(int maxSize) { 22 this.maxSize = maxSize; 23 } 24 25 public void put(Object obj) { 26 synchronized (lock) { 27 while (count.get() ==this.maxSize) { 28 try { 29 lock.wait(); 30 } catch (InterruptedException e) { 31 // TODO Auto-generated catch block 32 e.printStackTrace(); 33 } 34 } 35 //添加新的元素进到容器里 36 list.add(obj); 37 count.getAndIncrement();//i++ 38 System.err.println("元素"+obj+"已经添加容器中"); 39 //进行唤醒可能正在等待的take方法操作中的线程 40 lock.notify(); 41 42 } 43 44 } 45 46 public Object take() { 47 Object temp= null; 48 synchronized (lock) { 49 while (count.get() ==this.minSize) { 50 try { 51 lock.wait(); 52 } catch (InterruptedException e) { 53 // TODO Auto-generated catch block 54 e.printStackTrace(); 55 } 56 } 57 temp=list.removeFirst(); 58 count.getAndDecrement();//i-- 59 System.err.println("元素"+temp+"已经从容器中取走"); 60 //进行唤醒可能正在等待的put方法操作中的线程 61 lock.notify(); 62 63 } 64 65 return temp; 66 } 67 68 public int size() { 69 return count.get(); 70 } 71 72 public List<Object> getQueueList() { 73 return list; 74 } 75 76 77 }