zoukankan      html  css  js  c++  java
  • 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究

    并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究

    http://www.importnew.com/25668.html


    一、 前言

    常用的并发队列有阻塞队列和非阻塞队列,前者使用锁实现,后者则使用CAS非阻塞算法实现,使用非阻塞队列一般性能比较好,下面就看看常用的非阻塞ConcurrentLinkedQueue是如何使用CAS实现的。

    二、 ConcurrentLinkedQueue类图结构

    如图ConcurrentLinkedQueue中有两个volatile类型的Node节点分别用来存在列表的首尾节点,其中head节点存放链表第一个item为null的节点,tail则并不是总指向最后一个节点。Node节点内部则维护一个变量item用来存放节点的值,next用来存放下一个节点,从而链接为一个单向无界列表。

    1
    2
    3
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

    如上代码初始化时候会构建一个item为NULL的空节点作为链表的首尾节点。

    三、offer操作

    offer操作是在链表末尾添加一个元素,下面看看实现原理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public boolean offer(E e) {
        //e为null则抛出空指针异常
        checkNotNull(e);
     
       //构造Node节点构造函数内部调用unsafe.putObject,后面统一讲
        final Node<E> newNode = new Node<E>(e);
     
     
        //从尾节点插入
        for (Node<E> t = tail, p = t;;) {
     
            Node<E> q = p.next;
     
            //如果q=null说明p是尾节点则插入
            if (q == null) {
     
                //cas插入(1)
                if (p.casNext(null, newNode)) {
                    //cas成功说明新增节点已经被放入链表,然后设置当前尾节点(包含head,1,3,5.。。个节点为尾节点)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)//(2)
                //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
                //重新找新的head,因为新的head后面的节点才是激活的节点
                p = (t != (t = tail)) ? t : head;
            else
                // 寻找尾节点(3)
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    从构造函数知道一开始有个item为null的哨兵节点,并且head和tail都是指向这个节点,然后当一个线程调用offer时候首先

    如图首先查找尾节点,q==null,p就是尾节点,所以执行p.casNext通过cas设置p的next为新增节点,这时候p==t所以不重新设置尾节点为当前新节点。由于多线程可以调用offer方法,所以可能两个线程同时执行到了(1)进行cas,那么只有一个会成功(假如线程1成功了),成功后的链表为:

    失败的线程会循环一次这时候指针为:

    这时候会执行(3)所以p=q,然后在循环后指针位置为:

    所以没有其他线程干扰的情况下会执行(1)执行cas把新增节点插入到尾部,没有干扰的情况下线程2 cas会成功,然后去更新尾节点tail,由于p!=t所以更新。这时候链表和指针为:

    假如线程2cas时候线程3也在执行,那么线程3会失败,循环一次后,线程3的节点状态为:

    这时候p!=t ;并且t的原始值为told,t的新值为tnew ,所以told!=tnew,所以 p=tnew=tail;

    然后在循环一下后节点状态:

    q==null所以执行(1)。

    现在就差p==q这个分支还没有走,这个要在执行poll操作后才会出现这个情况。poll后会存在下面的状态

    这个时候添加元素时候指针分布为:

    所以会执行(2)分支 结果 p=head
    然后循环,循环后指针分布:

    所以执行(1),然后p!=t所以设置tail节点。现在分布图:

    自引用的节点会被垃圾回收掉。

    四、 add操作

    add操作是在链表末尾添加一个元素,下面看看实现原理。
    其实内部调用的还是offer

    1
    2
    3
    public boolean add(E e) {
        return offer(e);
    }

    五、poll操作

    poll操作是在链表头部获取并且移除一个元素,下面看看实现原理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public E poll() {
        restartFromHead:
     
        //死循环
        for (;;) {
     
            //死循环
            for (Node<E> h = head, p = h, q;;) {
     
                //保存当前节点值
                E item = p.item;
     
                //当前节点有值则cas变为null(1)
                if (item != null && p.casItem(item, null)) {
                    //cas成功标志当前节点以及从链表中移除
                    if (p != h) // 类似tail间隔2设置一次头节点(2)
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                //当前队列为空则返回null(3)
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //自引用了,则重新找新的队列头节点(4)
                else if (p == q)
                    continue restartFromHead;
                else//(5)
                    p = q;
            }
        }
    }
        final void updateHead(Node<E> h, Node<E> p) {
            if (h != p && casHead(h, p))
                h.lazySetNext(h);
        }

    当队列为空时候:

    可知执行(3)这时候有两种情况,第一没有其他线程添加元素时候(3)结果为true然后因为h!=p为false所以直接返回null。第二在执行q=p.next前,其他线程已经添加了一个元素到队列,这时候(3)返回false,然后执行(5)p=q,然后循环后节点分布:

    这时候执行(1)分支,进行cas把当前节点值值为null,同时只有一个线程会成功,cas成功 标示该节点从队列中移除了,然后p!=h,调用updateHead方法,参数为h,p;h!=p所以把p变为当前链表head节点,然后h节点的next指向自己。现在状态为:

    cas失败 后 会再次循环,这时候分布图为:

    这时候执行(3)返回null.

    现在还有个分支(4)没有执行过,那么什么时候会执行那?

    这时候执行(1)分支,进行cas把当前节点值值为null,同时只有一个线程A会成功,cas成功 标示该节点从队列中移除了,然后p!=h,调用updateHead方法,假如执行updateHead前另外一个线程B开始poll这时候它p指向为原来的head节点,然后当前线程A执行updateHead这时候B线程链表状态为:

    所以会执行(4)重新跳到外层循环,获取当前head,现在状态为:

    六、peek操作

    peek操作是获取链表头部一个元素(只读取不移除),下面看看实现原理。
    代码与poll类似,只是少了castItem.并且peek操作会改变head指向,offer后head指向哨兵节点,第一次peek后head会指向第一个真的节点元素。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

    七、size操作

    获取当前队列元素个数,在并发环境下不是很有用,因为使用CAS没有加锁所以从调用size函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // 最大返回Integer.MAX_VALUE
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
     
    //获取第一个队列元素(哨兵元素不算),没有则为null
    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
     
    //获取当前节点的next元素,如果是自引入节点则返回真正头节点
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

    八、remove操作

    如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回true,否者返回false

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public boolean remove(Object o) {
     
        //查找元素为空,直接返回false
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
     
            //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其他元素是否有匹配的。
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
     
                //获取next元素
                Node<E> next = succ(p);
     
                //如果有前驱节点,并且next不为空则链接前驱节点到next,
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

    九、contains操作

    判断队列里面是否含有指定对象,由于是遍历整个队列,所以类似size 不是那么精确,有可能调用该方法时候元素还在队列里面,但是遍历过程中才把该元素删除了,那么就会返回false.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

    十、开源框架中使用

    Tomcat中NioEndPoint中的每个poller里面就维护一个ConcurrentLinkedQueue<Runnable>用来作为缓冲存放任务。

    10.1 Acceptor线程

    accept线程作用是接受客户端发来的连接请求并放入到事件队列。

    看下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    protected class Acceptor extends AbstractEndpoint.Acceptor {
     
            @Override
            public void run() {
     
                int errorDelay = 0;
     
                // 一直循环直到接收到shutdown命令
                while (running) {
     
                    ...
     
                    if (!running) {
                        break;
                    }
                    state = AcceptorState.RUNNING;
     
                    try {
                        //如果达到max connections个请求则等待
                        countUpOrAwaitConnection();
     
                        SocketChannel socket = null;
                        try {
                            // 从TCP缓存获取一个完成三次握手的套接字,没有则阻塞
                            // socket
                            socket = serverSock.accept();
                        } catch (IOException ioe) {
                            ...
                        }
                        // Successful accept, reset the error delay
                        errorDelay = 0;
                       if (running && !paused) {
                            if (!setSocketOptions(socket)) {
                                countDownConnection();
                                closeSocket(socket);
                            }
                        } else {
                            countDownConnection();
                            closeSocket(socket);
                        }
                       ....
                    } catch (SocketTimeoutException sx) {
                        // Ignore: Normal condition
                    ....
                }
                state = AcceptorState.ENDED;
            }
        }
     
     protected boolean setSocketOptions(SocketChannel socket) {
            // Process the connection
            try {
                //disable blocking, APR style, we are gonna be polling it
               ...
                getPoller0().register(channel);
            } catch (Throwable t) {
               ...
                return false;
            }
            return true;
    }
     
    public void register(final NioChannel socket) {
       ...
        addEvent(r);
    }
     
    public void addEvent(Runnable event) {
        events.offer(event);
        ...
    }

    10.2 Poll线程

    poll线程作用是从事件队列里面获取事件把链接套接字加入selector,并且监听socket事件进行处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public void run() {
        while (true) {
            try {
                ...
                if (close) {
                   ...
                } else {
                    hasEvents = events();
                }
                try {
                    ...
                } catch ( NullPointerException x ) {...
                }
     
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // 遍历所有注册的channel对感兴趣的事件处理
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    KeyAttachment attachment = (KeyAttachment)sk.attachment();
     
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        attachment.access();
                        iterator.remove();
                        processKey(sk, attachment);
                    }
                }//while
     
                //process timeouts
                timeout(keyCount,hasEvents);
                if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
            } catch (OutOfMemoryError oom) {
                ...
            }
        }//while
        synchronized (this) {
            this.notifyAll();
        }
        stopLatch.countDown();
     
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public boolean events() {
                boolean result = false;
     
                //从队列获取任务并执行
                Runnable r = null;
                while ( (r = events.poll()) != null ) {
                    result = true;
                    try {
                        r.run();
                        if ( r instanceof PollerEvent ) {
                            ((PollerEvent)r).reset();
                            eventCache.offer((PollerEvent)r);
                        }
                    } catch ( Throwable x ) {
                        log.error("",x);
                    }
                }
     
                return result;
            }
     
    //如配置线程池则请求交给线程池处理。
    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
           ...
        }
        return true;
    }

    十一、有意思的问题

    10.1 一个判断的执行结果分析

    offer中有个 判断 t != (t = tail)假如 t=node1;tail=node2;并且node1!=node2那么这个判断是true还是false那,答案是true,这个判断是看当前t是不是和tail相等,相等则返回true否者为false,但是无论结果是啥执行后t的值都是tail。

    下面从字节码来分析下为啥?

    • 一个例子
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static void main(String[] args)  {
     
        int t = 2;
        int tail = 3;
     
        System.out.println(t != (t = tail));
     
     
    }

    结果为:true;

    • 字节码文件:

    字节码命令介绍可参考: http://blog.csdn.net/web_code/article/details/12164733

    一开始栈为空

    • 第0行指令作用是把值2入栈栈顶元素为2

    • 第1行指令作用是将栈顶int类型值保存到局部变量t中。

    • 第2行指令作用是把值3入栈栈顶元素为3

    • 第3行指令作用是将栈顶int类型值保存到局部变量tail中。

    • 第4调用打印命令
    • 第7行指令作用是把变量t中的值入栈

    • 第8行指令作用是把变量tail中的值入栈

    • 现在栈里面元素为3,2并且3位栈顶
    • 第9行指令作用是当前栈顶元素入栈,所以现在栈内容3,3,2

    • 第10行指令作用是把栈顶元素存放到t,现在栈内容3,2

    • 第11行指令作用是判断栈顶两个元素值,相等则跳转 18。由于现在栈顶严肃为3,2不相等所以返回true.
    • 第14行指令作用是把1入栈。

    然后回头分析下!=是双目运算符,应该是首先把左边的操作数入栈,然后在去计算了右侧操作数。

    10.2 Node的构造函数

    另外对于每个节点Node在构造时候使用UnSafe.putObject设置item替代了直接对volatile的赋值,这个是为了性能考虑?为啥不直接赋值那,看看类注解怎么说:

    1
    2
    3
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    When constructing a Node (before enqueuing it) we avoid paying for a volatile write to item by using Unsafe.putObject instead of a normal write. This allows the cost of enqueue to be”one-and-a-half”
    CASes.

    也就是说当构造Node节点时候(这时候节点还没有放入队列链表)为了避免正常的写volatile变量的开销 使用了Unsafe.putObject代替。这使元素进队列仅仅花费1.5个cas操作的耗时。这个是说使用Unsafe.putObject比直接给volatile变量赋值更高效?目前还没有查到相关资料。

    十二、总结

    ConcurrentLinkedQueue使用CAS非阻塞算法实现使用CAS解决了当前节点与next节点之间的安全链接和对当前节点值的赋值。由于使用CAS没有使用锁,所以获取size的时候有可能进行offer,poll或者remove操作,导致获取的元素个数不精确,所以在并发情况下size函数不是很有用。另外第一次peek或者first时候会把head指向第一个真正的队列元素。

    下面总结下如何实现线程安全的,可知入队出队函数都是操作volatile变量:head,tail。所以要保证队列线程安全只需要保证对这两个Node操作的可见性和原子性,由于volatile本身保证可见性,所以只需要看下多线程下如果保证对着两个变量操作的原子性(CAS)

    对于offer操作是在tail后面添加元素,也就是调用tail.casNext方法,而这个方法是使用的CAS操作,只有一个线程会成功,然后失败的线程会循环一下,重新获取tail,然后执行casNext方法。对于poll也是这样的。


  • 相关阅读:
    部署 AppGlobalResources 到 SharePoint 2010
    还原一个已删除的网站集
    使用仪表板设计器配置级联筛选器 (SharePoint Server 2010 SP1)
    File or arguments not valid for site template
    Pex and Moles Documentation
    Content Query Webpart匿名访问
    Running Moles using NUnit Console from Visual Studio
    Calling a WCF Service using jQuery in SharePoint the correct way
    Updating Content Types and Site Columns That Were Deployed as a Feature
    asp.net中判断传过来的字符串不为空的代码
  • 原文地址:https://www.cnblogs.com/silyvin/p/9106630.html
Copyright © 2011-2022 走看看