zoukankan      html  css  js  c++  java
  • 【原】通过多线程分批处理派发任务

    前言:

       前几天有运营部门人员反应派发红包很慢,经常出现504或者无响应,于是由我这边进行一个优化后,发放速度由原来的超时或者1分钟变为几秒。

    发放流程:

             活动后台导入一个xls表格,大概2W左右条,经过后台的筛选处理等逻辑后会循环调用插入数据库的代码。

    优化过程:

             分析慢的原因:

                                      发放的时候循环发放,导致发放的红包一多的时候要循环几万次,而且每次插入都是new一个对象,然后往里面set数据,最后调用 jdbcTemplate插入。

              优化思路 :

        1. jdbcTemplate 有个batchUpdate的api,可以通过这个api完成批处理                 

        2. 通过多线程拆分大的单元,类似于 jdk的 forkJoin,然后每个线程处理一批,最后的结果通过回调统一归并。

       拆分List代码片段:

     抽取出一个公共的接口,用于调用具体的处理方法。

    public interface I2pTask<T, E> {
        T execute(List e, Map<String, Object> params);
    }

        由于发放红包需要实时展示给运营人员看,所以需要有回调处理函数,可以将不同结果的线程收集起来统一给主线程返回,但jdk的Callable又满足不了,所以得自己新建一个类重写Callable,这个类只负责调度处理任务以及返回执行任务结果。

    public class I2pHanderCallable<E> implements Callable<ResultBean> {
        private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
        // 线程名称 
        private String threadName = "";
        // 需要处理的数据
        private List<E> data;
        // 辅助参数
        private Map<String, Object> params;
        // 具体执行任务
        private I2pTask<ResultBean<String>, E> task;
    
        public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
                ITask<ResultBean<String>, E> task) {
            this.threadName = threadName;
            this.data = data;
            this.params = params;
            this.task = task;
        }
    
        @Override
        public ResultBean<List<ResultBean<String>>> call() throws Exception {
            // 该线程中所有数据处理返回结果
            ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();
            if (data != null && data.size() > 0) {
                logger.info("线程:{},共处理:{}个数据,开始处理......", threadName, data.size());
                // 返回结果集
                List<ResultBean<String>> resultList = new ArrayList<>();
                resultList.add(task.execute(data, params));
                /*resultList.add(task.execute(data, params));*/
                // 循环处理每个数据
              /*  for (int i = 0; i < data.size(); i++) {
                    // 需要执行的数据
                    E e = data.get(i);
                    // 将数据执行结果加入到结果集中
                    resultList.add(task.execute(e, params));
                    logger.info("线程:{},第{}个数据,处理完成", threadName, (i + 1));
                }*/
                logger.info("线程:{},共处理:{}个数据,处理完成......", threadName, data.size());
                resultBean.setData(resultList);
                resultBean.setCode(data.size());
            }
            return resultBean;
        }
    
    }
     

          具体的处理线程的类有了,那么还需要考虑来切分任务,所以新建一个 线程工具类,主要业务就是切分任务,创建具体的线程个数,统一收集结果。

    public class I2pThreadUtils<T> {
        private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);
    
        // 线程个数,如不赋值,默认为5
        private int threadCount = 20;
        // 具体业务任务
    //    private I2pTask<ResultBean<String>, T> task;
        // 线程池管理器
        private CompletionService<ResultBean> pool = null;
    
      
        public static I2pThreadUtils newInstance(int threadCount) {
            I2pThreadUtils instance = new I2pThreadUtils();
            threadCount = threadCount;
            instance.setThreadCount(threadCount);
            return instance;
        }
    
    
        @SuppressWarnings("rawtypes")
        public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {
            // 创建线程池
            int num = 0;
            ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
            // 根据线程池初始化线程池管理器
            pool = new ExecutorCompletionService<ResultBean>(threadpool);
            // 开始时间(ms)
            long l = System.currentTimeMillis();
            // 数据量大小
            int length = data.size();
            // 每个线程处理的数据个数
            int taskCount = length / threadCount;
            // 划分每个线程调用的数据
            for (int i = 0; i < threadCount; i++) {
                // 每个线程任务数据list
                List<T> subData = null;
                if (i == (threadCount - 1)) {
                    subData = data.subList(i * taskCount, length);
                } else {
                    subData = data.subList(i * taskCount, (i + 1) * taskCount);
                }
                // 将数据分配给各个线程
                HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
                // 将线程加入到线程池
                pool.submit(execute);
            }
    
            // 总的返回结果集
            List<ResultBean<String>> result = new ArrayList<>();
            for (int i = 0; i < threadCount; i++) {
                // 每个线程处理结果集
                ResultBean<List<ResultBean<String>>> threadResult;
                try {
                    threadResult = pool.take().get();
                    if(threadResult!=null && threadResult.getData()!=null){
                        System.out.println("======线程" + i + "执行完毕,返回结果数据:" + threadResult.getCode());
                        List<ResultBean<String>>  list =  threadResult.getData();
                        num+=threadResult.getCode();
                    }
                    System.out.println("每个线程处理结果集"+threadResult.getData());
                    result.addAll(threadResult.getData());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
    
            }
            // 关闭线程池
            threadpool.shutdownNow();
            // 执行结束时间
            long end_l = System.currentTimeMillis();
            logger.info("总耗时:{}ms", (end_l - l));
            logger.info("总数量:{}num:", num);
            return ResultBean.newInstance().setData(num);
        }
    
        public int getThreadCount() {
            return threadCount;
        }
    
        public void setThreadCount(int threadCount) {
            this.threadCount = threadCount;
        }
    
    }
    

     以上

  • 相关阅读:
    【转】 Linux进程间通信
    Django中的Templates
    Django中的应用
    url的使用
    Django框架的使用
    Django的安装
    文件上传
    flask中的request和response
    模板
    静态文件处理
  • 原文地址:https://www.cnblogs.com/zdd-java/p/10306194.html
Copyright © 2011-2022 走看看