阻塞队列介绍
1,ArrayBlockingQueue:数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序。
2,LinkedBlockingQueue:链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
3,PriorityBlockingQueue:支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素 排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
4,DelayQueue:优先级队列实现的无界阻塞队列。
5,SynchronousQueue:不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
6,LinkedTransferQueue:链表实现的无界阻塞队列。
7,LinkedBlockingDeque:链表实现的双向阻塞队列。
插入数据的方法
boolean add(E e); //如果队列满了继续插入会 抛出异常 boolean offer(E e); // 返回true或false表示插入成功或者失败 void put(E e); //阻塞插入元素
获取数据的方法
boolean remove(Object o); //返回true或false表示获取成功或者失败,队列为空返回false E poll(); //当队列中存在元素时,返回该元素,如果队列为空返回null E take(); //阻塞获取元素
ArrayBlockingQueue原理分析
1,构造方法
int takeIndex; int putIndex; int count; // capacity 数组的长度 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
2,add方法
public boolean add(E e) { //调动了offer 返回true和false if (offer(e)) return true; else throw new IllegalStateException("Queue full"); //如果插入失败 抛出异常 } public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) //如果队列满了 返回false return false; else { enqueue(e); //将元素添加到队列中 return true; } } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //通过 putIndex 索引直接将元素添加到数组 items if (++putIndex == items.length) putIndex = 0; // 如果下次存放的索引等于队列的长度,那么指向0,这就是循环队列 count++; notEmpty.signal(); //唤醒处于等待的线程 例如take阻塞的线程 }
3,put方法,和add方法一样,不过当队列满了以后,会阻塞。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //获得锁,和lock的区别是这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回。而 lock方法是尝试获得锁成功后才响应中断 lock.lockInterruptibly(); try { while (count == items.length) //队列满了之后,当前线程被notFull条件对象挂起 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
4,阻塞获取队列中元素的方法。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //队列中没有元素,阻塞 return dequeue(); } finally { lock.unlock(); } } //删除队列头部的元素 返回给客户端 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //默认获取位置为0的元素 items[takeIndex] = null; //将该位置元素置为空 if (++takeIndex == items.length) takeIndex = 0; //如果takeIndex达到最大值,则重新从0获取 count--; //元素个数递减 if (itrs != null) itrs.elementDequeued(); //同时更新迭代器中的元素数据 notFull.signal(); //触犯队列满了以后的阻塞 return x; }
5,remove方法,删除指定的一个元素
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; //获取下一个要添加元素的索引 int i = takeIndex; //获取当前要被移出的元素的索引 do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
原子操作类
JDK1.5之后,juc提供了Atomic包,提供了对于常用数据结构的原子操作。它提供了一个简单、高效以及线程安全的对变量更新的操作方式。按照变量类型的划分可以分为以下几类。
1,原子更新基本类型:AtomicBoolean、AtomicInteger、AtomicLong
2,原子更新数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
3,原子更新引用:AtomicReference、 AtomicReferenceFieldUpdater、AtomicMarkableReference(更新带有标记位的引用类型)
4,原子更新字段:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicStampedReference
AtomicInteger分析
unsafe:unsafe类相当于一个后门,使得java可以像c语言的指针一样直接操作内存空间,实际上这个类在很多方面都有使用,例如Netty、Kafka等等。
这个类提供了很多功能:包括多线程同步(MonitorEnter)、CAS操作(compareAndSwap)、线程的挂起和恢复(park和unpark)、内存屏障(loadFence和storeFence)、内存管理(内存分配,获取内存地址和释放内存等)。
1,getAndIncrement()
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
2,getAndAdd(int )
public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
3,get()
//value用volatile修饰 public final int get() { return value; }