zoukankan      html  css  js  c++  java
  • 数据汇总计算和分析的反思

    以下内容,都经过本人实践验证过。

    若转发,请在标题上标记[转],并注明原文链接:http://www.cnblogs.com/robinjava77/p/6285747.html,作者名称:robin。

    并在文章首行附上本段话。否则,作者保留追究的权利。

     

    术语定义:

    1.片:本周、本月、本年、近两月、近三月、近半年、近一年和至今八个维度

     

     

    诉求:基于**年的日数据,进行计算汇总,分别以本周、本月、本年、近两月、近三月、近半年、近一年和至今八个维度进行统计精准性修复,时间消耗越低越好。

     

    场景:

    1.日数据是每日都会进行各类综合计算,形成的业务基础数据。

    2.每日只会保留当日的片数据。

    3.当日片数据是根据当日日数据、昨日片数据和需要去掉的日数据,综合计算而保存下来。

    4.日数据,在未发现错误前,都是可靠的。

    5.日数据,少量错误,修复少量key的片数据。

    6.某日的日数据,出现大量甚至是全量key的数据错误,则需进行全量精准性修复。

     

    应对第六个场景,就出现了我们上文提到的诉求。

     

     

    两年的基础日数据大约在***亿多条,由于系统现在仍然处于第一代架构:单进程java+oracle的原始架构。无法使用后续升级的架构:①zk+多进程java去中心化分布式计算;②mongo+spark;③hadoop+spark等等来更优地去解决现在的诉求。

     

    因此本文仅针对:单进程java+oracle解决上述诉求来进行说明。如果有同类诉求的场景,最好是在项目架构时,根据实际情况,直接选择更好的架构去进行开发。这样能避免一些让人十分尴尬的场景。本人所负责的项目,启动之初,由于没有业务大神参与,领导只能选择简单易掌控的架构,好让所有开发人员,都能将主要精力投入业务规则的摸索和熟悉。

     

    废话说太多,直接上主菜。

     

    针对这个诉求,自己前前后后花了一定的时间才解决。现在按实践的时间前后顺序,分别是:①集团作战方案;②一锅乱炖方案;③分而治之方案。

     

    所有日数据,都是基于key进行唯一性标识,片的日数据量=key数量*8。

    Key的数量仅是***万级。

     

    以下是一些基础类定义和变量说明

     1 // List<String> keys集团线程的1万个key
     2 //List<Unit> units 片定义数据集合
     3 Class Unit{
     4     String name;//片标识
     5     Integer startDay;//片起始日期
     6     Integer endDay;//片截止日期
     7     Integer indexDay;//片的索引日期
     8 }
     9 Class DayData{
    10     Integer day;//日期
    11     Number data;//日数据
    12     String key;//
    13 }
    基础类定义和变量说明

     ①集团作战方案:遍历key,将1万个key划为1个集团单位,建立一个集团线程,将这个集团线程加入业务线程池。每个集团线程,仅负责计算本线程所分配的key的片数据。

     

    下面贴出集团线程的核心伪代码:

     1 //读取该集团起止key范围所有天的日数据
     2 Map<String,List<DayData>> groupDataWarehouse = getDayDataByDBOnSE(startKey,endKey);
     3 //key 片 计算汇总临时存储对象  key unit dayData
     4 HashBasedTable<String,String,Number> result= HashBasedTable.create();
     5 
     6 for(String key:keys){//遍历集团所属的key
     7     List<DayData> dayDatas = groupDataWarehouse.get(key);//获取该key的所有天的日数据
     8     for(DayData daydata: dayDatas){//遍历该key的日数据
     9         for(Unit unit:units){//遍历需要计算的片
    10             if(daydata.getDay().before(unit.getEndDay()) 
    11                 && daydata.getDay().after(unit.getStartDay())){//该key该日的日数据是否在片的计算范围
    12                 Number temp = result.get(key,unit.getName());//获取临时存储对象之前存储的结果
    13                 if(temp == null){
    14                     temp = new Number();
    15                 }
    16                 temp = ArithUtil.collect(temp,dayData.getData());//前述结果与该日日数据进行汇总计算
    17                 result.put(key,unit.getName(),temp);//计算后的数据,保存进临时存储对象
    18             }
    19         }
    20     }
    21 }
    22 
    23 saveDataIntoDB(result);//保存数据进数据库
    集团线程伪代码

    集团线程的好处:

    1.将大量的key分散成少量的集团key;

    2.少量的key,读取数据库数据量少,内存占用量少;

    3.一个集团的数据计算出错,不影响其他的集团key,只需重新执行出错的集团key即可;

    4.控制业务线程池同时可执行线程数目,就能降低服务器负载或者提高计算的性能。

     

    但是这种方案实现后,在实测中,DB强烈要求禁止使用

    因为在日常计算场景里,日数据在数据库存储的分区是按天来进行分区。

    但集团线程则是根据key来进行分区,每个集团线程查询该片key的日数据时,需要在数据库查询600多个数据分区。

    因此数据库吃不消,数据库的CPU、内存和其他各种指标,都被打得非常高,严重影响其他系统的数据服务。

     

    基于上述原因,DB建议我们按天来获取数据,将每天的数据计算完成后,直接丢弃。基于DB的建议,就出现了第二种方案:一锅乱炖方案。

     

    ②一锅乱炖方案:又分为两个阶段:(1)全部数据取出来计算;(2)取单日数据,计算完再获取下一个单日数据。

     

    (1)遍历日期,读取数据,放入Map<String,List<DayData>> dataWarehouse中,然后替代方案①中的getDayDataByDB(startKey,endKey)方法,按照集团作战方案,进行数据计算。

    这里的伪代码就不写了,源代码更不想贴。总而言之,这是一个非常糟糕的方案。

    第一,所有基础数据,都加载到内存,消耗几十G的内存,放这些临时数据,服务器吃不消;

    第二,只能单线程同步执行,无法异步执行,即使异步执行,也需对dataWarehouse用关键字“synchronized”上锁,导致效率低下。如果给每个key一个单独的队列来进行处理,整个代码实现冗长又难看;

    第三,可能公司不缺服务器,DB说几十G的内存搞不定,就申请几百G的服务器呗,作为一个有追求的程序员,真是欲哭无泪。

     

    (2)按日期顺序,创建单日读数据线程,放入读数据线程池。单日读数据线程读取指定日所有key的日数据,写入LinkedBlockingQueue队列中。另外创建一个单独的计算线程,从队列中获取单日的数据,遍历日数据,按key保存中间结果。

           这个方案,将其转化为生产者-消费者模型,注意:读数据线程和计算线程不能放在同一个工作线程池内,否则容易造成死锁。

           LinkedBlockingQueue(10) queue;这个队列是支持阻塞方式。

           List<DayData> data

           生产者:put(data);

           消费者:data = take();

           放在同一个线程池内,线程池内所有的工作线程被生产者占据,队列被塞满后,所有工作线程都阻塞在put方法,消费者无法获取工作资源。

           这是最容易犯的一个错误之一。

           但这种方案有明显的弊端:由于受按key汇总计算值的限制,消费者只能一个,即便生产者是多线程,队列设置得足够大,因为消费者的效率低下,导致所需时间无法预估,经实验,是达不到预期最低要求的。

           简单贴下伪代码:

      

     1 BlockingQueue<Vector<DayData>> queue = new LinkedBlockingQueue(100);//阻塞队列
     2 
     3 生产者(多线程)
     4 Vector<DayData> dayData = getDayDataByDBOnDay(day);
     5 queue.put(dayData);
     6 
     7 消费者(单线程)
     8 Thread cal = new Thread(new Runnable() {
     9     @Override
    10     public void run() {
    11         HashBasedTable<String,String,Number> result= HashBasedTable.create();
    12         int calDayCount = 0;
    13         while(calDayCount < totalDayCount){
    14             Vector<DayData> dayDatas = queue.take();
    15             for (DayData dayData:dayDatas){
    16                 for (Unit unit:units){
    17                     if(daydata.getDay().before(unit.getEndDay()) 
    18                     && daydata.getDay().after(unit.getStartDay())){//该key该日的日数据是否在片的计算范围
    19                         Number temp = result.get(dayData.getKey(),unit.getName());
    20                         if(temp == null){
    21                             temp = new Number();
    22                         }
    23                         temp = ArithUtil.collect(temp,dayData.getData());//前述结果与该日日数据进行汇总计算
    24                         result.put(dayData.getKey(),unit.getName(),temp);//计算后的数据,保存进临时存储对象
    25                     }
    26                 }
    27             }
    28             calDayCount++;
    29         }
    30         saveDataIntoDB(result);//保存数据进数据库
    31     }
    32     
    33 });
    34 cal.start();
    View Code

    生产者线程和消费者线程,都完成相应的任务后,用CountDownLatch downLatch = new CountDownLatch(int num);倒数计数器锁来进行异步等待控制

    ③分而治之方案:根据这个特定的业务场景,(1)将日数据汇总为月月数据,保存下来;(2)根据各个片,计算片的开始日期月1到startDay前一天的阶段去掉数据;(3)取片起止时间包含的月数据文件和阶段去掉数据,月数据进一步汇总,最后消去阶段去掉数据。

     

    分而治之方案优点:

    (1)大功能拆分为三个小功能,各个小功能独立实现,每个小功能可使用多线程快速完成,整体程序开发可控,代码不会太冗长;

    (2)各个功能互不影响,可以异步进行,通过CountDownLatch在主线程中,达到异步等待;

    (3)基础日数据按月汇总保留文件,下次再次进行修复,无需再次读取,若历史日数据发现有错,则删除错误日的月汇总文件,仅重新生成该日的月汇总文件即可。

     

    下面贴伪代码:

    1 getModulo(String key){//key 按Constant.MODULO返回取模的余数
    2     //具体实现略
    3 }
    4 
    5 void precise(Integer today,List<Unit> units,ExecutorService BOSS_EXEC){
    6     createMonthFile(today,BOSS_EXEC);//创建以月为汇总的中间值文件
    7     createUnitRemoveFile(units,BOSS_EXEC);//创建本周、近两月、近三月、近半年、近一年需被去掉的中间值文件
    8     calPreciseAll(today,units,BOSS_EXEC);//基于月汇总和除掉的收益率中间值文件进行精确的片收益率计算
    9 }
    分模方法和入口方法定义
     1 void createMonthFile(Integer today,ExecutorService BOSS_EXEC){
     2     Integer localMonth = getMonth(today);
     3     String rootFilePath = getPath();
     4     String monthFilePath = FileUtil.getCompleteFilePath(rootFilePath,localMonth+"");//获取今月汇总文件夹
     5     FileUtil.deleteFile(monthFilePath);//删除今月汇总文件
     6     Map<Integer,List<Integer>> months = getDayByMonth(today);//按月分散每个月包含的日期
     7     CountDownLatch monthFileDownLatch = new CountDownLatch(months.size());
     8     for(Map.Entry<Integer,List<Integer>> entry:months.entrySet()){
     9         Runnable run = createMonthFileRun(entry.getValue(),monthFileDownLatch,entry.getKey(),rootFilePath);
    10         BOSS_EXEC.submit(run);
    11     }
    12     monthFileDownLatch.await();//月汇总数据功能完成后继续向下执行,否则阻塞
    13 }
    创建月汇总文件
     1 Runnable createMonthFileRun(final List<Integer> todays,final CountDownLatch downLatch,final Integer month,final String rootFilePath){
     2         Runnable run = new Runnable() {
     3             @Override
     4             public void run() {
     5                 try{
     6                     String monthFilePath = FileUtil.getCompleteFilePath(rootFilePath,month+"");
     7                     if(!FileUtil.isExist(rootFilePath) || !FileUtil.isExist(monthFilePath)){//根目录不存在或者月汇总文件不存在 则进行汇总计算
     8                         Map<String,Number> monthData = new HashMap<>();
     9                         //以key 为标记,汇总各个key的月中间值数据
    10                         for (Integer day:todays){
    11                             List<DayData> dayDatas = getDayDataByDBOnDay(day);
    12                             for (DayData dayData:dayDatas){
    13                                 String key = dayData.getKey();
    14                                 Number temp = monthData.get(key);
    15                                 if(temp == null){
    16                                     temp = new Number();
    17                                 }
    18                                 temp = ArithUtil.collect(temp,dayData.getData());
    19                                 monthData.put(key,temp);
    20                             }
    21                         }
    22                         //将本月各个key的中间值数据,保存为文件
    23                         for(Map.Entry<String,Number> entry:monthData.entrySet()){
    24                             String aimFilePath = FileUtil.getCompleteFilePath(monthFilePath,getModulo(entry.getKey()),FileUtil.txtFileSuffix);
    25                             File aimFile = new File(aimFilePath);
    26                             String contentTxt = entry.getKey() + FILE_CONTENT_SEPARATOR + entry.getValue();//每个key的数据,保存为一行,key和汇总数据,用特定符号分隔
    27                             FileUtil.apppendContentToFileNewLine(aimFile,contentTxt);
    28                         }
    29                     }
    30                 }catch (Exception e){
    31                     LOG.error(e);
    32                 }finally {
    33                     downLatch.countDown();
    34                 }
    35             }
    36         };
    37         return run;
    38     }
    创建月汇总文件线程
     1 void createUnitRemoveFile(List<Unit> units,ExecutorService BOSS_EXEC){
     2     CountDownLatch needRemoveDownLatch = new CountDownLatch(units.size());
     3     for (Unit unit:units){
     4         if(isNeedRemove(unit)){//本周、近两月、近三月、近半年、近一年需要保存阶段去掉汇总数据
     5             String rootFilePath = getPath();
     6             Runnable run = createUnitRemoveFileRun(rootFilePath,unit,needRemoveDownLatch);
     7             BOSS_EXEC.submit(run);
     8          }else{
     9             needRemoveDownLatch.countDown();
    10         }
    11     }
    12     needRemoveDownLatch.await();//阶段去掉数据功能完成后继续向下执行,否则阻塞
    13 }
    创建片阶段去掉汇总文件
     1 Runnable createUnitRemoveFileRun(final String rootFilePath,final Unit unit,final CountDownLatch downLatch){
     2         Runnable run = new Runnable() {
     3             @Override
     4             public void run() {
     5                 try{
     6                     String removeFilePath = FileUtil.getCompleteFilePath(rootFilePath,unit.getName());
     7                     FileUtil.deleteFile(removeFilePath);
     8                     Integer startDay = getMonthFirstDay(unit.getStartDay());
     9                     Integer endDay =  getLastDay(unit.getStartDay());
    10                     List<Integer> takeOutDays = getDays(startDay,endDay);
    11                     Map<String,Number> removeData = new HashMap<>();
    12                     for (Integer day:takeOutDays){
    13                         List<DayData> dayDatas = getDayDataByDBOnDay(day);
    14                         for (DayData dayData:dayDatas){
    15                             String key = dayData.getKey();
    16                             Number temp = removeData.get(key);
    17                             if(temp == null){
    18                                 temp = new Number();
    19                             }
    20                             temp = ArithUtil.collect(temp,dayData.getData());
    21                             removeData.put(key, temp);
    22                         }
    23                     }
    24                     for (Map.Entry<String,Number> entry:removeData.entrySet()){
    25                         String aimFilePath = FileUtil.getCompleteFilePath(removeFilePath,(getModulo(entry.getKey())),FileUtil.txtFileSuffix);
    26                         File aimFile = new File(aimFilePath);
    27                         String contentTxt = entry.getKey() + FILE_CONTENT_SEPARATOR + entry.getValue();
    28                         FileUtil.apppendContentToFileNewLine(aimFile,contentTxt);
    29                     }
    30                 }catch (Exception e){
    31                     LOG.error(e);
    32                 }finally {
    33                     downLatch.countDown();
    34                 }
    35             }
    36         };
    37         return run;
    38     }
    创建片阶段去掉汇总文件线程
     1 void calPreciseAll(Integer today,List<Unit> units,ExecutorService BOSS_EXEC){
     2     List<String> keys = getKeysOnDay(today); //获取今日需要计算的key范围
     3     Map<String,KeyDetail> keyDetails = getKeyDetailOnDay(today);//获取key详情,主要是为了获取key的开始使用日期,使用日期前,key并没有日数据
     4     HashBasedTable<String,String,Number> existMap = getExistUnitData(today);//表中存在的片数据
     5     Map<String,DayData> todayDataMap = getTodayDataMap(bizDate);//今天日收率
     6     Map<Integer,List<String>> keyModuloMap = keyZone(fundIds);//被计算的key取模分片
     7     CountDownLatch downLatch = new CountDownLatch(keyModuloMap.size());//倒数计数器锁
     8     List<Integer> months = DateUtil.getMonths(START_DAY,today);//统计开始日期到现在日期的各个月份数据 START_DAY 产品统计数据开始日期
     9     for(Map.Entry<Integer,List<String>> entry:keyModuloMap.entrySet()){//分片计算
    10         Runnable run = calDataSectionRun(keyDetails,today,downLatch,existMap,entry.getValue(),months,entry.getKey(),todayDataMap,units);
    11         BOSS_EXEC.submit(run);
    12     }
    13     downLatch.await();
    14 }
    根据文件计算片数据方法
     1 Map<Integer,List<Stirng>> keyZone(List<String> keys){
     2         Map<Integer,List<String>> keyModuloMap = new HashMap<>();
     3         for(int i=0;i<Constant.MODULO;i++){
     4             keyModuloMap.put(i,new Vector<String>());
     5         }
     6         for(String key:keys){
     7             int remainder = getModulo(key);
     8             List<String> temp = keyModuloMap.get(remainder);
     9             temp.add(key);
    10         }
    11         return keyModuloMap;
    12     }
    key取模分片方法
     1 Runnable calDataSectionRun(final Map<String,KeyDetail> keyDetails,final Integer today,final CountDownLatch downLatch,
     2                             final HashBasedTable<String,String,Number> existMap,final List<String> keys,final List<Integer> months,
     3                             final Integer remainder,final Map<String, DayData> todayDataMap,final List<Unit> units){
     4         Runnable run = new Runnable() {
     5             @Override
     6             public void run() {
     7                 try{
     8                     HashBasedTable<String,Integer,Number> keyMonthData = HashBasedTable.create();
     9                     HashBasedTable<String,String,Number> keyRemoveData = HashBasedTable.create();
    10                     for (Integer month:months){
    11                         readTxtToTable(month,remainder,keyMonthData);
    12                     }
    13                     for(Unit unit:units){
    14                         if(unit.isNeedRemove()){
    15                             readTxtToTable(unit.getName(),remainder,keyRemoveData);
    16                         }
    17                     }
    18 
    19                     Vector<Section> save = new Vector<>();
    20                     Vector<Section> mod = new Vector<>();
    21                     for (String key:keys){
    22                         Map<Integer,Number> monthMap = keyMonthData.row(key);
    23                         KeyDetail keyDetail = keyDetails.get(key);
    24                         if(keyDetail == null){
    25                             LOG.error("key[{}],日期[{}]没有详情,请检查",key,today);
    26                             continue;
    27                         }
    28                         for(Unit unit:units){
    29                             Number val = new Number();
    30                             Integer keyStartDay = keyDetail.getStartDay();
    31                             Integer monthStart = keyStartDay>unit.getStartDay()?keyStartDay:unit.getStartDay();
    32                             List<Integer> unitMonths = DateUtil.getMonths(monthStart,unitBO.getEndDay());
    33                             for(Integer month:unitMonths){
    34                                 Number monthDouble = monthMap.get(month);
    35                                 val = ArithUtil.collect(val,monthDouble);
    36                             }
    37                             Number removeData = keyRemoveData.get(key,unit.getName());
    38                             val = ArithUtil.remove(val,removeData);
    39                             Number todayDayData = new Number();
    40                             DayData todayDayData = todayDataMap.get(key);
    41                             if(todayDayData != null && todayDayData.getData() != null){
    42                                 todayDayData = todayDayData.getData();
    43                             }else{
    44                                 LOG.error("key[{}],片[{}],今日[{}]的值为null或者0,请检查",fundId,unit.getName(),today);
    45                             }                           
    46                             Seciton section = new Section(key,unitBO.getName(),unit.getIndexDay(),ArithUtil.remove(val,FIX_VAL));
    47                             Boolean flag = existMap.get(key,unitBO.getName());
    48                             if(flag == null || !flag){
    49                                 save.add(section);
    50                             }else{
    51                                 mod.add(section);
    52                             }
    53                         }
    54                     }
    55                     saveBatchManyThread(save,Constant.batchNum,Constant.maxThreadNum);
    56                     updateBatchManyThread(mod,Constant.batchNum,Constant.maxThreadNum);
    57                 }catch (Exception e){
    58                     LOG.error("所有key,模为{}的片数据计算出现异常",remainder,e);
    59                 }finally {
    60                     downLatch.countDown();
    61                 }
    62             }
    63         };
    64         return run;
    65     }
    key分片计算片数据线程
     1 <C> void readTxtToTable(C folder,Integer remainder,HashBasedTable<String,C,Number> table){
     2         if(table == null){
     3             table = HashBasedTable.create();
     4         }
     5         String rootFilePath = getPath();
     6         String aimFilePath = FileUtil.getCompleteFilePath(rootFilePath,folder+"",remainder+"",FileUtil.txtFileSuffix);
     7         List<String> txtContent = FileUtil.readTxtFile(aimFilePath, FileUtil.ENCODE_UTF8);
     8         for (String content:txtContent){
     9             String[] arr = content.split(FILE_CONTENT_SEPARATOR);
    10             String key = arr[0];
    11             Number val = getNumber(arr[1]);//将字符串转化为Number类型
    12             table.put(key,folder,val);
    13         }
    14     }
    读取文件数据

    以上就是基于原始架构,实现的较多数据的汇总分析。这种架构,迟早是要被淘汰的。目前四个月后,就会升级为mongo+spark架构,届时看看新架构实现这个诉求和现在的方案相比,到底哪里方便了许多。

    但是经过这样一步步迭代升级,对大量数据的汇总分析,有一个很好地策略:分而治之,化整为零,逐个击破,确认各自的边界和交互,一步步调试调优独立功能的性能。

    之前的程序跑完这个诉求,最少也需要8个小时,现在基于分而治之方案,第一次执行需要50min,第二次及以后执行,只需30min以内。

     

    本文主要对近期做的一点事情,小结。同时,也方便自己日后回顾。若其他小伙伴有什么文中没出现的更好的解决方案,欢迎留言沟通交流。

  • 相关阅读:
    在ASP.NET MVC中使用DropDownList引用。呵呵。
    获取泛型对象
    Tomcat JVM 初始化加大内存
    Tomcat6.0 连接池的配置
    @ResponseBody与Ajax
    c3po数据库连接池中取出连接
    SpringMVC
    JQuery发送Ajax请求
    Java生成验证码
    Spring初学
  • 原文地址:https://www.cnblogs.com/robinjava77/p/6285747.html
Copyright © 2011-2022 走看看