zoukankan      html  css  js  c++  java
  • Java笔记(十六)并发容器

    并发容器

    一、写时复制的List和Set

    CopyOnWrite即写时复制,或称写时拷贝,是解决并发问题的一种重要思路。

    一)CopyOnWriteArrayList

    该类实现了List接口,它的用法与其他List基本一样。其特点如下:

    1)它是线程安全的

    2)它的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException

    3)它以原子方式支持一些复合操作,该类支持的两个原子方法:

    //不存在才添加,添加成功返回true,否则返回false
    public boolean addIfAbsent(E e)
    //批量添加c集合中的非重复元素,不存在才添加,返回实际添加个数
    public int addAllAbsent(Collection<? extends E> c)

    CopyOnWriteArrayList的内部也是一个数组,但这个数组是以原子方式被整体更新的。

    每次修改操作,都会新键一个数组,复制原数组的内容到新数组,在新数组上进行需要

    的修改,然后以原子的方式设置内部数组的引用,这就是写时复制。

    所有的读操作,都是先拿到当前引用的数组,然后直接访问该数组,在读的过程中可能

    内部数组的引用已经被修改,但这不会影响读操作,它依旧访问原数组内容。

    换句话说,数组内容都是只读的,写操作都是通过新键数组,然后原子性地修改数组

    引用来实现的。内部数组声明为:

    //声明为volatile保证内存可见性
    private volatile transient Object[] array;

    访问/设置该数组的方法:

    final Object[] getArray() {
        return array;
    }
    final void setArray(Object[] a) {
        array = a;
    }

    读不需要锁,可以并行,但写需要锁。CopyOnWriteArrayList内部使用ReentrantLock:

    transient final ReentrantLock lock = new ReentrantLock();

    默认构造方法:

    public CopyOnWriteArrayList() {
        setArray(new Object[0]);
    }
    //add方法:
    public
    boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }

    每次修改都要创建一个新的数组,然后复制所有的内容,这听上去是一个难以让人接受的方案,

    如果数组较大,修改操作又比较频繁,可以想象,CopyOnWriteArrayList性能是很低的。事实

    确实如此,CopyOnWriteArrayList不适用于数组很大且修改频繁的场景。它是以优化读为目标

    的,读不需要同步性能很高,但这牺牲了写的性能。

    之前介绍了保证线程安全的两种思路:一种是锁,使用synchronized或ReentrantLock;

    另一种是循环CAS。写时复制是不同与这两种的另一种思路:写时复制通过复制资源减少

    冲突。对于读远远多于写的场景,是一种很好的解决方案。

    写时复制是一种重要的思维,用于各种计算机程序中,比如操作系统内部的进程管理和内存管理。

    二)CopyOnWriteSet

    该类内部是通过CopyOnWriteArrayList实现的:

    private final CopyOnWriteArrayList<E> al;

    在构造方法中被初始化:

    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    其add方法为:

    public boolean add(E e) {
        return al.addIfAbsent(e);
    }

    其适用场景类似于CopyOnWriteArrayList

    二、ConcurrentHashMap

    与HashMap相比,它有如下特点:

    1)并发安全

    2)直接支持一些原子复合操作

    3)支持高并发,读操作完全并行,写操作支持一定程度并行

    4)迭代不用加锁,不会抛出异常

    5)弱一致性

    一)并发安全

    HashMap不是并发安全的,在并发更新的情况下,HashMap可能会出现死循环,占满CPU。

    public static void unsafeConcurrentUpdate() {
        final Map<Integer, Integer> map = new HashMap<Integer, Integer>();
        for(int i = 0; i < 1000; i++) {
            Thread t = new Thread() {
                Random rnd = new Random();
                @Override
                public void run() {
                    for(int i = 0; i < 1000; i++) {
                        map.put(rnd.nextInt(), 1);
                    }
                }
            };
            t.start();
        }
    }

    解决办法:使用ConcurrentHashMap

    二)原子复合操作

    除了map接口,ConcurrentHashMap还实现了一个接口ConcurrentMap接口:

    public interface ConcurrentMap<K, V> extends Map<K, V> {
        //条件更新,如果没有key,更新。
        V putIfAbsent(K key, V value);
        //条件删除:如果map中有key且对应的值为value,则删除,删除成功返回true.
        boolean remove(Object key, Object value);
        //条件替换
        boolean replace(K key, V oldValue, V newValue);
        //条件替换
        V replace(K key, V value);
    }

    三)高并发的基本机制

    在Java7中,主要使用了

    1.分段锁 

    将数据分为多个段,而每个段都有独立的锁。每一个段相当于一个独立的哈希表,

    分段依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值

    映射到段,再在对应的段上进行操作。

    采用分段锁技术可以大大地提高并发度,多个段之间可以并行读写。

    //concurrencyLevel表示估计更新的线程数
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

    2.读不需要锁 

    多个读可以并行,写的同时也可以读。

    三)迭代安全

     ConcurrentHashMap,在迭代过程中,如果另一个线程对容器进行了修改,迭代会继续,不会抛异常。

    public static void test() {
        final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("a", "apple");
        map.put("b", "banana");
        Thread t1 = new Thread() {
            @Override
            public void run() {
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(entry.getKey() + " : " + entry.getValue());
                    /*a : apple
                    b : banana
                    c : cee*/
                }
            }
        };
        t1.start();
        //确保t1启动
        try {
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
        map.put("c", "cee");
    }
    public static void test() {
        final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("a", "apple");
        map.put("b", "banana");
        Thread t1 = new Thread() {
            @Override
            public void run() {
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(entry.getKey() + " : " + entry.getValue());
                    /*a : apple
                    b : banana
                    c : cee*/
                }
            }
        };
        t1.start();
        //确保t1启动
        try {
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
        map.put("c", "cee");
        //如果将语句改为;输出为:
        /*a : apple
        b : banana*/
        //这叫弱一致性
    }

    四)弱一致性

    如果在遍历的过程中,内部元素发生变化,如果变化发生在已经遍历的部分,

    迭代器就不会反应出来,变化发生在未遍历的部分,迭代器就会发现并反应

    出来,这就是弱一致性。这种情况还出现在ConcurrentHashMap另外一个方法:

    public void putAll(Map<? extends K, ? extends V> m)

    该方法并非原子操作,而是调用put方法逐个元素进行添加。

    三)基于跳表的Map和Set

    Java并发包中的TreeMap/TreeSet对应的版本是ConcurrentSkipListMap和ConcurrentSkipListSet

    一)基本概念

    ConcurrentSkipListSet是基于ConcurrentSkipListMap实现的。

    ConcurrentSkipListMap特点:

    1)没有使用锁,所以操作都是无阻塞的,所以操作都可以并行,包括写,多线程可以同时写。

    2)迭代器不会抛出异常,是弱一致性的

    3)支持一些原子复合操作

    4)可排序,默认按键的自然顺序,可传递比较器自定义。

    示例代码:

    Map<String, String> map = new ConcurrentSkipListMap<>();
    map.put("c", "call");
    map.put("a", "apple");
    map.put("b", "banana");
    System.out.println(map.toString()); //{a=apple, b=banana, c=call}

    需要说明的是ConcurrentSkipListMap的size方法,与大多数容器实现不同,这个方法不是常量操作,

    它需要遍历所有元素,复杂度为O(N),而且遍历结束后,元素个数已经改变。一般而言,在并发应用

    中,这个方法用处不大。

    二)基本实现原理

    跳表是基于链表的,在链表的基础上加了多层索引结构。

    例如,假如容器中包含如下元素:3, 6, 7, 9, 12, 17, 19, 21, 25, 26

    对于一个Map来说,这些值可以视为键。ConcurrentSkipListMap会构造类似

    下图的跳表结构:

    这个链表是有序的,但与数组不同,链表不能根据索引直接定位,不能进行

    二分查找。

    在该结构中,高层的索引节点一定同时是底层的索引节点。

    大致上第一层是基本链表的1/2,第二层是第一层的1/2。

    每个索引节点有两个指针一个向右,一个向下。

    有了这个结构,就可以实现类似二分查找了。查找元素总是从最高层

    开始,将待查值与下一个索引节点的值进行比较,如果大于索引节点

    就向右移动,继续比较,如果小于节点则移入下一层进行比较。如图

    展示了查找19和8的过程:

    这个结构是有序的,查找性能与二叉树类似。这个结构是在更新的过程中保持的,

    保存元素的基本思路是:

    1)先在基本链表找到插入位置,插入基本链表;

    2)更新索引层

    对于索引的更新:随机计算一个数,表示为该元素最高建几层的索引,一层的概率为1/2,二层为1/4,以此类推。

    然后从最高层到最低层,在每一层为该元素建立索引节点,建立索引的位置也是先查找位置再插入。

    对于删除元素,ConcurrentSkipListMap不是直接进行真正的删除,而是为了避免冲突,有一个复杂的标记过程,

    在内部遍历元素的过程中进行真正的删除。

    总结:ConcurrentSkipListMap/ConcurrentSkipListSet基于跳表实现,

    有序,无锁,非阻塞,完全并行,主要操作复杂度为0(log(N))。

    三、并发队列

    Java并发包提供了丰富的队列,可以简单地分为如下几种:

    1.无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDueue. 

    它们适用于多个线程并发使用一个队列的的场合,都是基于链表实现,都没有大小

    限制,是无解的。size方法不是一个常量运算。这两个类最基础的原理是循环CAS。

    2.普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque. 

    它们都实现了BlockingQueue接口:

    //入队,如果队列满,等待直到队列有空间
    void put(E e) throws InterruptedException;
    //出队,如果队列空,等待直到队列不为空,返回头部元素
    E take() throws InterruptedException;
    //入队,如果队列满,最多等待指定时间,如果超时还是满,返回false
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    //出队
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    特别需要注意的是,ArrayBlockingQueue是有界的,创建时指定大小,且在运行的过程中不会改变,这与ArrayDeque不同。

    在内部它们都是通过显式锁和显式条件实现的。

    3.优先级阻塞队列:PriorityBlockingQueue. 

    基本实现原理与PriorityQueue类似,基于堆。但它实现了BlockingQueue接口,是阻塞的。

    4.延时阻塞队列:DelayQueue 

    它要求每个元素都实现Delayed接口,接口声明为:

    public interface Delayed extends Comparable<Delayed> {
        //返回一个给定的时间单位unit整数,表示再延长多长时间,小等于0表示不再延长
        long getDelay(TimeUnit unit);
    }

    该类用于实现定时任务,它按元素的延时时间出队,只有当元素的延时过期之后才能被队列拿走,

    也就是说take方法总是返回第一个过期的元素,如果没有则阻塞等待。  

    5.其他阻塞队列SynchronousQueue 和 LinkedTransferQueue 

  • 相关阅读:
    Ant安装及环境配置
    tp5 自定义公共函数,前台模板调用
    Linux下查看当前文件大小的命令
    超级好用的视频压缩工具
    ubuntu 安装 gd
    wordpress Error establishing a database connection问题
    wordpress 上传图片出现权限或者http错误
    nginx解决WordPress 上传到服务器后页面404错误的方法
    ubuntu mysql新增用户并开启远程连接
    linux每日命令(21): find命令之exec
  • 原文地址:https://www.cnblogs.com/Shadowplay/p/10106239.html
Copyright © 2011-2022 走看看