zoukankan      html  css  js  c++  java
  • Queue

    概述

    image

    1. 有界无界:
      • 有界队列:队列大小是有限制的
        • ArrayBlockingQueue:数组实现,满了就通过锁阻塞等待
        • LinkedBlockingQueue:虽然是有界队列,但是容量是个非常大的值,根本不会满。 满了也是通过锁阻塞
      • 无界队列:
        • PriorityQueue:数组实现无界队列,满了就扩容
        • ConcurrentLinkedQueue:链表实现的无界队列,没有容量限制
        • DelayQueue:底层通过PriorityQueue实现
        • SynchronousQueue:内部通过node维护的链表或者栈,是无界的
        • PriorityBlockingQueue:底层通过PriorityQueue实现
    2. 线程安全:
      • 线程安全:
        • ArrayBlockingQueue:添加删除同一把锁ReentrantLock
        • LinkedBlockingQueue:添加删除两把锁
        • ConcurrentLinkedQueue:通过cas实现,因为tail节点更新机制,所以是弱一致性
        • DelayQueue:通过ReentrantLock将更新操作锁住
        • SynchronousQueue:内部通过cas操作
        • PriorityBlockingQueue:添加删除同一把锁ReentrantLock
      • 线程不安全:
        • PriorityQueue:modCount
    
    public interface Queue<E> extends Collection<E> {
        // 增加一个元索 ,如果队列已满,则抛出一个IIIegaISlabEepeplian异常
        boolean add(E e);
        //添加一个元素并返回true,如果队列已满,则返回false
        boolean offer(E e);
        //移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
        E remove();
        //移除并返问队列头部的元素 ,如果队列为空,则返回null
        E poll();
        //返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
        E element();
        //返回队列头部的元素 ,如果队列为空,则返回null
        E peek();
    }
    
    1. 对于抛异常的方法,在实现类里边的实现是阻塞

    PriorityQueue

    使用示例

    class Person implements Comparable{
        private String name;
        private int age;
    
        Person(String name,int age){
            this.name = name;
            this.age = age;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public int getAge() {
            return age;
        }
    
        public void setAge(int age) {
            this.age = age;
        }
    
        @Override
        public int compareTo(Object o) {
            Person person = (Person)o;
            return this.age - person.getAge();//按照名字排序
        }
    }
    
     public static void main(String[] args) {
            Queue<Person> priorityQueue = new PriorityQueue();
            Person p1 = new Person("a",1);
            Person p2 = new Person("b",2);
            Person p3 = new Person("c",3);
            Person p4 = new Person("d",4);
            priorityQueue.add(p2);
            priorityQueue.add(p1);
            priorityQueue.add(p4);
            priorityQueue.add(p3);
            Person person = priorityQueue.poll();
            while (person!=null){
                System.out.println(person.getName());
                person = priorityQueue.poll();
            }
    
    
        }
    

    底层排序是通过实现的。

    属性有哪些

     public class PriorityQueue<E> extends AbstractQueue<E>
                    implements java.io.Serializable {
    
                private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
                transient Object[] queue; // non-private to simplify nested class access
                private int size = 0;
                private final Comparator<? super E> comparator;
                transient int modCount = 0; // non-private to simplify nested class access
    
            }
    
    1. object数组queue
    2. 默认DEFAULT_INITIAL_CAPACITY:11
    3. 大小size
    4. 比较器comparator
    5. fail-fast计数器modCount:非线程安全

    构造器

        public PriorityQueue() {
            this(11, (Comparator)null);
        }
    
        public PriorityQueue(int var1) {
            this(var1, (Comparator)null);
        }
    
        public PriorityQueue(Comparator<? super E> var1) {
            this(11, var1);
        }
    
        public PriorityQueue(int var1, Comparator<? super E> var2) {
            this.size = 0;
            this.modCount = 0;
            if (var1 < 1) {
                throw new IllegalArgumentException();
            } else {
                this.queue = new Object[var1];
                this.comparator = var2;
            }
        }
    
        public PriorityQueue(Collection<? extends E> var1) {
            this.size = 0;
            this.modCount = 0;
            if (var1 instanceof SortedSet) {
                SortedSet var2 = (SortedSet)var1;
                this.comparator = var2.comparator();
                this.initElementsFromCollection(var2);
            } else if (var1 instanceof PriorityQueue) {
                PriorityQueue var3 = (PriorityQueue)var1;
                this.comparator = var3.comparator();
                this.initFromPriorityQueue(var3);
            } else {
                this.comparator = null;
                this.initFromCollection(var1);
            }
    
        }
    
        public PriorityQueue(PriorityQueue<? extends E> var1) {
            this.size = 0;
            this.modCount = 0;
            this.comparator = var1.comparator();
            this.initFromPriorityQueue(var1);
        }
    
        public PriorityQueue(SortedSet<? extends E> var1) {
            this.size = 0;
            this.modCount = 0;
            this.comparator = var1.comparator();
            this.initElementsFromCollection(var1);
        }
    
    1. 有7个构造器:
      • 无参构造器:
      • 指定容量:
      • 指定比较器:
      • 指定容量和比较器:
      • 按照入参collection进行初始化:判断collection是哪种类型的
        • PriorityQueue:直接强转成PriorityQueue
        • SortedSet:通过Arrays.copyOf转换成Object数组
        • 两种都不属于:因为这种情况初始进来的元素是无序的,初始化最后一步要进行一次排序
      • 按照入参PriorityQueue进行初始化:
      • 按照入参SortedSet进行初始化:

    添加方法

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            modCount++;
            int i = size;
            if (i >= queue.length)
                grow(i + 1);//扩容:类似ArrayList,通过Arrays.copyOf方法
            size = i + 1;
            if (i == 0)
                queue[0] = e;
            else
                siftUp(i, e);//调整堆,过程参考堆排序链接
            return true;
        }
    
    1. add和offer调用的是同一个方法
    2. 逻辑同ArrayList一样,最后进行一下排序即可

    返回顶部

    ArrayBlockingQueue

    属性:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
                implements BlockingQueue<E>, java.io.Serializable {
    
            private static final long serialVersionUID = -817911632652898426L;
    
            final Object[] items;
            /** take, poll, peek or remove的下一个索引 */
            int takeIndex;
            /** put, offer, or add的下一个索引 */
            int putIndex;
            /**队列中元素个数*/
            int count;
    
            final ReentrantLock lock;
    
            private final Condition notEmpty;
    
            private final Condition notFull;
    
            transient java.util.concurrent.ArrayBlockingQueue.Itrs itrs = null;
        }
    

    原理:

    以下是自己实现的一个简单版本的ArrayBlockQueue帮助理解:

    
    public class ArrayBlockQueue {
        private Object[] items;
        private int size;
        private int sizeMax;
        private Lock lock = new ReentrantLock();
        private Condition inLock = lock.newCondition();
        private Condition outLock = lock.newCondition();
    
        public ArrayBlockQueue(int sizeMax){
            this.sizeMax = sizeMax;
            items = new Object[sizeMax];
        }
    
        public void add(Object o){
            lock.lock();
            try{
                if(size == sizeMax){
                    inLock.await();
                    System.out.println("我满了");
                }
                size++;
                items[size-1] = o;
                System.out.println("成功添加元素:"+o.toString());
                if(sizeMax>0){
                    System.out.println("队列中有数据了,获取线程可以来拿数据了");
                    outLock.signal();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    
        public Object get(){
            lock.lock();
            Object result = null;
            try{
                if(size==0){
                    outLock.await();
                    System.out.println("我空了");
                }
                size--;
                result = items[size];
                items[size] = null;
                System.out.println("成功获取一个元素:"+ result.toString());
                if(size<sizeMax){
                    System.out.println("队列不满了,添加线程可以加元素了");
                    inLock.signal();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
            return result;
        }
    
        public static void main(String[] args) {
            /***
             * 创建一个队列
             * 开启三个线程,往队列中添加元素,一旦队列满了,则添加线程阻塞
             * 开启一个线程,从队列中拿取元素,一旦队列空了,则拿取线程阻塞
             * 添加元素的时候,发现队列中有元素了,通知获取线程可以来拿元素了
             * 拿取元素的时候,发现队列元素不满了,通知添加线程可以来添加元素了
             */
            ArrayBlockQueue arrayBlockQueue = new ArrayBlockQueue(2);
            Thread t1 =  new Thread(new Runnable() {
                @Override
                public void run() {
                    arrayBlockQueue.add("t1");
                }
            });
            Thread t2 =  new Thread(new Runnable() {
                @Override
                public void run() {
                    arrayBlockQueue.add("t2");
                }
            });
            Thread t3 =  new Thread(new Runnable() {
                @Override
                public void run() {
                    arrayBlockQueue.add("t3");
                }
            });
            Thread t4 =  new Thread(new Runnable() {
                @Override
                public void run() {
                    arrayBlockQueue.get();
                }
            });
            t1.start();//添加
            t2.start();//添加
            t3.start();//添加
            t4.start();//获取
        }
    }
    
    

    返回顶部

    LinkedBlockingQueue

        public static void main(String[] args) {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            linkedBlockingQueue.offer(1);
            linkedBlockingQueue.add(1);
            linkedBlockingQueue.poll();
            linkedBlockingQueue.remove();
            linkedBlockingQueue.peek();
            linkedBlockingQueue.element();
        }
    
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        static class Node<E> {
            E item;
    
            java.util.concurrent.LinkedBlockingQueue.Node<E> next;
    
            Node(E x) {
                item = x;
            }
        }
    
        //容量
        private final int capacity;
    
        //已存入的元素数量
        private final AtomicInteger count = new AtomicInteger();
    
        transient java.util.concurrent.LinkedBlockingQueue.Node<E> head;
    
        private transient java.util.concurrent.LinkedBlockingQueue.Node<E> last;
    
        //take和put是两把锁 -- 所以性能上比ArrayBlockingQueue要好一些
        private final ReentrantLock takeLock = new ReentrantLock();
        private final Condition notEmpty = takeLock.newCondition();
        private final ReentrantLock putLock = new ReentrantLock();
        private final Condition notFull = putLock.newCondition();
    
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    }
    

    构造器:

        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            //创建一个内容为null的node,将链表头和尾都指向该node
            last = head = new LinkedBlockingQueue.Node<E>(null);
        }
    

    offer:

    public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
            //加锁
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    //向链表尾部插入node
                    enqueue(node);
                    //size++
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        //如果链表没满,通知阻塞的添加线程
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)//初始值是-1,如果等于0代表上面添加元素成功,链表不为空了,通知阻塞的take线程
                signalNotEmpty();
            return c >= 0;
        }
        private void enqueue(LinkedBlockingQueue.Node<E> node) {
            last = last.next = node;
        }
    

    返回顶部

    ConcurrentLinkedQueue

    看下属性

        public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable {
            private transient volatile java.util.concurrent.ConcurrentLinkedQueue.Node<E> head;
            private transient volatile java.util.concurrent.ConcurrentLinkedQueue.Node<E> tail;
            private static final Unsafe UNSAFE;
    
            private static class Node<E> {
                volatile E item;
                volatile java.util.concurrent.ConcurrentLinkedQueue.Node<E> next;
            }
        }
    
    1. 链表自然包含头节点和尾节点,内部类node,单向链表持有下个节点的引用
    2. Unsafe表明更新是通过cas更新,也表明是线程安全的队列

    添加方法

           public boolean offer(E e) {
                // 如果e为null,则直接抛出NullPointerException异常
                checkNotNull(e);
                // 创建入队节点
                final Node<E> newNode = new Node<E>(e);
    
                // 循环CAS直到入队成功
                // 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点
                for (Node<E> t = tail, p = t;;) {
                    // p用来表示队列的尾节点,初始情况下等于tail节点
                    // q是p的next节点
                    Node<E> q = p.next;
                    // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
                    // 如果p是尾节点
                    if (q == null) {
                        // p is last node
                        // 设置p节点的下一个节点为新节点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾节点
                        if (p.casNext(null, newNode)) {
                            // Successful CAS is the linearization point
                            // for e to become an element of this queue,
                            // and for newNode to become "live".
                            // 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                            if (p != t) // hop two nodes at a time
                                casTail(t, newNode);  // Failure is OK.
                            return true;
                        }
                        // Lost CAS race to another thread; re-read next
                    }
                    // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
                    // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
                    else if (p == q)
                        // We have fallen off list.  If tail is unchanged, it
                        // will also be off-list, in which case we need to
                        // jump to head, from which all live nodes are always
                        // reachable.  Else the new tail is a better bet.
                        p = (t != (t = tail)) ? t : head;
                        // 寻找尾节点
                    else
                        // Check for tail updates after two hops.
                        p = (p != t && t != (t = tail)) ? t : q;
                }
            }
        }
    

    先图解一下添加过程

    添加过程中有两个更新操作,一个是将新元素插入到队列尾部,第二就是更新tail节点的指向。
    但是这里更新tail节点有个判断:只有当tail的next不是null的时候才更新,否则这一步略过

    1. 初始化的时候,创建一个node,该node即是tail又是head, 它的next指向null
    2. 添加元素1,将head的next指向元素1,因为head和tail是同一个,所以tail的next是元素1,不是null,所以不更新tail
    3. 添加元素2,根据tail节点找到其next为元素1,判断元素1是当前最后一个节点,将元素1next指向元素2,因为tail的next节点不是null,所以更新tail指向元素2

    我们发现并不是每一步都更新tail节点,为什么这么设计?
    因为在多线程环境下这里的更新采用cas操作,每次更新可能失败,失败就需要重试,多重试一次就影响性能,所以通过这种方式减少了更新操作(一次添加需要两个更新,但是这里有时候把更新tail省略了),所以通过这种方式来提升入队性能。

    返回顶部

    DelayQueue

    1. 加入其中的元素必需实现Delayed接口。当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
    2. 消费者线程查看队列头部的元素,注意是查看不是取出。然后调用元素的getDelay方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。

    属性有哪些

            public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
                private final transient ReentrantLock lock = new ReentrantLock();
                //底层数据结构是PriorityQueue
                private final PriorityQueue<E> q = new PriorityQueue();
                //等待获取队列元素的线程。
                private Thread leader = null;
                private final Condition available;
            }
    
    1. 底层是通过PriorityQueue来存储数据的,所以DelayQueue也是个无界队列
    2. leader:消费者线程来队列取数据的时候,如果待出队元素还未到过期时间,通过awaitNanos()阻塞这个线程,并且这里给存储下来。
    3. lock:所有更新操作要锁住

    添加方法:

            public boolean offer(E e) {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    q.offer(e);  //放一个元素
                    if (q.peek() == e) {
                        leader = null;    //  leader 线程为空
                        available.signal();   //其他线程唤醒,如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程
                    }
                    return true;
                } finally {
                    lock.unlock();
                }
            }
    
    1. 将元素插入到内部的PriorityQueue,PriorityQueue会自动进行排序
    2. 如果插入的元素就是下一个即将到期的元素,将leader设置为null,并且唤醒原leader线程。
      • 这个是什么场景:
        • 消费者线程T1来获取元素,当前没有已经到期的元素,则将T1阻塞。
        • 插入的元素Node1是即将到期元素(或者已经到期),唤醒T1
        • T1判断Node1是否到期,如果到期则进行处理,如果没到期再次阻塞

    看下阻塞的获取方法

            public E take() throws InterruptedException {
                ReentrantLock var1 = this.lock;
                var1.lockInterruptibly();
    
                try {
                    while(true) {
                        while(true) {
                            Delayed var2 = (Delayed)this.q.peek();
                            if (var2 != null) {
                                long var3 = var2.getDelay(TimeUnit.NANOSECONDS);
                                if (var3 <= 0L) {
                                    Delayed var14 = (Delayed)this.q.poll();
                                    return var14;
                                }
    
                                var2 = null;
                                if (this.leader != null) {
                                    this.available.await();
                                } else {
                                    Thread var5 = Thread.currentThread();
                                    this.leader = var5;
    
                                    try {
                                        this.available.awaitNanos(var3);
                                    } finally {
                                        if (this.leader == var5) {
                                            this.leader = null;
                                        }
    
                                    }
                                }
                            } else {
                                this.available.await();
                            }
                        }
                    }
                } finally {
                    //leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
                    if (this.leader == null && this.q.peek() != null) {
                        this.available.signal();
                    }
    
                    var1.unlock();
                }
            }
    
    1. 消费者线程来获取元素的时候,首先开启个while循环
    2. peek一下元素,如果没有peek到,证明队列是空的,直接阻塞线程
    3. 如果peek到了元素,继续判断该元素是否到期,如果已经到期,直接返回该元素
    4. 如果元素没有到期,判断leader是否为null,如果为null,更新当前线程为leader,并且通过awaitNanos阻塞(leader线程的作用就是这个,会根据元素到期时间自动唤醒)
    5. 如果leader不为null,直接阻塞线程即可(当有多个线程的时候,都各自阻塞在这里,当队列有元素后会唤醒所有阻塞在这的线程,这些线程在继续第一步开启的while循环,pk获取元素)
    6. 线程执行完毕后在finally里要通知一下其他阻塞队列

    返回顶部

    SynchronousQueue

    也是一个队列来,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),
    如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,
    take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。

    看一下属性

            public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
                private transient volatile java.util.concurrent.SynchronousQueue.Transferer<E> transferer;
                private ReentrantLock qlock;
                private java.util.concurrent.SynchronousQueue.WaitQueue waitingProducers;
                private java.util.concurrent.SynchronousQueue.WaitQueue waitingConsumers;
    
            }
    
    1. 我们只关注一个属性Transferer:这个是内部类,核心逻辑都是它实现的。它是一个抽象类,有两个实现类:
      • TransferQueue:内部通过链表实现的公平队列(先进先出)
      • TransferStack:内部通过栈实现的非公平队列(先进后出)
    2. 所有的添加、获取方法内部都是调用的Transferer的transferer方法。

    其实内部是由Node节点维护一个队列用来存储数据,每个node代表一个线程。

    先看下TransferQueue实现的公平模式:

    1. 初始状态下
    2. 线程put1执行put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下
    3. 接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下
    4. 这时候,来了一个线程take1,执行了 take操作,由于tail指向put2线程,put2线程跟take1线程配对了,这时take1线程不需要入队。
      但是请注意了,这时候,要唤醒的线程并不是put2,而是put1。为何?
      大家应该知道我们现在讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,我们的例子明显是put1应该优先被唤醒。
      公平策略总结下来就是:队尾匹配队头出队。
      执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:

    TransferStack非公平模式:

    1. 线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待,这时栈状态如下:
    2. 接着,线程put2再次执行了put(2)操作,跟前面一样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态如下:
    3. 这时候,来了一个线程take1,执行了take操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程
    4. 最后,再来一个线程take2,执行take操作,这跟步骤3的逻辑基本是一致的,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示:

    返回顶部

    PriorityBlockingQueue

    锁的原理和ArrayBlockQueue一样的都是一把锁。
    元素操作同PriorityQueue一样。

    返回顶部

  • 相关阅读:
    测试开发技术
    测试开发技术难题与解决
    .gitignore文件
    mysql 子查询 联结 组合查询
    DTL
    jquery 基础
    Django操作数据库
    git 进阶篇
    miniconda使用
    pycharm之django基本配置
  • 原文地址:https://www.cnblogs.com/yanhui007/p/12579573.html
Copyright © 2011-2022 走看看