zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(13)----ConcurrentLinkedQueue(非阻塞队列)和BlockingQueue(阻塞队列)原理

    ·  在并发编程中,我们有时候会需要使用到线程安全的队列,而在Java中如果我们需要实现队列可以有两种方式,一种是阻塞式队列。另一种是非阻塞式的队列,阻塞式队列采用锁来实现,而非阻塞式队列则是采用cas算法来保证线程安全的,接下来就让我们来看一下jdk中两种队列的实现方式。

    1. ConcurrentLinkedQueue的实现原理

      顾名思义,这是一个基于链表结构的队列,它是一个先进先出的队列,当我们添加元素时,添加的元素链接到队列的尾部,当获取元素时返回队列的头部元素。

      先看添加队列时ConcurrentLinkedQueue的实现方法offer():

     public boolean offer(E e) {
            //判断元素是否为空,为空则抛出异常
            checkNotNull(e);
           //创建一个新的节点 Node中的结构为item(数据) next(下一个节点)
            final Node<E> newNode = new Node<E>(e);
           //自旋,将节点添加到队列尾部
            for (Node<E> t = tail, p = t;;) {
                //获取到p的下一个节点,即是当前tail节点的的下一个节点
                Node<E> q = p.next;
                //如果q为空,表示q的下一个节点为null,直接将当前节点添加到队列尾部
                if (q == null) {
                    // p is last node
                   //添加节点到队列尾部
                    if (p.casNext(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this queue,
                        // and for newNode to become "live".
                        //第一次进来时p == t,所以这里不会将当前节点设置成tail
                        if (p != t) // hop two nodes at a time
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                else if (p == q)
                //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
                //重新找新的head,因为新的head后面的节点才是激活的节点
                    p = (t != (t = tail)) ? t : head;
                else
                    // Check for tail updates after two hops.
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        } 

      首先检查当前进来的元素是否为null,为null则抛出空指针异常。将tail赋值给t和p,无限循环,将当前p.next()赋值给q,如果为空,则将当前节点添加到队列尾部,添加成功则继续看p 是否等于t,第一次进来时是相等的,所以不会调用casTail()方法,直接返回true即可,因为在多线程的环境下,可能会出现线程进入循环时,q不等于空,此时看p == q是否成立,开始必然是不成立的,执行最后一个else,将tal赋值给t,并将q赋值给p,继续循环,此时p的next为空,则执行将节点添加到队列尾部的操作,并且此时的p 不等于t,更新尾节点tail位置为当前新添加的节点。

      出队方法poll():

     public E poll() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                    E item = p.item;
    
                    if (item != null && p.casItem(item, null)) {
                        // Successful CAS is the linearization point
                        // for item to be removed from this queue.
                        if (p != h) // hop two nodes at a time
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }

      先来个死循环,再来一个死循环将head赋值给h,h 赋值给p, 得到头节点的值保存到item中,当item不为空时,通过cas算法将p节点的item设置为null,设置成功后,判断p是否等于h,等于则更新头节点,并将p.next()赋值给q,当p.next不为空时,头节点设置为q.否则设置为p,如果p.next()为null,则将更新头节点,返回null,如果p==q,自引用了,则重新找新的头节点。

      ConcurrentLinkedQueue主要就是利用了cas算法来保证了多线程环境下线程的安全,这样的算法其实性能来说是比较优的,速度相比较阻塞式的算法会更好一些。

    2. BlockingQueue

      BlockingQueue是一个阻塞式队列,当tack()时队列为空,则它不会返回空或者是抛异常,线程此时会一直等待着,知道队列中存在数据时才会去取出数据,同时put()当队列满了的情况下,也会等待,知道其他线程取出队列中的数据,腾出空间之后再执行入队操作,其实它也提供了add/remove这样的非阻塞式方法的,当队列full或队列为空时,直接抛出异常,这里我们主要说的是它阻塞的情况,主要有put/take()方法。

      这里以BlockingQueue的实现类ArrayBlockingQueue源码进行分析。

      put():

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                insert(e);
            } finally {
                lock.unlock();
            }
        }

      其实这里的实现原理就是生产者与消费者使用Condition实现原理一样,notFull和notEmpty两个Condition,使用可中断锁,count作为元素个数标记,以一个数组来保存元素,当count等于数组长度时,使notFull等待,否则调用insert()方法添加元素。

      insert()方法:

     private void insert(E x) {
            items[putIndex] = x;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        }

      这里很简单,将元素保存到数组中,count++,唤醒等待读取元素的线程,告诉它已经有数据了,可以获取了。

      take

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return extract();
            } finally {
                lock.unlock();
            }
        }

      使用中断锁,当count为0时,表示当前队列中已经没有资源了,所以线程等待,否则就调用extract()返回数据。

      extract():

    private E extract() {
            final Object[] items = this.items;
            E x = this.<E>cast(items[takeIndex]);
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }

      得到takeIndex位置的元素,保存到x中,并且将下表位置的元素置为空,更改takeIndex,count--,唤醒可能由于队列满了的情况下被等待的添加元素的线程,返回x,这样就取到了当前的元素。

      这里的实现原理跟生产者/消费者模式一样,同时使用Condition来指定唤醒某些等待的线程,实现了多线程下队列的阻塞和线程安全。

  • 相关阅读:
    js上传图片预览
    Android 调用QQ登录
    未开启HugePages ORACLE session剧增时引起的一次悲剧
    脱了裤子放屁之std::string
    [Python爬虫] Selenium自己主动訪问Firefox和Chrome并实现搜索截图
    tomcat启动报错,找不到相应的 queue,从而引发内存泄漏
    LeetCode: Binary Tree Postorder Traversal [145]
    素数打表法。
    linux 抓包 tcpdump 简单应用
    Linux命令之kill
  • 原文地址:https://www.cnblogs.com/Eternally-dream/p/9761212.html
Copyright © 2011-2022 走看看