zoukankan      html  css  js  c++  java
  • 同步容器并发容器

    同步容器

    同步容器可以简单地理解为通过synchronized来实现同步的容器,如果有多个线程调用同步容器的方法,它们将会串行执行。

    同步容器将它们的状态封装起来,并对每一个公有方法进行同步。主要包括:

    • Vector
    • Stack
    • HashTable
    • Collections.synchronized方法生成,例如: 
      • Collectinons.synchronizedList()
      • Collections.synchronizedSet()
      • Collections.synchronizedMap()
      • Collections.synchronizedSortedSet()
      • Collections.synchronizedSortedMap()

    其中Vector(同步的ArrayList)和Stack(继承自Vector,先进后出)、HashTable(继承自Dictionary,实现了Map接口)是比较老的容器,Thinking in Java中明确指出,这些容器现在仍然存在于JDK中是为了向以前老版本的程序兼容,在新的程序中不应该在使用。Collections的方法时将非同步的容器包裹生成对应的同步容器。

    同步容器在单线程的环境下能够保证线程安全,但是通过synchronized同步方法将访问操作串行化,导致并发环境下效率低下。而且同步容器在多线程环境下的复合操作(迭代、条件运算如没有则添加等)是非线程安全,需要客户端代码来实现加锁。

    1.同一接口,不同实现的线程安全类

    vector的所有方法都是有synchronized关键字保护的,stack继承了vector,并且提供了栈操作(先进后出),hashtable也是由synchronized关键字保护

    package com.mmall.concurrency.example.syncContainer;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    @ThreadSafe
    public class VectorExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new Vector<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    View Code

    注意:1.同步容器并不一定线程安全

    /**
     * 并发测试
     * 同步容器不一定线程安全
     * @author gaowenfeng
     */
    @Slf4j
    @NotThreadSafe
    public class VectorExample2 {
    
        /** 请求总数 */
        public static int clientTotal = 5000;
        /** 同时并发执行的线程数 */
        public static int threadTotal = 50;
    
        public static List<Integer> list = new Vector<>();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                list.add(i);
            }
            Thread thread1 = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    list.remove(i);
                }
            });
    
            Thread thread2 = new Thread(() -> {
                // thread2想获取i=9的元素的时候,thread1将i=9的元素移除了,导致数组越界
                for (int i = 0; i < 10; i++) {
                    list.get(i);
                }
            });
    
            thread1.start();
            thread2.start();
        }
    
    }
    View Code

    2. Collections.synchronizedXXX (list,set,map)

    package com.mmall.concurrency.example.syncContainer;
    
    import com.google.common.collect.Lists;
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    @ThreadSafe
    public class CollectionsExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    View Code

    注意:2.在foreach或迭代器遍历的过程中不要做删除操作,应该先标记,然后最后再统一删除

    public class VectorExample3 {
    
        // java.util.ConcurrentModificationException
        // 在遍历的同时进行了删除的操作,导致抛出了并发修改的异常
        private static void test1(Vector<Integer> v1) { // foreach
            for(Integer i : v1) {
                if (i.equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        // java.util.ConcurrentModificationException
        private static void test2(Vector<Integer> v1) { // iterator
            Iterator<Integer> iterator = v1.iterator();
            while (iterator.hasNext()) {
                Integer i = iterator.next();
                if (i.equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        // success
        private static void test3(Vector<Integer> v1) { // for
            for (int i = 0; i < v1.size(); i++) {
                if (v1.get(i).equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        public static void main(String[] args) {
    
            Vector<Integer> vector = new Vector<>();
            vector.add(1);
            vector.add(2);
            vector.add(3);
            test1(vector);
        }
    }
    View Code

    同步容器性能不是特别好,并且不能保证完全线程安全。我们可以使用并发容器进行取代它。同步容器已经使用的越来越少了,都是使用并发容器替代。

    并发容器

    Java并发容器JUC是三个单词的缩写。是JDK下面的一个包名。即Java.util.concurrency。 

    什么是CopyOnWrite容器

    CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器,它的写是需要加锁的。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。 

    缺点: 

    1.写操作时复制消耗内存,如果元素比较多时候,容易导致young gc 和full gc。 

    2.不能用于实时读的场景.由于复制和add操作等需要时间,故读取时可能读到旧值。 

    能做到最终一致性,但无法满足实时性的要求,更适合读多写少的场景。 

    如果无法知道数组有多大,或者add,set操作有多少,慎用此类,在大量的复制副本的过程中很容易出错。

    设计思想: 

    1.读写分离 

    2.最终一致性 

    3.使用时另外开辟空间,防止并发冲突

    源码分析:

    //构造方法
    public CopyOnWriteArrayList(Collection<? extends E> c) {
        Object[] elements;//使用对象数组来承载数据
        if (c.getClass() == CopyOnWriteArrayList.class)
            elements = ((CopyOnWriteArrayList<?>)c).getArray();
        else {
            elements = c.toArray();
            // c.toArray might (incorrectly) not return Object[] (see 6260652)
            if (elements.getClass() != Object[].class)
                elements = Arrays.copyOf(elements, elements.length, Object[].class);
        }
        setArray(elements);
    }
    
    //添加数据方法
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;//使用重入锁,保证线程安全
        lock.lock();
        try {
            Object[] elements = getArray();//获取当前数组数据
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//复制当前数组并且扩容+1
            newElements[len] = e;//将要添加的数据放入新数组
            setArray(newElements);//将原来的数组指向新的数组
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    //获取数据方法,与普通的get没什么差别
    private E get(Object[] a, int index) {
        return (E) a[index];
    }
    HashSet –> CopyOnWriteArraySet
    • 它是线程安全的,底层实现使用的是CopyOnWriteArrayList,因此它也适用于大小很小的set集合,只读操作远大于可变操作。因为他需要copy整个数组,所以包括add、remove、set它的开销相对于大一些。
    • 迭代器不支持可变的remove操作。使用迭代器遍历的时候速度很快,而且不会与其他线程发生冲突。

    源码分析:

    //构造方法
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();//底层使用CopyOnWriteArrayList
    }
    
    //添加元素方法,基本实现原理与CopyOnWriteArrayList相同
    private boolean addIfAbsent(E e, Object[] snapshot) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] current = getArray();
            int len = current.length;
            if (snapshot != current) {//添加了元素去重操作
                // Optimize for lost race to another addXXX operation
                int common = Math.min(snapshot.length, len);
                for (int i = 0; i < common; i++)
                    if (current[i] != snapshot[i] && eq(e, current[i]))
                        return false;
                if (indexOf(e, current, common, len) >= 0)
                        return false;
            }
            Object[] newElements = Arrays.copyOf(current, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    TreeSet –> ConcurrentSkipListSet

    它是JDK6新增的类,同TreeSet一样支持自然排序,并且可以在构造的时候自己定义比较器。

    • 同其他set集合,是基于map集合的(基于ConcurrentSkipListMap),在多线程环境下,里面的contains、add、remove操作都是线程安全的。
    • 多个线程可以安全的并发的执行插入、移除、和访问操作。但是对于批量操作addAll、removeAll、retainAll和containsAll并不能保证以原子方式执行,原因是addAll、removeAll、retainAll底层调用的还是contains、add、remove方法,只能保证每一次的执行是原子性的,代表在单一执行操纵时不会被打断,但是不能保证每一次批量操作都不会被打断。在使用批量操作时,还是需要手动加上同步操作的。
    • 不允许使用null元素的,它无法可靠的将参数及返回值与不存在的元素区分开来。

    源码分析:

    //构造方法
    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();//使用ConcurrentSkipListMap实现
    }
    HashMap –> ConcurrentHashMap
    • 不允许空值,在实际的应用中除了少数的插入操作和删除操作外,绝大多数我们使用map都是读取操作。而且读操作大多数都是成功的。基于这个前提,它针对读操作做了大量的优化。因此这个类在高并发环境下有特别好的表现。
    • ConcurrentHashMap作为Concurrent一族,其有着高效地并发操作,相比Hashtable的笨重,ConcurrentHashMap则更胜一筹了。
    • 在1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是1.8已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,当然底层采用数组+链表+红黑树的存储结构。
    • HashMap与ConcurrentHashMap、HashTable

    TreeMap –> ConcurrentSkipListMap

    底层实现采用SkipList跳表

    曾经有人用ConcurrentHashMap与ConcurrentSkipListMap做性能测试,在4个线程1.6W的数据条件下,前者的数据存取速度是后者的4倍左右。但是后者有几个前者不能比拟的优点: 

    1、Key是有序的 

    2、支持更高的并发,存储时间与线程数无关

    总结

    安全共享对象策略

    ※线程限制:一个被线程限制的对象,由线程独占,并且只能被占有他的线程修改。

    ※共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改他

    ※线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它

    ※被守护对象:被守护对象只能通过获取特定的锁来访问。

    j.u.c的类图

     

     

  • 相关阅读:
    连载《一个程序猿的生命周期》-《发展篇》- 34.如果自己有想法去“创业”,怎么平衡与工作之间的关系
    连载《一个程序猿的生命周期》-《发展篇》- 33.是不是又走在“创业”的路上?!
    连载《一个程序猿的生命周期》-《发展篇》- 32.疫情中复工,跌宕起伏的2019,发展元年的2020
    连载《一个程序猿的生命周期》-《发展篇》- 28.恰逢五四,我们又走在奋斗的路上吗?
    连载《一个程序猿的生命周期》-《发展篇》- 27.从来没有996过,仍然需要人生的选择权
    连载《一个程序猿的生命周期》-《发展篇》- 26.且听风吟,静待花开,慢慢走向人生赢家
    连载《一个程序猿的生命周期》-《发展篇》- 25.论一个非正式项目经理的自我修养
    连载《一个程序猿的生命周期》-《发展篇》- 24.你所掌握的技术,创造的价值会越来越低
    连载《一个程序猿的生命周期》-《发展篇》- 23.两年多的时间,从孤家寡人到10多人的团体,经历了什么
    6年前端开发被实习生替代,所谓“经验”一文不值!
  • 原文地址:https://www.cnblogs.com/xiangkejin/p/9269545.html
Copyright © 2011-2022 走看看