zoukankan      html  css  js  c++  java
  • Spark菜鸟学习营Day2 分布式系统需求分析

    Spark菜鸟学习营Day2

    分布式系统需求分析

    本分析主要针对从原有代码向Spark的迁移。要注意的是Spark和传统开发有着截然不同的思考思路,所以我们需要首先对原有代码进行需求分析,形成改造思路后,再着手开发。
    对于输入和输出,请注意,指的是以程序为边界的输入和输出情况。

    主要迁移点:

    A:批量数据清理

    • 重点:分析要清理的表在哪里

      • A1.参数表:存放Oracle、Redis。清理Oracle就可以,Redis会同步清理
        • 表一般是以par_开头
      • A2.输入数据表(由数据接收或者其他渠道导入):存放Oracle、HBase,两边都要清理。
        • 表一般是以_temp结尾
      • A3.中间数据表(仅拆分内部使用):存放RDD,需要清理RDD。
        • 表仅被名称中含有_split_字样的程序调用
      • A4.输出数据表(非拆分模块使用):存放Oracle,可能存放HBase,两边都要清理。
        • 表被其他程序调用
    • 实际情况下表的用途会有组合的情况

      • A3+A4.中间数据表 and 输出数据表
    • 输入:可能有,需分析,比如删除条件有表关联情况

    • 输出:无

    B:批量数据转换

    • 特征:insert ... select 语句
      这是最接近标准化的分布式处理,可以使用Dataframe或RDD编程来开发。
    • 开发方法:
      • B1.Dataframe
        • 所有输入表都是参数表、输入数据表、中间表,并且SQL语句支持(不包含not exists等特殊语法),使用Dataframe编程
      • B2. RDD
        • 不满足B1条件
    • 输入:肯定有
    • 输出:肯定有

    C:单行循环转换

    • 特征:pl/sql的游标操作,包括for语法的游标操作。

    C1. 游标转RDD

    将游标逻辑转为RDD的操作。

    • 输入:肯定有
    • 输出:无

    C2. 单行数据过滤

    一般语句中有continue或者goto语句,指对单行数据进行判断,满足条件就处理,否则不处理
    这里可能会出现对Oracle数据的判断,需提前把Oracle数据预先缓存出来,逻辑中访问缓存下来的数据,避免对于数据库的大量连接。

    • 输入:无
    • 输出:无

    C3. 重复数据过滤

    相比单行过滤更加复杂,如果与已处理数据不重复才会处理

    • 输入:无
    • 输出:无

    C4.单行数据清理

    会根据单行数据的条件执行数据清理操作

    • 输入:无
    • 输出:无

    C5.单行数据输出

    一般使用map或mapPartitions算子。
    这部分设计比较难,有几个设计点:

    • 如果有多个输出,需要进行多次对的map。
    • 如果多次输出有公共数据,需要额外增加一次map来处理公共数据。
    • 输入:无
    • 输出:肯定有

    D.优化处理

    不是直接从原有代码转化,主要从性能角度出发来添加,包括:

    • D1.缓存Oracle数据
      • 输入:肯定有
      • 输出:无
    • D2.缓存Redis数据
      • 输入:肯定有
      • 输出:无

    分析样例1

    • 步骤1:清理中间表+结果数据表(A3+A4)
    • 输入:无
    • 输出:无
        DELETE out_trd_qtsl t WHERE t.rq = v_last_date;
    
    • 步骤2:输出数据表,清理Oracle(A4)
    • 输入:无
    • 输出:无
      DELETE FROM out_trd_qtsl_sub t WHERE t.rq = p_i_date;
    
    • 步骤3:输出数据表,清理Oracle(A4)
    • 输入:无
    • 输出:无
      DELETE FROM out_trd_qtsl_his t WHERE t.rq = p_i_date;
    
    • 步骤4:使用Dataframe的select语句来进行处理(B1)
    • 输入:qtsl_temp:Dataframe ; par_fund_partner:DataFrame
    • 输出:无
        INSERT INTO out_trd_qtsl
          (scdm,
           hydm,
           ...
                SELECT scdm,
                 hydm,
                 ...
             FROM qtsl_temp a
           WHERE a.rq = v_last_date
             AND a.zqzh IN (SELECT partner_code
                              FROM par_fund_partner
                             WHERE market_code = v_scdm -- 上海市场
                               AND sub_partner_code = '000000' --不含子股东代码
                               AND v_last_date BETWEEN inure_begin_date AND
                                   inure_end_date);
    
    • 步骤5:游标转RDD(C1)
    • 输入:qtsl_temp:RDD ; par_sys_fill_partner:RDD
    • 输出:无
     SELECT nvl(a.scdm, '') scdm, --市场代码
               nvl(a.hydm, '') hydm, --结算参与人的清算编号
               nvl(a.sjlx, '') sjlx, --数据类型
               ...
          FROM qtsl_temp a
         WHERE a.rq = v_last_date
           AND (a.zqzh IN (SELECT t.partner_code FROM par_sys_fill_partner t) OR
               a.zqzh IS NULL OR a.zqzh = '0')
    
    • 步骤6:缓存Oracle数据(D1)
    • 输入:out_trd_qtsl_his:Oracle
    • 输出:无
          str := 'select count(1) from out_trd_qtsl_his t where ';
          IF r_qtsl_sub.scdm IS NOT NULL THEN
            str := str || 't.SCDM = ''' || r_qtsl_sub.scdm || ''' and '; --市场代码
          END IF;
          ...
          EXECUTE IMMEDIATE str
            INTO v_count;
    
    • 步骤7:第一次map操作,根据Oracle数据进行过滤,并生成补录编号(C2+C5)
    • 输入:无
    • 输出:无
     OPEN c_qtsl_sub;
      LOOP
        <<error_row>>
        FETCH c_qtsl_sub
          INTO r_qtsl_sub;
        EXIT WHEN c_qtsl_sub%NOTFOUND;
        ...
         --进行数据过滤
        IF v_count = 0 THEN
            --生成补录编号
            SELECT lpad(to_char(seq_filldata_no.NEXTVAL), 15, '0')
              INTO v_seq
              FROM dual;
        ...
        END IF;
          END LOOP;
      CLOSE c_qtsl_sub;
    
    • 步骤8:第二次map操作,输出数据(C5)
    • 输入:无
    • 输出:out_trd_qtsl_his:RDD
              INSERT INTO out_trd_qtsl_his
                (scdm, --市场代码
                 hydm, --结算参与人的清算编号
                ...
                 seq_no, --补录编号
                 sub_no --内部顺序号
                 )
              VALUES
                (nvl(r_qtsl_sub.scdm, ''), --市场代码
                 nvl(r_qtsl_sub.hydm, ''), --结算参与人的清算编号
                ...
                 v_seq,
                 '0');
    
    • 步骤9:第三次map操作,输出数据(C5)
    • 输入:无
    • 输出:out_trd_qtsl_sub:RDD
    INSERT INTO out_trd_qtsl_sub
                (scdm, --市场代码
                 hydm, --结算参与人的清算编号
                ...
                 seq_no, --补录编号
                 sub_no, --内部顺序号
                 sub_no_pre --父序号
                 )
              VALUES
                (nvl(r_qtsl_sub.scdm, ''), --市场代码
                 nvl(r_qtsl_sub.hydm, ''), --结算参与人的清算编号
                 ...
                 v_seq,
                 '1',
                 '0');
    

    分析样例2

    • 步骤1:清理中间+输出表(A3+A4)
    • 输入:无
    • 输出:无
          DELETE out_trd_bloomberg t0
           WHERE t0.data_date BETWEEN v_last_date AND p_i_date;
    
    • 步骤2: 数据转换(B1)
    • 输入:bloomberg_temp:Dataframe
    • 输出:无
        BEGIN
          SELECT COUNT(1)
            INTO v_count2
            FROM bloomberg_temp t
           WHERE t.data_date BETWEEN v_last_date AND p_i_date;
        EXCEPTION
          WHEN OTHERS THEN
            v_count2 := 0;
        END;
    
    • 步骤3:游标转RDD(C1)
    • 输入:bloomberg_temp:RDD ; par_sys_stock_bmtx:RDD ; par_sys_coin:RDD ;par_exchange_coin_trans:RDD
    • 输出:无
    SELECT t2.security_id security_id,
                   t1.price_date price_date,
                  ...
                   decode(v1.to_coin, null, t1.coin, v1.to_coin) coin, --t1.coin,
                   round(decode(v1.rate, null, t1.zspj, t1.zspj * v1.rate), 6) zspj, --t1.zspj,
                   round(decode(v1.rate, null, t1.jkpj, t1.jkpj * v1.rate), 6) jkpj, --t1.jkpj,
                  ...
              FROM bloomberg_temp t1,
                   par_sys_stock_bmtx t2,
                   (select t4.coin_name from_coin,
                           t5.coin_name to_coin,
                           t3.rate,
                           t3.inure_begin_date,
                           t4.inure_end_date
                      from par_exchange_coin_trans t3,
                           par_sys_coin            t4,
                           par_sys_coin            t5
                     where t3.from_coin = t4.coin_code
                       and t3.to_coin = t5.coin_code
                       and p_i_date BETWEEN t3.inure_begin_date AND
                           t3.inure_end_date
                       and p_i_date BETWEEN t4.inure_begin_date AND
                           t4.inure_end_date
                       and p_i_date BETWEEN t5.inure_begin_date AND
                           t5.inure_end_date) v1
             WHERE t1.data_date BETWEEN v_last_date AND p_i_date
               AND t1.stock_code = t2.bm_code
               AND t2.bm_type IN ('1','2','10','11') --ISIN code,RIC,CUSIP
               AND p_i_date BETWEEN t2.inure_begin_date AND t2.inure_end_date
               AND t1.coin = v1.from_coin(+)
               AND substr(t2.security_id, 3, 3) <> '056'
    
    • 步骤4:单行数据过滤(C2)
    • 输入:无
    • 输出:无
     IF rec.stock_kind = '01' THEN
              v_count := 0;
              SELECT COUNT(1)
                INTO v_count
                FROM par_sys_stock t6,
                     par_sys_coin t7
               WHERE rec.security_id = t6.security_id
                 AND p_i_date BETWEEN t6.inure_begin_date AND t6.inure_end_date
                 AND rec.coin = t7.coin_name
                 AND t7.coin_code = t6.coin_code;
               IF v_count = 0 THEN
                 continue;
               END IF;
            ...
            ELSE
              NULL;
            END IF;
    
    • 步骤5:重复数据过滤(C3)
    • 输入:无
    • 输出:无
    v_count := 0;
            SELECT COUNT(1)
              INTO v_count
              FROM out_trd_bloomberg t
              WHERE t.coin = rec.coin
                AND t.security_id = rec.security_id
                AND t.price_date = rec.price_date
                AND t.data_date = rec.data_date
                AND t.country = rec.country
                AND t.market = rec.market;
            IF v_count > 0 THEN
                   ...
                  continue;
            END IF;
    
    • 步骤6:单行数据输出(C5)
    • 输入:无
    • 输出:out_trd_bloomberg:RDD
    INSERT INTO out_trd_bloomberg
              ( SECURITY_ID,
                PRICE_DATE,
                COUNTRY,
                ...
              ) VALUES
              (rec.security_id,
              rec.price_date,
              rec.country,
              ...
              );
    
  • 相关阅读:
    手把手教你如何逐步安装OpenStack
    掌握OpenStack部署的最佳实践 打破部署失败的魔咒
    大数据服务大比拼:AWS VS. AzureVS.谷歌
    fullcalender
    也谈---基于 HTTP 长连接的“服务(转载)
    调用页面脚本
    mssql 低版本数据库 使用高版本的命令
    行变列 pivot
    HighCharts -在包含容器尺寸发生变化时重新渲染
    ES6 import export
  • 原文地址:https://www.cnblogs.com/dt-zhw/p/5837277.html
Copyright © 2011-2022 走看看