zoukankan      html  css  js  c++  java
  • JAVA 并发:CLH 锁 与 AbstractQueuedSynchronizer

    首先向Doug Lea致敬。

    CLH

    以下是CLH锁的一个简单实现:

    class SimpleCLHLock {
        /**
         * initialized with a dummy node
         */
        private Node dummy = new Node();
        private AtomicReference<Node> tail = new AtomicReference<Node>(dummy);
    
        /**
         * implicit single linked list node
         */
        private static class Node {
            public volatile boolean locked = false;
        }
    
        private ThreadLocal<Node> threadLockNode = new ThreadLocal<Node>();
    
        public void lock() {
            Node last_tail;
            Node new_tail = new Node();
            new_tail.locked = true;
    
            while (true) {
                last_tail = tail.get();
    
                if (tail.compareAndSet(last_tail, new_tail)) {
                    break;
                }
            }
            // we just keep previous tail node as current node's direct predecessor
            // and waiting it jump into unlocked state, so we form a single linked list implicitly
            while (last_tail.locked) {
                Thread.yield();
            }
    
            // we need the node reference when we want to unlock
            // some threads may waiting on its state to be unlocked in above while-loop statement.
            threadLockNode.set(new_tail);
        }
    
        public void unlock() {
            // retrieve the lock state held by current thread
            Node node = threadLockNode.get();
            // if null then the current thread has not own the lock, also can't perform unlock operation
            if (node == null) {
                throw new IllegalMonitorStateException();
            }
            node.locked = false;
            threadLockNode.set(null);
        }
    }
    

    测试了不同的同步方式的速度

    package lock;
    
    import java.util.List;
    
    import java.util.ArrayList;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Created by hegaofeng on 6/30/15.
     */
    public class CLH {
        private static volatile long count;
    
        private static AtomicLong atomicLong = new AtomicLong();
    
        private static ExecutorService pool = Executors.newCachedThreadPool();
    
        public static void main(String[] args) {
    
    
            final int deltaPerThreads = 10000;
            final int numsOfThreads = 10;
    
            Callable<Boolean> raw_counter = new Callable<Boolean>() {
                @Override
                public Boolean call() {
                    for (int i=0; i<deltaPerThreads; i++) count++;
                    return true;
                }
            };
    
            Callable<Boolean> syn_counter = new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                        for (int i=0; i<deltaPerThreads; i++) {
                            synchronized (CLH.class) {
                                count++;
                            }
                        }
    
                    return true;
                }
            };
    
            Callable<Boolean> clh_counter = new Callable<Boolean>() {
                private SimpleCLHLock lock = new SimpleCLHLock();
                @Override
                public Boolean call() throws Exception {
                    for (int i=0; i<deltaPerThreads; i++) {
                        lock.lock();
                            count++;
                        lock.unlock();
                    }
                    return true;
                }
            };
    
            final Callable<Boolean> lck_counter = new Callable<Boolean>() {
                private ReentrantLock lock = new ReentrantLock();
                @Override
                public Boolean call() throws Exception {
                    for (int i=0; i<deltaPerThreads; i++) {
                        lock.lock();
                            count++;
                        lock.unlock();
                    }
                    return true;
                }
            };
    
            final Callable<Boolean> lck_fair_counter = new Callable<Boolean>() {
                private ReentrantLock lock = new ReentrantLock(true);
                @Override
                public Boolean call() throws Exception {
                    for (int i=0; i<deltaPerThreads; i++) {
                        lock.lock();
                        count++;
                        lock.unlock();
                    }
                    return true;
                }
            };
    
            Callable<Boolean> sem_counter = new Callable<Boolean>() {
                private Semaphore sem = new Semaphore(1);
                @Override
                public Boolean call() throws Exception {
                    for(int i=0; i<deltaPerThreads; i++) {
                        sem.acquire();
                        count++;
                        sem.release();
                    }
                    return true;
                }
            };
    
            Callable<Boolean> sem_fair_counter = new Callable<Boolean>() {
                private Semaphore sem = new Semaphore(1, true);
                @Override
                public Boolean call() throws Exception {
                    for(int i=0; i<deltaPerThreads; i++) {
                        sem.acquire();
                        count++;
                        sem.release();
                    }
                    return true;
                }
            };
    
            Callable<Boolean> cas_counter = new Callable<Boolean>() {
                private AtomicBoolean locked = new AtomicBoolean();
                @Override
                public Boolean call() throws Exception {
                    for (int i=0; i<deltaPerThreads; i++) {
                        while (true) {
                            if (locked.compareAndSet(false, true)) {
                                count++;
                                locked.getAndSet(false);
                                break;
                            }
                        }
                    }
                    return true;
                }
            };
    
            Callable<Boolean> pure_cas = new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    for (int i=0; i<deltaPerThreads; i++) {
                        atomicLong.incrementAndGet();
                    }
                    return true;
                }
            };
    
            test("raw", raw_counter, deltaPerThreads, numsOfThreads);
    
            test("syn", syn_counter, deltaPerThreads, numsOfThreads);
    
            test("CLH", clh_counter, deltaPerThreads, numsOfThreads);
    
            test("lock", lck_counter, deltaPerThreads, numsOfThreads);
    
            test("lock_fair", lck_fair_counter, deltaPerThreads, numsOfThreads);
    
            test("sem", sem_counter, deltaPerThreads, numsOfThreads);
    
            test("sem_fair", sem_fair_counter, deltaPerThreads, numsOfThreads);
    
            test("CAS", cas_counter, deltaPerThreads, numsOfThreads);
    
            test("pure_CAS", pure_cas, deltaPerThreads, numsOfThreads);
        }
    
    
        public static void test(String name, Callable<Boolean> callable, final int deltaPerThreads, final int numsOfThreads) {
            count = 0;
            atomicLong.set(0);
            pool = Executors.newCachedThreadPool();
    
            List<Callable<Boolean>> calls = new ArrayList<Callable<Boolean>>();
            for (int i=0; i<numsOfThreads; i++) {
                calls.add(callable);
            }
            long start = System.nanoTime();
            try {
                pool.invokeAll(calls);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            pool.shutdown();
            long end = System.nanoTime();
            System.out.println("case   : " + name);
            System.out.println("counted: " + (name.equals("pure_CAS") ? atomicLong.get() : count));
            System.out.println("target : " + numsOfThreads * deltaPerThreads);
            System.out.println("time   : " + (end - start) / 1e6 + "ms");
            System.out.println();
        }
    }
    

    结果:

    case   : raw
    counted: 79270
    target : 100000
    time   : 11.278ms
    
    case   : syn
    counted: 100000
    target : 100000
    time   : 13.553ms
    
    case   : CLH
    counted: 100000
    target : 100000
    time   : 1037.665ms
    
    case   : lock
    counted: 100000
    target : 100000
    time   : 29.134ms
    
    case   : lock_fair
    counted: 100000
    target : 100000
    time   : 638.944ms
    
    case   : sem
    counted: 100000
    target : 100000
    time   : 45.154ms
    
    case   : sem_fair
    counted: 100000
    target : 100000
    time   : 552.855ms
    
    case   : CAS
    counted: 100000
    target : 100000
    time   : 158.422ms
    
    case   : pure_CAS
    counted: 100000
    target : 100000
    time   : 31.964ms
    
    
    • raw 表示不进行任何同步,现在没有保护的情况下计数累加产生了错误
    • syn 表示使用内置锁同步方式即使用synchronized块,在几种方式里表现最好
    • CLH 表示使用自己实现的一个CLH锁(具有排队功能的自旋锁),这里的CLH的实现方式决定了它是一个公平锁
    • lock 表示用ReentrantLock进行数据保护,速度仅次于内置锁,lock_fair是它的公平版本,不过速度上有大幅下降,变慢了将近20倍
    • sem 表示使用Semaphore也就是信号量进行数据保护,速度也不错,sem_fair是它的公平版本,和ReentrantLock上出现的情况一样,公平版本比非公平版本出现了大幅的速度下降,慢了10倍
    • CAS 用CAS操作实现简单的自旋锁,不具有排队功能
    • pure_CAS 表示直接用使用AtomicLong类型的count变量进行计数,就不需要锁保护了,速度也是非常快得

    调整测试的参数将线程数改为2,每线程增量改为50000即总增量不变的情况下得到的结果如下:

    
    case   : raw
    counted: 91460
    target : 100000
    time   : 4.767ms
    
    case   : syn
    counted: 100000
    target : 100000
    time   : 12.385ms
    
    case   : CLH
    counted: 100000
    target : 100000
    time   : 86.12ms
    
    case   : lock
    counted: 100000
    target : 100000
    time   : 33.91ms
    
    case   : lock_fair
    counted: 100000
    target : 100000
    time   : 108.119ms
    
    case   : sem
    counted: 100000
    target : 100000
    time   : 62.503ms
    
    case   : sem_fair
    counted: 100000
    target : 100000
    time   : 83.035ms
    
    case   : CAS
    counted: 100000
    target : 100000
    time   : 13.528ms
    
    case   : pure_CAS
    counted: 100000
    target : 100000
    time   : 11.377ms
    

    基本锁

    ReentrantLock的非公平版本在两次测试中并没有很大的变化,比较稳定。

    公平锁

    在不必要的情况下不去使用锁或者信号量的公平版本,它们相比非公平版本要慢很多,尤其当竞争非常激烈时。从第二组数据来看公平锁与非公平锁的差距缩小了很多,因为在竞争程度比较低(线程数少)的时候花在维护队列上的时间将大大减少。

    CAS

    正如Java并发编程中提到的CAS在高竞争的环境下性能不如直接使用锁,因为当竞争非常激烈时CAS花在重试上的时间将会大量增加(因为竞争激烈变量变化越快,CAS操作失败可能性越大)。而在竞争不是非常激烈的情况下CAS的开销更小,比如在第二次此时中的CAS自旋锁时间就比直接用ReentrantLock来得少。

    内置锁

    虽然说在Java并发编程一书中提到JDK1.6的Lock实现比synchronized要略快,这在JDK1.7中恐怕已经不成立了。

    AbstractQueuedSynchronizer

    http://javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html#prettyPhoto

  • 相关阅读:
    c/c++面试45-50之字符串
    c/c++面试39-44之内存动态分配
    使用spring配合Junit进行单元测试的总结
    使用springBoot进行快速开发
    配置项目使用weblogic的JNDI数据源
    转载-解决使用httpClient 4.3.x登陆 https时的证书报错问题
    SpringData JPA查询分页demo
    Lucene中的域选项
    代码片段,lucene基本操作(基于lucene4.10.2)
    配置maven使用nexus
  • 原文地址:https://www.cnblogs.com/lailailai/p/4609275.html
Copyright © 2011-2022 走看看