zoukankan      html  css  js  c++  java
  • 并发包学习(二)-容器学习记录

    可能有些同学知道ArrayList,HashSet,,HashMap这些容器都是线程不安全的,如果多个线程并发的访问这些容器就会导致线程不安全问题,很多时候需要我们手动对这些容器进行同步处理,造成我们很大的不便,因此java为我们提供了同步容器和并发容器来解决这个问题。

    一、同步容器

    首先详细介绍前,需要强调下同步容器是线程安全的类,但是也可能造成线程不安全的问题,原因在后面有解释。

    同步容器的原理很简单,就是在原容器的基础上加了synchronize的锁,来保证同一时间只有一个线程来访问。

    同步容器总的可以分为两类:

    • java提供好的线程的类
    1. ArrayList >>Vector,Stack
    2. HashMap>>HashTable
    • Collections.synchronizedXXX提供的静态工厂方法创建的类
    1. Collections.synchronizedCollection(Collection<T>t)
    2. Collections.synchronizedList(List<T>list)
    3. Collections.synchronizedMap(Map<K, V>map)
    4. Collections.synchronizedSet(Set<T> t)

    Vector案例一(线程安全)

    @Slf4j
    @ThreadSafe
    public class VectorExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new Vector<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }

    Vector案例二(线程不安全)

    @NotThreadSafe
    public class VectorExample2 {
    
        private static Vector<Integer> vector = new Vector<>();
    
        public static void main(String[] args) {
    
            while (true) {
    
                for (int i = 0; i < 10; i++) {
                    vector.add(i);
                }
            
                Thread thread1 = new Thread() {
                    public void run() {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                };
    
                Thread thread2 = new Thread() {
                    public void run() {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.get(i);
                        }
                    }
                };
                thread1.start();
                thread2.start();
            }
        }
    }

    我在上面的代码标题上已经提前说明这是个线程不安全的类了,为什么同步容器的Vector也可能是线程不安全的呢。大家可以实际运行下上面的类,应该会报数组越界的错误。

    这里我解释下,Vector虽然能保证同一个时刻只有一个线程在访问它,以上面的代码为例,当我们的线程2运行到get(i)的时候,线程1刚好把这个数据移除,这个时候就会出现问题。所以同步容器因为操作顺序的原因,可能会产生线程不安全的问题。

    Vector案例二

    public class VectorExample3 {
    
        // java.util.ConcurrentModificationException
        private static void test1(Vector<Integer> v1) { // foreach
            for(Integer i : v1) {
                if (i.equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        // java.util.ConcurrentModificationException
        private static void test2(Vector<Integer> v1) { // iterator
            Iterator<Integer> iterator = v1.iterator();
            while (iterator.hasNext()) {
                Integer i = iterator.next();
                if (i.equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        // success
        private static void test3(Vector<Integer> v1) { // for
            for (int i = 0; i < v1.size(); i++) {
                if (v1.get(i).equals(3)) {
                    v1.remove(i);
                }
            }
        }
    
        public static void main(String[] args) {
    
            Vector<Integer> vector = new Vector<>();
            vector.add(1);
            vector.add(2);
            vector.add(3);
            test1(vector);
        }
    }

    结果:前两种test方法均会抛出溢常,第三种正常,大家在用foreach和iterator的时候不要对容器的数据进行移除操作,因为这两种方法会对容器的大小和预期的值进行校验。同理ArrayList等也会产生这样的问题的。这个东西对于我实在是印象深刻,因为不知道这个问题,闹出来很多毛病。

    Collections案例一

    @Slf4j
    @ThreadSafe
    public class CollectionsExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }

    Collections案例二

    @Slf4j
    @ThreadSafe
    public class CollectionsExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }

    Collections案例三

    @Slf4j
    @ThreadSafe
    public class CollectionsExample3 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }

    由上面三个案例可以看出Collections.synchronizedXXX生成的三个同步容器类得到的值和预期的结果是相同的,所以是安全的。

    总结:同步容器保证了同一时刻只有一个线程在访问,但是因为操作的原因,还是会产生线程不安全的问题,这个时候我们可以使用synchronize或者Lock来对相关代码块进行加锁操作,但是这种情况下又导致性能比较低下,又有什么好的解决办法呢。答案就在下面要介绍的并发容器了,实际项目中,同步容器已经很少使用,更多的还是被并发容器所取代了。

     二、并发容器

    ArrayList >>CopyOnWriteArrayList

    CopyOnWriteArrayList 有几个缺点:
    1、由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc
    2、不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求;
    CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用
    因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。

    CopyOnWriteArrayList透露的思想
    如上面的分析CopyOnWriteArrayList表达的一些思想:
    1、读写分离,读和写分开
    2、最终一致性
    3、使用另外开辟空间的思路,来解决并发冲突

    @Slf4j
    @ThreadSafe
    public class CopyOnWriteArrayListExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new CopyOnWriteArrayList<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }

    通过结果可知是线程安全的。

    HashSet、TreeSet>>CopyOnWriteArraySet、ConcurrentSkipListSet

    CopyOnWriteArraySet的底层的实现是CopyOnWriteArrayList,因此它的特点和CopyOnWriteArrayList类似

    • 它最适合于具有以下特征的应用程序:set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
    • 它是线程安全的, 底层的实现是CopyOnWriteArrayList;
    • 因为通常需要复制整个基础数组,所以可变操作(add、set 和 remove 等等)的开销很大。
    • 迭代器不支持可变 remove 操作。
    • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
    @Slf4j
    @ThreadSafe
    public class CopyOnWriteArraySetExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new CopyOnWriteArraySet<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }

    ConcurrentSkipListSet是JDK6新增的类,ConcurrentSkipListSet基于map集合,需要注意在此类的批量操作的方法不保证原子性,但是保证底层每次调用的原子性。所以在批量操作时需要另外完成同步操作。

    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListSetExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new ConcurrentSkipListSet<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }

    HashMap、TreeMap>>ConcurrentHashMap、ConcurrentSkipListMap

    ConcurrentHashMap

    @Slf4j
    @ThreadSafe
    public class ConcurrentHashMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }

    ConcurrentSkipListMap

    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
  • 相关阅读:
    审核系统
    ehcache 缓存
    tomcat 内存设置
    html5 开发 跨平台 桌面应用
    service thread 结合使用
    html5桌面应用
    鼠标 事件
    服务器 判断 客户端 文件下载
    使用github管理Eclipse分布式项目开发
    uub代码
  • 原文地址:https://www.cnblogs.com/laoyeye/p/9452521.html
Copyright © 2011-2022 走看看