zoukankan      html  css  js  c++  java
  • 读Lock-Free论文实践

    论文地址:implementing Lock-Free Queue

    论文大体讲的意思是:Lock-Base的程序的performance不好,并且a process inside the critical section can delay all operations inde nitely;所以基于以上的弊端,他们提出了Non-Blocking的算法,也就是CSW和FAA,当然就是CAS,而CAS也有最难以handler的情况,也就是ABA问题,他们给出了solution,也就是检查引用;他们分别给出了链表场景和数组场景的algorithm,最后是性能分析。

    这篇笔记主要为了给Lock-Free提供一些实现方法和思路。

    我们先看链表的情况:

    双端链表,入队Enqueue方法是tail移动,出队Dequeue是Head移动,下面记录一下伪代码。

    Enqueue(x)
    q new record
    q^:value x
    q^:next NULL
    repeat
    p tail
    succ Compare&Swap(p^:next, NULL, q)
    if succ 6= TRUE
    Compare&Swap(tail ; p; p^:next)
    until succ = TRUE
    Compare&Swap(tail ; p; q)
    end
    Dequeue() repeat p head
    if p^:next = NULL error queue empty until Compare&Swap(head ; p; p^:next) return p^:next^:value end

    Enqueue的思路是,先设置tail的next指针,如果成功,则把tail指针移动;Dequeue的思路是,如果head的copy p的next不为空,则进行移动,并在成功之后返回之前p的next的值,Java没有指针操作,只有使用unSafe类进行内存操作。(数组实现要比链表实现稳定多了)在跑线程的时候,add会有几率出现Runnable的情况,目前还不知道什么原因,最初的原因是把变量赋值写在for(;;)里,导致它一直不取值,只做循环体,将赋值写在循环体里好了一些,但还是会出现死循环问题。

    import sun.misc.Unsafe;
    
    import java.lang.reflect.Field;
    
    /**
     * Created by MacBook on 2019/4/14.
     */
    public class MyLockFreeLinkQueue<E> implements MyQueue<E>{
    
        Node<E> head;
        Node<E> tail;
    
        static Unsafe unsafe;
    
        private static final long headOffset;
        private static final long tailOffset;
        private static final long nextOffset;
    
    
        static{
            try{
                Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe");
    
                singleoneInstanceField.setAccessible(true);
    
                unsafe = (Unsafe)singleoneInstanceField.get(null);
    
                headOffset = unsafe.objectFieldOffset
                        (MyLockFreeLinkQueue.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                        (MyLockFreeLinkQueue.class.getDeclaredField("tail"));
                nextOffset = unsafe.objectFieldOffset
                        (MyLockFreeLinkQueue.Node.class.getDeclaredField("next"));
            }catch (Exception e){
                throw new Error(e);
            }
        }
    
    
        static class Node<E>{
            E data;
            Node<E> next;
    
            public Node(E data, Node<E> next) {
                this.data = data;
                this.next = next;
            }
    
            public E getData() {
                return data;
            }
    
            public void setData(E data) {
                this.data = data;
            }
    
            public Node<E> getNext() {
                return next;
            }
    
            public void setNext(Node<E> next) {
                this.next = next;
            }
        }
    
    
        public MyLockFreeLinkQueue() {
            head = tail = new Node<>(null,null);
        }
    
        /**
         *
         Enqueue(x)
         q new record
         q^:value x
         q^:next NULL
         repeat
         p tail
         succ Compare&Swap(p^:next, NULL, q)
         if succ 6= TRUE
         Compare&Swap(tail ; p; p^:next)
         until succ = TRUE
         Compare&Swap(tail ; p; q)
         end
         * @param e
         * @return
         */
        @Override
        public boolean add(E e) {
            Node<E> q = new Node<>(e,null);
            for(;;){
                Node<E> p = tail;
                if(unsafe.compareAndSwapObject(p,nextOffset,null,q)){
                    while(unsafe.compareAndSwapObject(this,tailOffset,p,q));
                    break;
                }
            }
            return true;
        }
    
    
        /**
         *
         Dequeue()
         repeat
         p head
         if p^:next = NULL
         error queue empty
         until Compare&Swap(head ; p; p^:next)
         return p^:next^:value
         end
         * @return
         */
        @Override
        public E take() {
            for(;;){
                Node<E> p = head,next = p.getNext();
                if(next == null){
                    return null;
                }else if(unsafe.compareAndSwapObject(this,headOffset,p,next)){
                    p.setNext(null);// help gc
                    return next.getData();
                }
            }
        }
    
    }

     

    这里我跑了2个生产者线程,2个消费者线程,数据有序的被消费了。

    ...
    pool-1-thread-3 send [62] to queue; total 193 pool-1-thread-3 send [97] to queue; total 194 pool-1-thread-3 send [25] to queue; total 195 pool-1-thread-3 send [17] to queue; total 196 pool-1-thread-3 send [50] to queue; total 197 pool-1-thread-3 send [72] to queue; total 198 pool-1-thread-3 send [46] to queue; total 199 pool-1-thread-3 send [83] to queue; total 200 pool-1-thread-4 consumer [62],count 2n+1 result :125; total 193 pool-1-thread-4 consumer [97],count 2n+1 result :195; total 194 pool-1-thread-4 consumer [25],count 2n+1 result :51; total 195 pool-1-thread-4 consumer [17],count 2n+1 result :35; total 196 pool-1-thread-4 consumer [50],count 2n+1 result :101; total 197 pool-1-thread-4 consumer [72],count 2n+1 result :145; total 198 pool-1-thread-4 consumer [46],count 2n+1 result :93; total 199 pool-1-thread-4 consumer [83],count 2n+1 result :167; total 200

    接下来,我实践了环形数组的实现,基于我之前实现的BlockingQueue,这个Lock-Free Queue会变得比较简单。

    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by MacBook on 2019/4/13.
     */
    public class MyLockFreeQueue<E> implements MyQueue<E>{
        private Object[] data;
        private AtomicInteger takeIndex;
        private AtomicInteger putIndex;
        private AtomicInteger size;
        private static final int DEFAULT_CAPACITY = 10;
    
        public MyLockFreeQueue (){
            this(DEFAULT_CAPACITY);
        }
        public MyLockFreeQueue(int initCapacity){
            if(initCapacity < 0){
                throw new IllegalStateException("initCapacity must not be negative");
            }
            data = new Object[initCapacity];
            takeIndex = new AtomicInteger(0);
            putIndex = new AtomicInteger(0);
            size = new AtomicInteger(0);
        }
    
        public boolean add(E e){
            if(e == null){
                throw new NullPointerException("the element you put can't be null");
            }
            for(int index = putIndex.get();;){
                if(size.get() == data.length){
                    return false;
                }
                int expect = (index == data.length - 1)?0:(index+1);
                if(putIndex.compareAndSet(index,expect)){
                    data[index] = e;
                    size.incrementAndGet();
                    return true;
                }
            }
        }
        public E take(){
            for(int index = takeIndex.get();;){
                if(size.get() == 0){
                    return null;
                }
                int expect = (index == data.length - 1)?0:(index+1);
                E e = (E)data[index];
                if(takeIndex.compareAndSet(index,expect)){
                    size.decrementAndGet();
                    return e;
                }
            }
        }
    }

    思路就是,使用两个标记入队和出队的Atom Integer对象,在成功申请当前格子之后,给当前格子赋值,使用size来判断是否EMPTY和FULL。这里依然有一点缺陷,就是index和size不同步的问题,不过我也是跑了2+2线程,也是有序消费了。

    ...pool-1-thread-3 send [81] to queue; total 188
    pool-1-thread-2 consumer [81],count 2n+1 result :163; total 188
    pool-1-thread-3 send [1] to queue; total 189
    pool-1-thread-2 consumer [1],count 2n+1 result :3; total 189
    pool-1-thread-2 consumer [19],count 2n+1 result :39; total 190
    pool-1-thread-3 send [19] to queue; total 190
    pool-1-thread-3 send [61] to queue; total 191
    pool-1-thread-2 consumer [61],count 2n+1 result :123; total 191
    pool-1-thread-3 send [16] to queue; total 192
    pool-1-thread-2 consumer [16],count 2n+1 result :33; total 192
    pool-1-thread-3 send [74] to queue; total 193
    pool-1-thread-2 consumer [74],count 2n+1 result :149; total 193
    pool-1-thread-3 send [38] to queue; total 194
    pool-1-thread-2 consumer [38],count 2n+1 result :77; total 194
    pool-1-thread-3 send [32] to queue; total 195
    pool-1-thread-2 consumer [32],count 2n+1 result :65; total 195
    pool-1-thread-3 send [9] to queue; total 196
    pool-1-thread-2 consumer [9],count 2n+1 result :19; total 196
    pool-1-thread-3 send [77] to queue; total 197
    pool-1-thread-2 consumer [77],count 2n+1 result :155; total 197
    pool-1-thread-3 send [69] to queue; total 198
    pool-1-thread-2 consumer [69],count 2n+1 result :139; total 198
    pool-1-thread-3 send [52] to queue; total 199
    pool-1-thread-2 consumer [52],count 2n+1 result :105; total 199
    pool-1-thread-3 send [81] to queue; total 200
    pool-1-thread-2 consumer [81],count 2n+1 result :163; total 200
    
            ExecutorService executor = Executors.newFixedThreadPool(6);
            MyLockFreeQueue<Integer> queue = new MyLockFreeQueue();
            Worker<Integer> pro = new Provider(queue);
            Worker<Integer> con = new Consumer(queue);
    
            executor.submit(pro);
            executor.submit(con);
            executor.submit(pro);
            executor.submit(con);
            executor.submit(pro);
            executor.submit(con);
            
            executor.shutdown();
  • 相关阅读:
    Quartz快速入门 (转自 http://www.blogjava.net/baoyaer/articles/155645.html)
    C#HTML 转文本及HTML内容提取
    创建连接字符串方法
    源生js _AJax
    .NET里面附件上传大小限制的控制
    错误记录 COM
    Quartz的cron表达式
    Quartz 多任务调用
    Myeclipse的web工程和Eclipse互相转换
    Myeclipse8.0序列号生成程序
  • 原文地址:https://www.cnblogs.com/chentingk/p/10705141.html
Copyright © 2011-2022 走看看