zoukankan      html  css  js  c++  java
  • 多线程数据查询统计

    最近在做一个需求,就是根据选定的对账单查询该对账单下的所有交易明细,但是由于交易明细表过大——几百万——且没有做分表等操作,数据库用的是mysql,实时去查询的话,会比较慢。

    虽然在交易明细表中给对账单id加了索引,但是由于查询的字段比较多且无需,肯定需要回表操作,另外还有对数据进行排序,结果造成每次导出一个对账单明细需要花费一两分钟的时间。

    鉴于此,考虑使用多线程来解决该问题,因为交易明细表中存在交易时间字段,因此考虑通过交易时间进行拆分,并根据拆分出的时间段参数用多个线程进行查询,最后统计。

    实施方案:

    1、在交易时间上创建索引,或者跟对账单id一起创建联合索引

    2、根据对账单下交易的最大和最小时间,并根据当前对账单的交易笔数进行合理拆分

     交易(时间)拆分主要集中在以下几个方面

     1> 交易的笔数

    正常对账单以月为单位,即时间范围为一个月,以对账单交易笔数10W为例,如果每天的交易偏移量不大,即每天的交易笔数都在3000左右,这样就可以以天为单位进行拆分

    非正常情况下,月对账单可能包含不属于当前月份的交易,且当前月份交易站90%以上,即交易偏移量很大,这样就可能需要细分:

      根据每天的交易量进行分组排序,按照尽可能平均的交易量进行日期拆分

      直接按照月份进行拆分,因为99%的情况下是一个月的交易稳定,而其他月份加到一起也不到一个查询时间周期的量,这时把所有不属于当前月份的拆成一次查询,当月再进行按天拆分

     2> 数据库服务器配置

    主要是考虑数据库服务器性能,正常来说,肯定是线程越多查询效率越高,即木桶原理,总的时间取决于查询时间最长的一个线程,但对于数据库来讲,线程越多对数据库的配置要求就越高,如果数据库硬件配置不足,而同时执行的线程又太多,会导致线程阻塞(多线程效率降低),或者直接造成数据库死锁。

     3>应用服务器性能

    因为数据统计出来之后,可能会存在对数据进行二次遍历重组的情况,如过滤一些不需要的数据,对一些特殊数据进行单独处理,另外本身一次性查询10W数据,即需要服务器内存处理10W的数据量,对服务器本身的压力也挺大,如果由于数据量过大造成应用服务器内存溢出,那就只能分段处理或者增加服务器内存了

    技术选择

    一般的多线程场景,前台发起请求(如下单)需要及时作出响应,但后台可能还需要各种其他操作,而这些操作可能会占用大量的时间,用户不可能一直等待这些后续处理都完成才收到反馈,这种情况一般都会在主线程先给用户(前台)返回一个成功操作的标识,然后后台再启一个或多个线程(Thread)去处理后续业务,用户提交成功后可以刷新查询当前提交业务的处理进度;或者是通过消息队列的方式,主线程返回前台成功标识并把当前订单信息放入消息队列,相应的消费线程读取队列中的订单并进行后续的操作。

    上述场景为用户不需要等待最终处理,且后续处理非常耗时间。但像统计这种需要快速的返回想要的结果,因此用到了另一种多线程——Callable

    Callable 

    Callable和Future出现的原因

    创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 
    这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 
    如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

    而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

    Callable和Future介绍

    Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

    Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。 
    java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

    代码实现,我这里最大限制一个月只能查3次,就是因为dba不允许有超过4个线程同时进行查询,说数据库性能不行,压力会很大

    先查询对账单交易量,如果超过某个设定值(我这里设定1W)就开始拆分,后面如果有多个月份的,遍历去累加总的交易笔数,当超过设定值时,拆分出一个查询,重新做累加。

    public List<StmtPolicyTransData> listPolicyTransDataByStatement(String statementId) {
            // 根据对账单拆分日期
            int record = 10000;
            // 1、 查询总记录数
            int totalCount = stmtService.countTransactionByStatement(statementId);
            logger.debug("对账单{}总交易记录数:{}", statementId, totalCount);
            if (totalCount < record) {
                // 不足1w,直接查询
                return stmtService.listPolicyTransDataByStatement(statementId, null, null);
            }
            // 2、查询每月记录数
            List<StmtMonthTransCount> monthCountList = stmtService.countStmtTransactionByMonth(statementId);
            // 3、统计查询参数  判断交易月份
            List<StmtTransDataParams> paramsList = Lists.newArrayList();
            if (monthCountList.size() == 1) {
                paramsList.addAll(splitTransactionTime(monthCountList.get(0)));
            } else {
                Date startTime = monthCountList.get(0).getMinTime();
                // 累加计数
                int sumCount = 0;
                for (StmtMonthTransCount data : monthCountList) {
                    // 先判断之前是否有未计算的月份
                    if (sumCount != 0) {
                        sumCount += data.getCount();
                        if (sumCount > record) {
                            StmtTransDataParams params = new StmtTransDataParams();
                            params.setStartTime(startTime);
                            params.setEndTime(data.getMinTime());
                            paramsList.add(params);
                            // 重置查询时间
                            startTime = data.getMinTime();
                            sumCount = data.getCount();
                        }
                        if (data.getCount() > record) {
                            paramsList.addAll(splitTransactionTime(data));
                            startTime = null;
                            sumCount = 0;
                        } else if (monthCountList.indexOf(data) == monthCountList.size() - 1) {
                            // 最后一个月份数据
                            StmtTransDataParams params = new StmtTransDataParams();
                            params.setStartTime(startTime);
                            params.setEndTime(DateUtils.addMinutes(data.getMaxTime(), 1));
                            paramsList.add(params);
                        }
                    } else {
                        if (data.getCount() > record) {
                            paramsList.addAll(splitTransactionTime(data));
                        } else {
                            // 新计算月份,重置开始时间
                            startTime = data.getMinTime();
                            sumCount = data.getCount();
                            if (monthCountList.indexOf(data) == monthCountList.size() - 1) {
                                // 最后一个月份数据
                                StmtTransDataParams params = new StmtTransDataParams();
                                params.setStartTime(data.getMinTime());
                                params.setEndTime(DateUtils.addMinutes(data.getMaxTime(), 1));
                                paramsList.add(params);
                            }
                        }
                    }
                }
            }
    
            // 遍历参数  创建线程查询
    //        ExecutorService executorService = Executors.newFixedThreadPool(paramsList.size());
    
            ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(paramsList.size(),
                    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
            List<Callable<List<StmtPolicyTransData>>> callableList = Lists.newArrayList();
            paramsList.forEach(params -> callableList.add(new StmtTransDataCall(stmtService, statementId, params.getStartTime(), params.getEndTime())));
    
            // 遍历统计交易数
            List<StmtPolicyTransData> dataList = Lists.newArrayList();
    
            try {
                List<Future<List<StmtPolicyTransData>>> futureList = executorService.invokeAll(callableList);
                if (!CollectionUtils.isEmpty(futureList)) {
                    for (Future<List<StmtPolicyTransData>> future : futureList) {
                        dataList.addAll(future.get());
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("多线程查询数据异常:" + e.getMessage());
            }
            // 关闭线程池
            executorService.shutdown();
            return dataList;
        }
    
        /**
         * 单月按照交易时间拆分
         *
         * @param stmtMonthTransCount
         * @return
         */
        private List<StmtTransDataParams> splitTransactionTime(StmtMonthTransCount stmtMonthTransCount) {
            List<StmtTransDataParams> paramsList = Lists.newArrayList();
            int record10 = 100000;
            int record6 = 60000;
            int record4 = 30000;
            int record2 = 15000;
    
            // 根据交易数拆分
            int count = stmtMonthTransCount.getCount();
            Date minTime = stmtMonthTransCount.getMinTime();
            Date maxTime = stmtMonthTransCount.getMaxTime();
            int days = 0;
            // 考虑数据库服务器性能问题,暂定最多开4个线程,另暂不考虑有其他月份情况
            if (count >= record10) {
                // 10w以上,3天查一次,最少查10次
                days = 11;
            } else if (count >= record6) {
                // 6w以上,5天查一次,最少查6次
                days = 11;
            } else if (count >= record4) {
                // 4w以上,查3次  10天查一次
                days = 11;
            } else if (count >= record2) {
                // 2w以上,16天查一次,最多查2次
                days = 16;
            } else {
                days = 31;
            }
            while (minTime.compareTo(maxTime) < 0) {
                Date endTime = DateUtils.addDays(minTime, days);
                if (endTime.compareTo(maxTime) > 0) {
                    endTime = maxTime;
                }
                StmtTransDataParams params = new StmtTransDataParams();
                params.setStartTime(minTime);
                params.setEndTime(endTime);
                paramsList.add(params);
                minTime = endTime;
            }
            return paramsList;
        }
  • 相关阅读:
    In Java, how do I read/convert an InputStream to a String? Stack Overflow
    IFrame自动更改大小
    [置顶] 获取服务器上格式为JSON和XML两种格式的信息的小程序
    Qt VS MFC
    [技术分享]使用 UAG 发布 RemoteAPP
    linux2.6.32在mini2440开发板上移植(11)之配置USB外设
    MFC控件(2):Text_Edit_Control
    CentOS 6.4 安装 Oracle 10g2 备记
    sql lite 二个数据库之间的表进行复制
    变量和函数的定义和声明
  • 原文地址:https://www.cnblogs.com/flysand/p/10984894.html
Copyright © 2011-2022 走看看