zoukankan      html  css  js  c++  java
  • 用多线程优化Excel表格数据导入校验的接口

    公司的需求,当前某个Excel导入功能,流程是:读取Excel数据,传入后台校验每一条数据,判断是否符合导入要求,返回给前端,导入预览展示。(前端等待响应,难点)。用户再点击导入按钮,进行异步导入(前端不等待,好做)。当前接口仅支持300条数据,现在要求我要支持3000条数据。

    解决问题,思路是关键。

      首先,查看接口,找到读取表格的位置,看到判断,如果数据量大于300,直接返回。把300改成3000.

      然后,分析导入数据校验,都是和哪些数据进行校验的,这些数据都是从数据库来的。每一次都从数据库查询,那肯定是慢的。就算是查询Redis缓存,也要有网络消耗,增加缓存的压力。虽然单机Redis有12万次/秒的查询性能,12万除以3000得40,如果这样玩,40个人使用就拖垮系统了。同一个数据,非要查3000次,那是不是傻???所以减少每一次的查询,把数据库查询都加上Redis缓存,把Redis缓存查到的数据,在方法中创建并发安全容器ConcurrentHashMap存储数据,避免重复的查询操作,只查一次直到方法调用结束。

      Map<String, Object> map = new ConcurrentHashMap();
            Object obj = map.get("key");
            if (null == obj){
                //查询缓存,或者数据库
                String value = "数据";
                map.put("key", value);
            }

    方法内部创建的对象,当方法调用完成,进栈出栈,释放引用,就会释放内存。在3000次校验的过程中,Object对象,是在jvm内存中的,方便被快速的重复使用,而不是需要再次从数据库或者缓存中获取。这是方法栈级别的缓存,JVM缓存,本地缓存。

    这就是最重要的思想,思维。做到一个方法中,尽量少的查询,把查询的结果重复利用。

    当我做完了在方法中用ConcurrentHashMap缓存数据,就进行了测试。

    结果:最多支持800条导入数据的校验。前端请求超过10秒,就会请求超时。

    怎么办呢???

    产品,你这个需求搞不定啊。无法实现啊。。。。。。扯皮中。。。。。扯皮无效。

    接着用多线程技术进行优化。

    1.创建线程池

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 线程池<br/>
     * 
     * 
     * @author 
     * @version 
     */
    public class MyExecutor {
    
       /**
        * 在池中保持的线程的最小数量
        */
       private static final int CorePoolSize = 10;
       /**
        * 线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理
        */
       private static final int MaximumPoolSize = 200;
       /**
        * 线程的最大生命周期。这里的生命周期有两个约束条件:
        * 一:该参数针对的是超过corePoolSize数量的线程;
        * 二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,
        * 而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,
        * 则结束这3个线程,此时线程池中则还有12个线程正在运行。
        */
       private static final int KeepAliveTime = 30;
       /**
        * 等待任务队列大小
        */
       private static final int Capacity = 10000;
       
       private static final ExecutorService pool = new ThreadPoolExecutor(CorePoolSize, MaximumPoolSize, KeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingDeque<>(Capacity));
       
    
    
       public static ExecutorService getPool(){
          return pool;
       }
       
    }

    2.创建用于接收线程池任务返回值有序集合,方便依次获取结果。 

    List<Future<Object>> futureList = new LinkedList<>();

    //说明:ArrayList和LinkedList的大致区别:
    // 1.ArrayList是实现了基于动态数组的数据结构,LinkedList是基于链表结构。
    // 2.对于随机访问的get和set方法,ArrayList要优于LinkedList,因为LinkedList要移动指针。
    // 3.对于新增和删除操作add和remove,LinkedList比较占优势,因为ArrayList要移动数据。
    // 因为要add()3000次数据,所以选择LinkedList

    3.获取线程池

       //获取线程池
            ExecutorService pool = MyExecutor.getPool(); 

     4.读取Excel表格数据,遍历每一行,每一行数据都提交一个任务到多线程。

            //提交Callable任务到线程池
            Future<Object> future  = pool.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    // 每条数据的计算
                    return null;
                }
            });
    
             //把单个结果加入有序集合中。
            futureList.add(future);

     5.遍历futureList获取结果。

     for (Future<Object> oneFuture : futureList) {
                try {
                    //每一个任务的结果,阻塞方法,一直等待到计算任务完成。
                    Object result = oneFuture.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

     6.如此,把所有结果组合起来,返回。就完成了这个方法的线程池运用的改造。

    7.这时候,又出现一个问题,3000条数据,每条数据都有一个id,如何在多线程里,让处理过的id不重复,出现重复还能做标记呢???

      CopyOnWriteArraySet 和 ConcurrentSkipListSet 介绍:

      CopyOnWriteArraySet 它是线程安全的无序的集合,可以将它理解成线程安全的HashSet。对其所有操作使用内部 CopyOnWriteArrayList 的 Set。因此,它共享以下相同的基本属性:
    ⑴它最适合于具有以下特征的应用程序:set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
    ⑵它是线程安全的。
    ⑶因为通常需要复制整个基础数组,所以可变操作(add、set 和 remove 等等)的开销很大。 迭代器不支持可变 remove操作。
    ⑷使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

      

      ConcurrentSkipListSet是线程安全的有序的集合,适用于高并发的场景。他是一个基于 ConcurrentSkipListMap 的可缩放并发 NavigableSet 实现。

    ⑴和TreeSet一样,支持自然排序,可以在构造的时候定义比较器;
    ⑵其中的contains, add, remove操作都是线程安全的。

    ⑶但对于批量操作,比如addAll removeAll, containsAll并不能保证原子性执行,因为其底层还是调用contains, add, remove方法,在批量操作时,只能保证每一个的add等操作是原子性的,对于批量操作在调用时,还是要手动加上锁保证原子性;
    ⑷不允许存储空元素;

    所以这里我用到了适用于高并发的Set ===>  ConcurrentSkipListSet 。

         // 并发安全,去重复
            ConcurrentSkipListSet<Integer> idSet = new ConcurrentSkipListSet<>();
    
          boolean flag =idSet.add(id);
            if (!flag){
                //添加失败,说明数据重复。
            }

     我们来看看ConcurrentSkipListSet的add()方法的源码:

     /**
         * Adds the specified element to this set if it is not already present.
         * More formally, adds the specified element {@code e} to this set if
         * the set contains no element {@code e2} such that {@code e.equals(e2)}.
         * If this set already contains the element, the call leaves the set
         * unchanged and returns {@code false}.
         *
         * @param e element to be added to this set
         * @return {@code true} if this set did not already contain the
         *         specified element
         * @throws ClassCastException if {@code e} cannot be compared
         *         with the elements currently in this set
         * @throws NullPointerException if the specified element is null
         */
        public boolean add(E e) {
            return m.putIfAbsent(e, Boolean.TRUE) == null;
        }

     把上面的描述内容用谷歌翻译:

          如果指定的元素尚不存在,则将其添加到此集合中。
          更正式地说,将指定的元素{@code e}添加到此集合if
         该集合不包含{@code e2}元素,以便{@code e.equals(e2)}。
          如果此集合已包含该元素,则该调用将离开该集合
         不变并返回{@code false}

    说明我们这里的id去重的用法完全正确。

    我们再来看看Future的get()方法的源码:

     /**
         * Waits if necessary for the computation to complete, and then
         * retrieves its result.
         *
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        V get() throws InterruptedException, ExecutionException;

     翻译:Waits if necessary for the computation to complete, and then retrieves its result .

                         等待计算完成所需,然后取回其结果

    所以,Future的get()方法是阻塞等待的。

    到此,我就完成了从开始的300条数据,到800条数据10秒响应,优化到了3000条数据7秒响应。

    即完成了任务,又提高了性能。

    通过这一次运用了,线程池,Future,Callable 和并发安全容器类ConcurrentHashMap、ConcurrentSkipListSet 等技术,

    很大的提高了我的多线程,并发编程的技术。还有方法栈级别的数据缓存,JVM缓存,这是一个思想的飞跃。

  • 相关阅读:
    设计模式(三)--观察者模式
    设计模式(二)--单例模式
    tornado 资源
    复习 网络通信协议
    设置允许远程连接MySQL (Ubuntu为例)
    ubuntu 下安装ssh服务
    Python 运算内建函数
    py知识点拾遗之sort(),sorted(),reverse(),reversed()
    SQLite安装 以及 SQLite header and source version mismatch错误解决 (In debian)
    debian折腾笔记
  • 原文地址:https://www.cnblogs.com/itbac/p/11291706.html
Copyright © 2011-2022 走看看