zoukankan      html  css  js  c++  java
  • Java 并发实践 — ConcurrentHashMap 与 CAS

    转载 http://www.importnew.com/26035.html

    最近在做接口限流时涉及到了一个有意思问题,牵扯出了关于concurrentHashMap的一些用法,以及CAS的一些概念。限流算法很多,我主要就以最简单的计数器法来做引。先抽象化一下需求:统计每个接口访问的次数。一个接口对应一个url,也就是一个字符串,每调用一次对其进行加一处理。可能出现的问题主要有三个:

    1. 多线程访问,需要选择合适的并发容器
    2. 分布式下多个实例统计接口流量需要共享内存
    3. 流量统计应该尽可能不损耗服务器性能

    但这次的博客并不是想描述怎么去实现接口限流,而是主要想描述一下遇到的问题,所以,第二点暂时不考虑,即不使用Redis。

    说到并发的字符串统计,立即让人联想到的数据结构便是ConcurrentHashpMap<String,Long> urlCounter;
    如果你刚刚接触并发可能会写出如代码清单1的代码

    代码清单1:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public class CounterDemo1 {
     
        private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
     
        //接口调用次数+1
        public long increase(String url) {
            Long oldValue = urlCounter.get(url);
            Long newValue = (oldValue == null) ? 1L : oldValue + 1;
            urlCounter.put(url, newValue);
            return newValue;
        }
     
        //获取调用次数
        public Long getCount(String url){
            return urlCounter.get(url);
        }
     
        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            final CounterDemo1 counterDemo = new CounterDemo1();
            int callTime = 100000;
            final String url = "http://localhost:8080/hello";
            CountDownLatch countDownLatch = new CountDownLatch(callTime);
            //模拟并发情况下的接口调用统计
            for(int i=0;i<callTime;i++){
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        counterDemo.increase(url);
                        countDownLatch.countDown();
                    }
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executor.shutdown();
            //等待所有线程统计完成后输出调用次数
            System.out.println("调用次数:"+counterDemo.getCount(url));
        }
    }
     
    console output:
    调用次数:96526

    都说concurrentHashMap是个线程安全的并发容器,所以没有显示加同步,实际效果呢并不如所愿。

    问题就出在increase方法,concurrentHashMap能保证的是每一个操作(put,get,delete…)本身是线程安全的,但是我们的increase方法,对concurrentHashMap的操作是一个组合,先get再put,所以多个线程的操作出现了覆盖。如果对整个increase方法加锁,那么又违背了我们使用并发容器的初衷,因为锁的开销很大。我们有没有方法改善统计方法呢?
    代码清单2罗列了concurrentHashMap父接口concurrentMap的一个非常有用但是又常常被忽略的方法。

    代码清单2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
     * Replaces the entry for a key only if currently mapped to a given value.
     * This is equivalent to
     *  <pre> {@code
     * if (map.containsKey(key) && Objects.equals(map.get(key), oldValue)) {
     *   map.put(key, newValue);
     *   return true;
     * } else
     *   return false;
     * }</pre>
     *
     * except that the action is performed atomically.
     */
    boolean replace(K key, V oldValue, V newValue);

    这其实就是一个最典型的CAS操作,except that the action is performed atomically.这句话真是帮了大忙,我们可以保证比较和设置是一个原子操作,当A线程尝试在increase时,旧值被修改的话就回导致replace失效,而我们只需要用一个循环,不断获取最新值,直到成功replace一次,即可完成统计。

    改进后的increase方法如下

    代码清单3:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public long increase2(String url) {
            Long oldValue, newValue;
            while (true) {
                oldValue = urlCounter.get(url);
                if (oldValue == null) {
                    newValue = 1l;
                    //初始化成功,退出循环
                    if (urlCounter.putIfAbsent(url, 1l) == null)
                        break;
                    //如果初始化失败,说明其他线程已经初始化过了
                } else {
                    newValue = oldValue + 1;
                    //+1成功,退出循环
                    if (urlCounter.replace(url, oldValue, newValue))
                        break;
                    //如果+1失败,说明其他线程已经修改过了旧值
                }
            }
            return newValue;
        }
     
    console output:
    调用次数:100000

    再次调用后获得了正确的结果,上述方案看上去比较繁琐,因为第一次调用时需要进行一次初始化,所以多了一个判断,也用到了另一个CAS操作putIfAbsent,他的源代码描述如下:

    代码清单4:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
         * If the specified key is not already associated
         * with a value, associate it with the given value.
         * This is equivalent to
         *  <pre> {@code
         * if (!map.containsKey(key))
         *   return map.put(key, value);
         * else
         *   return map.get(key);
         * }</pre>
         *
         * except that the action is performed atomically.
         *
         * @implNote This implementation intentionally re-abstracts the
         * inappropriate default provided in {@code Map}.
         *
         * @param key key with which the specified value is to be associated
         * @param value value to be associated with the specified key
         * @return the previous value associated with the specified key, or
         *         {@code null} if there was no mapping for the key.
         *         (A {@code null} return can also indicate that the map
         *         previously associated {@code null} with the key,
         *         if the implementation supports null values.)
         * @throws UnsupportedOperationException if the {@code put} operation
         *         is not supported by this map
         * @throws ClassCastException if the class of the specified key or value
         *         prevents it from being stored in this map
         * @throws NullPointerException if the specified key or value is null,
         *         and this map does not permit null keys or values
         * @throws IllegalArgumentException if some property of the specified key
         *         or value prevents it from being stored in this map
         */
         V putIfAbsent(K key, V value);

    简单翻译如下:“如果(调用该方法时)key-value 已经存在,则返回那个 value 值。如果调用时 map 里没有找到 key 的 mapping,返回一个 null 值”。值得注意点的一点就是concurrentHashMap的value是不能存在null值的。实际上呢,上述的方案也可以把Long替换成AtomicLong,可以简化实现, ConcurrentHashMap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private AtomicLongMap<String> urlCounter3 = AtomicLongMap.create();
     
    public long increase3(String url) {
        long newValue = urlCounter3.incrementAndGet(url);
        return newValue;
    }
     
     
    public Long getCount3(String url) {
        return urlCounter3.get(url);
    }

    看一下他的源码就会发现,其实和代码清单3思路差不多,只不过功能更完善了一点。

    和CAS很像的操作,我之前的博客中提到过数据库的乐观锁,用version字段来进行并发控制,其实也是一种compare and swap的思想。

  • 相关阅读:
    faster with MyISAM tables than with InnoDB or NDB tables
    w-BIG TABLE 1-toSMALLtable @-toMEMORY
    Indexing and Hashing
    MEMORY Storage Engine MEMORY Tables TEMPORARY TABLE max_heap_table_size
    controlling the variance of request response times and not just worrying about maximizing queries per second
    Variance
    Population Mean
    12.162s 1805.867s
    situations where MyISAM will be faster than InnoDB
    1920.154s 0.309s 30817
  • 原文地址:https://www.cnblogs.com/miracle77hp/p/10306467.html
Copyright © 2011-2022 走看看