zoukankan      html  css  js  c++  java
  • jdk源码阅读-ConcurrentLinkedQueue(一)

    说明

    concurrentLinkedQueue为无界非阻塞队列,是线程安全的 内部结构为链表的形式, 内部使用cas保存线程安全。采用cas保证原子性

    什么是CAS

    CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作

    AtomicInteger例子

     AtomicInteger atomicInteger=new AtomicInteger();
    //内部通过CAS保证原子性
     atomicInteger.getAndIncrement();
      private static final Unsafe unsafe = Unsafe.getUnsafe();
      public final int getAndIncrement() {
         return unsafe.getAndAddInt(this, valueOffset, 1);
      }
       public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
                /**
                 * var1为对象  var2 为对象内存中的属性值  var5 为期望值  var4为修改后的值 如果cas失败则继续重试
                 * 从内存中取出var1对象的var2的值为内存值
                 */
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }

    注意:Unsafe是c++实现的 遗憾的是这个类是jdk专用 我们并不能使用

    ConcurrentLinkedQueue类图

     并没有实现BlockingQueue所以是并不是阻塞队列  同时也不适合使用线程池  并发情况下队列会无线扩张 会导致内存溢出 以及线程池的maximumPoolSize 参数无效

    详情看:https://www.cnblogs.com/LQBlog/p/8735356.html#autoid-1-0-0

    源码

    核心属性

        /**
         * 头节点
         */
        private transient volatile ConcurrentLinkedQueue.Node<E> head;
    
        /**
         * 尾节点
         */
        private transient volatile ConcurrentLinkedQueue.Node<E> tail;
    
        public ConcurrentLinkedQueue() {
            head = tail = new ConcurrentLinkedQueue.Node<E>(null);
        }

    从尾部入队 从头部出队  都用volatile修饰 表示多线程修改 其中一个线程修改 会立即刷新到主内存 对其他线程立即可见

    Node源码

        private static class Node<E> {
            //当前元素值
            volatile E item;
            //下一个节点元素
            volatile ConcurrentLinkedQueue.Node<E> next;
    
            /**
             * Constructs a new node.  Uses relaxed write because item can
             * only be seen after publication via casNext.
             */
            Node(E item) {
                UNSAFE.putObject(this, itemOffset, item);
            }
    
            boolean casItem(E cmp, E val) {
                return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
    
            void lazySetNext(ConcurrentLinkedQueue.Node<E> val) {
                UNSAFE.putOrderedObject(this, nextOffset, val);
            }
    
            boolean casNext(ConcurrentLinkedQueue.Node<E> cmp, ConcurrentLinkedQueue.Node<E> val) {
                return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
    
            // Unsafe mechanics
    
            private static final sun.misc.Unsafe UNSAFE;
            //偏移量 cas
            private static final long itemOffset;
            //便宜量
            private static final long nextOffset;
    
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = ConcurrentLinkedQueue.Node.class;
                    itemOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

    item 和next都是volatile修饰

    几个原则

    1.next和head属性都不会为null 就算队列中一个元素也没有也会同时指向同一个空节点

    2.head和tail刷新并不是实时的

    3.head并不一定是真正的头节点 tail并不一定是尾节点

    4.tail.next=null一定是尾节点

    offer源码

        public boolean add(E e) {
            return offer(e);
        }
        public boolean offer(E e) {
            //1.检查是否为空 空元素不允许入队
            checkNotNull(e);
            //2.创建node节点包装类
            final ConcurrentLinkedQueue.Node<E> newNode = new ConcurrentLinkedQueue.Node<E>(e);
            //3映射cas修改失败重新查找尾节点的遍历重试  以及多线程并发修改 和tail不是真正未接节点的 真正尾节点查找
            for (ConcurrentLinkedQueue.Node<E> t = tail, p = t; ; ) {
                ConcurrentLinkedQueue.Node<E> q = p.next;
                /**
                 * 3.q等于null代表为最后一个节点
                 *    映射代码5的延迟修改tail
                 *    映射代码6的cas修改并不是一定成功也没采取重试
                 *    映射多线程有其他线程抢先入队
                 */
                if (q == null) {
                    /**
                     *  4.cas更新 底层使用Unsafe c++实现 直接调度cpu指令实现 cas更新 并发情况下只会有一个线程成功(可以理解为乐观锁)
                     */
                    if (p.casNext(null, newNode)) {
                        /**
                         * 5.p!=t 代表应该更新tail节点为newNode
                         *  当第一次for循环q!=null 循环找到真正的尾节点的时候cas修改成功的时候才成立
                         *  单线程情况下每次都是间隔一次入队才来更新tail 减少cas修改tail
                         */
                        if (p != t)
                            casTail(t, newNode); //6.cas设置tail为当前新节点 并没有采取重试 有可能失败 失败也没关系 代码3会遍历找到真正尾节点
                        return true;
                    }
                } else if (p == q) {
                    /**
                     * 7.tail自引用时发生 item就是他自身 重新根据头节点找尾节节点
                     *   自引用即 tail item=next 这个时候head才是真正尾节点
                     *   (t != (t = tail))判断tail是否改变 改变了则取最新tail否则从head开始遍历往下找 找真正节点
                     *   这个判断是防止多线程情况下 有其他线程抢先入队 也可以理解 其他线程已经抢先走了这个分支找到真正尾节点入队并修改了tail
                     *   自引用:并发情况下 又在入队又在出队的情况下会发生  我测试 添加第一个元素就是自引用 后续元素则不是
                     */
                    p = (t != (t = tail)) ? t : head;
                else
                    /**
                     * 8.当tail并不是最后一个节点时
                     *   对应代码5的延迟修改 以及cas修改失败 或者多线程抢先入队 导致当前线程tail并不是真正尾节点
                     *   (p != t && t != (t = tail)为判断tail是否有其他线程抢先修改 如果有则取最新tail 遍历开始找真正尾节点 否则继续向下寻找
                     * 
                     */
                    p = (p != t && t != (t = tail)) ? t : q;
                }
            }
    
        }

    代码1:

             检查代码元素是否为空 如果为空则抛出异常 

    代码2:

            创建一个node对象包装值

    代码3:

           判断q==null(p=tail,q=p.next) 即判断是否为最后一个节点 这里判断主要出于以下几点考虑

          1.代码5的判断 表示每次都是间隔一次才更新taill 所以tail并不是实时修改的

          2.代码6的cas没有重试 也表示修改taill并不是一定成功的

    代码4:

          如果taill是最后一个节点 则cas更新 如果cas更新是吧(表示其他线程抢先更新了 重新执行for循环)

    代码5:

           看到这里可能会有疑问(p=t=tail) 为什么要判断p!=t   因为第一次不更新尾节点  所以第二次offer代码3并不会成立走下面else分支重新寻找真正的尾节点 即p不会等于t

    代码6:

          这里cas可能会修改失败 但是并没有关系 注意外面for循环 和判断 当不是尾节点 会重新寻找真正的尾节点

    代码7:

         p==q(p.next=p)网上资料说poll可能会导致自引用 我jdk8初始化队列 添加第一个元素就回显了自引用(没找到原因)  自引用的时候head才是真正的尾节点

         p = (t != (t = tail)) ? t : head;   这代码 第一个判断是 避免多线程情况下 其他线程入队已经修改了tail的的值 所以这个时候就不是自引用 则取taill遍历找尾节点  否则 从head重新找起

    代码8:

         不直接取taill.next为p加入遍历 而是加入了taill是否改变的判断 也是为了防止其他线程修改了tail 如果修改了则取最新tail遍历 否则取p.next遍历查找尾节点

    可以看到整个入队都是采用无锁设计 使用cas+循环 保证入队一定会成功,但是增加了for循环遍历次数

    poll源码

      public E poll() {
            restartFromHead:
            /**
             * 代码1无限循环
             */
            for (;;) {
                for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
                    /**
                     * 代码2从head节点开始取
                     */
                    E item = p.item;
                    /**
                     * 代码3 如果item不为空 表示未出队 同时通过cas将item置为null
                     * 如果cas失败则表示其他线程抢先出队 则重新获取
                     */
                    if (item != null && p.casItem(item, null)) {
                        /**
                         *代码4 每次出队并不会马上修改head节点 而是下一次出队时再修改
                         */
                        if (p != h) // hop two nodes at a time
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    /**
                     * 代码5 表示队列为空返回null
                     * 如果p.next!=null则继续执行下面逻辑 注意此时q已经被赋值为p.next
                     */
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    /**
                     * 代码6
                     */
                    else if (p == q)
                        continue restartFromHead;
                    /**
                     * 代码7
                     * 将next节点赋值给p  遍历重新找到head节点
                     * 对应代码3cas失败
                     * 对应代码4不是实时修改head节点
                     * 对应代码5的判断给q复制next节点
                     */
                    else
                        p = q;
                }
            }
        }

     size方法

      public int size() {
            // 计数
            int count = 0;
            for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个存活的结点开始往后遍历
                if (p.item != null) // 结点的item域不为null
                    // Collection.size() spec says to max out
                    if (++count == Integer.MAX_VALUE) // 增加计数,若达到最大值,则跳出循环
                        break;
            // 返回大小
            return count;
        }

          

  • 相关阅读:
    微软面试题: LeetCode 907. 子数组的最小值之和 middle 出现次数:1
    微软面试题: LeetCode 5. 最长回文子串 出现次数:1
    微软面试题: LeetCode 120. 三角形最小路径和 出现次数:1
    开源项目推荐:主流RPC开源框架及知识科普
    微软面试题: LeetCode 84. 柱状图中最大的矩形 出现次数:1
    需重点掌握的三大排序: 快速排序 + 归并排序 + 堆排序
    微软面试题:补充题12. 二叉树的下一个节点 出现次数:2
    微软面试题: 剑指 Offer 51. 数组中的逆序对 出现次数:2
    GUI
    数据结构与算法
  • 原文地址:https://www.cnblogs.com/LQBlog/p/11607351.html
Copyright © 2011-2022 走看看