zoukankan      html  css  js  c++  java
  • 如何快速把hdfs数据动态导入到hive表

    1. hdfs 文件

     
    {"retCode":1,"retMsg":"Success","data":[{"secID":"000001.XSHE","ticker":"000001","secShortName":"深发展A","exchangeCD":"XSHE","tradeDate":"1991-10-21","preClosePrice":24,"actPreClosePrice":24,"openPrice":24,"highestPrice":24.4,"lowestPrice":23.85,"closePrice":23.9,"turnoverVol":355700,"turnoverValue":8582250,"turnoverRate":0.0058,"accumAdjFactor":0.0117201563,"negMarketValue":1462295257.8,"marketValue":2145064267.7,"PB":2.2666,"isOpen":1},{"secID":"000002.XSHE","ticker":"000002","secShortName":"深万科A","exchangeCD":"XSHE","tradeDate":"1991-10-21","preClosePrice":8,"actPreClosePrice":8,"openPrice":8,"highestPrice":8,"lowestPrice":7.7,"closePrice":7.9,"turnoverVol":375000,"turnoverValue":2944200,"turnoverRate":0.0066,"accumAdjFactor":0.0117337592,"negMarketValue":451011000,"marketValue":615927450,"PB":1.0001,"isOpen":1},{"secID":"000004.XSHE","ticker":"000004","secShortName":"深安达A","exchangeCD":"XSHE","tradeDate":"1991-10-21","preClosePrice":7.25,"actPreClosePrice":7.25,"openPrice":7.25,"highestPrice":7.25,"lowestPrice":7.2,"closePrice":7.2,"turnoverVol":92000,"turnoverValue":665125,"turnoverRate":0.0078,"accumAdjFactor":0.2649084628,"negMarketValue":84977100,"marketValue":175500000,"PB":7.4199,"isOpen":1},{"secID":"000005.XSHE","ticker":"000005","secShortName":"深原野A","exchangeCD":"XSHE","tradeDate":"1991-10-21","preClosePrice":6.46,"actPreClosePrice":6.46,"openPrice":6.49,"highestPrice":6.49,"lowestPrice":6.49,"closePrice":6.49,"turnoverVol":94500,"turnoverValue":613305,"turnoverRate":0.0021,"accumAdjFactor":0.1016459912,"negMarketValue":287756865,"marketValue":584100000,"PB":9.1783,"isOpen":1},{"secID":"000009.XSHE","ticker":"000009","secShortName":"深宝安A","exchangeCD":"XSHE","tradeDate":"1991-10-21","preClosePrice":5.75,"actPreClosePrice":5.75,"openPrice":5.7,"highestPrice":5.8,"lowestPrice":5.65,"closePrice":5.75,"turnoverVol":767500,"turnoverValue":4382245,"turnoverRate":0.0084,"accumAdjFactor":0.1026538759,"negMarketValue":524745000,"marketValue":1293922500,"PB":2.4503,"isOpen":1},{"secID":"600601.XSHG","ticker":"600601","secShortName":"延中实业","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":65.7,"actPreClosePrice":65.7,"openPrice":66.4,"highestPrice":66.4,"lowestPrice":66.4,"closePrice":66.4,"turnoverVol":5333,"turnoverValue":354111,"dealAmount":81,"turnoverRate":0.0053,"accumAdjFactor":0.0010592167,"negMarketValue":66400000,"marketValue":66400000,"PB":40.7703,"isOpen":1},{"secID":"600602.XSHG","ticker":"600602","secShortName":"真空电子","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":640.6,"actPreClosePrice":640.6,"openPrice":647,"highestPrice":647,"lowestPrice":647,"closePrice":647,"turnoverVol":2589,"turnoverValue":1675083,"dealAmount":227,"turnoverRate":0.0051,"accumAdjFactor":0.0019640692,"negMarketValue":330552300,"marketValue":1294000000,"PB":287.6707,"isOpen":1},{"secID":"600651.XSHG","ticker":"600651","secShortName":"飞乐音响","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":119.6,"actPreClosePrice":119.6,"openPrice":120.8,"highestPrice":120.8,"lowestPrice":120.8,"closePrice":120.8,"turnoverVol":1102,"turnoverValue":133122,"dealAmount":14,"turnoverRate":0.0022,"accumAdjFactor":0.0008192464,"negMarketValue":60400000,"marketValue":60400000,"PB":39.6397,"isOpen":1},{"secID":"600652.XSHG","ticker":"600652","secShortName":"爱使电子","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":83.2,"actPreClosePrice":83.2,"openPrice":0,"highestPrice":0,"lowestPrice":0,"closePrice":83.2,"turnoverVol":0,"turnoverValue":0,"dealAmount":0,"turnoverRate":0,"accumAdjFactor":0.0006920481,"negMarketValue":22464000,"marketValue":22464000,"PB":33.8019,"isOpen":0},{"secID":"600653.XSHG","ticker":"600653","secShortName":"申华电工","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":103.4,"actPreClosePrice":103.4,"openPrice":104.4,"highestPrice":104.4,"lowestPrice":104.4,"closePrice":104.4,"turnoverVol":240,"turnoverValue":25056,"dealAmount":4,"turnoverRate":0.0005,"accumAdjFactor":0.0009289199,"negMarketValue":52200000,"marketValue":52200000,"PB":97.279,"isOpen":1},{"secID":"600654.XSHG","ticker":"600654","secShortName":"飞乐股份","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":633.2,"actPreClosePrice":633.2,"openPrice":639.5,"highestPrice":639.5,"lowestPrice":639.5,"closePrice":639.5,"turnoverVol":101,"turnoverValue":64590,"dealAmount":26,"turnoverRate":0.0048,"accumAdjFactor":0.000663586,"negMarketValue":13429500,"marketValue":134358950,"PB":282.9834,"isOpen":1},{"secID":"600656.XSHG","ticker":"600656","secShortName":"浙江凤凰","exchangeCD":"XSHG","tradeDate":"1991-10-21","preClosePrice":1242.9,"actPreClosePrice":1242.9,"openPrice":1255.3,"highestPrice":1255.3,"lowestPrice":1255.3,"closePrice":1255.3,"turnoverVol":140,"turnoverValue":175742,"dealAmount":7,"turnoverRate":0.0031,"accumAdjFactor":0.0007136096,"negMarketValue":56502308.3,"marketValue":321798665.6,"PB":-604.4303,"isOpen":1}]}
     
     

    2. 创建 hive 临时表

    CREATE EXTERNAL TABLE if not exists sensitop.equd_json_tmp (
      retCode string,
      retMsg string,
      data array<struct<
    			  secID: string,
    			  tradeDate: date,
    			  ticker: string,
    			  secShortName: string,
    			  exchangeCD: string,
    			  preClosePrice: double,
    			  actPreClosePrice: double,
    			  openPrice: double,
    			  highestPrice: double,
    			  lowestPrice: double,
    			  closePrice: double,
    			  turnoverVol: double,
    			  turnoverValue: double,
    			  dealAmount: int,
    			  turnoverRate: double,
    			  accumAdjFactor: double,
    			  negMarketValue: double,
    			  marketValue: double,
    			  PE: double,
    			  PE1: double,
    			  PB: double,
    			  isOpen: int>>)
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
    LOCATION 'hdfs://hdfs1.wdp:8020/sensitop/finance/equd';
     

    3. 创建 hive 表

    需要把上面表里数组里的数据一条一条放入这个表:
    CREATE TABLE if not exists sensitop.equd_h(
                            secID string,
                            ticker string,
                            secShortName string,
                            exchangeCD string,
                            tradeDate date,
                            preClosePrice double,
                            actPreClosePrice double,
                            openPrice double,
                            highestPrice double,
                            lowestPrice double,
                            closePrice double,
                            turnoverVol double,
                            turnoverValue double,
                            dealAmount int,
                            turnoverRate double,
                            accumAdjFactor double,
                            negMarketValue double,
                            marketValue double,
                            PE double,
                            PE1 double,
                            PB double,
                            isOpen int)
    partitioned by (year string)
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
     
    然后新建一个最张表
    CREATE TABLE if not exists sensitop.equd(
                            secID string,
                            ticker string,
                            secShortName string,
                            exchangeCD string,
                            tradeDate date,
                            preClosePrice double,
                            actPreClosePrice double,
                            openPrice double,
                            highestPrice double,
                            lowestPrice double,
                            closePrice double,
                            turnoverVol double,
                            turnoverValue double,
                            dealAmount int,
                            turnoverRate double,
                            accumAdjFactor double,
                            negMarketValue double,
                            marketValue double,
                            PE double,
                            PE1 double,
                            PB double,
                            isOpen int)
    partitioned by (year string)
     
    注意:这里的字段顺序和上面临时表的顺序要一致。
     

    4. 用 Partition 更新数据

    insert overwrite table sensitop.equd_tmp
    partition (year='2016')
    select b.dt.secID,
    b.dt.ticker,
    b.dt.secShortName,
    b.dt.exchangeCD,
    b.dt.tradeDate,
    b.dt.preClosePrice,
    b.dt.actPreClosePrice,
    b.dt.openPrice,
    b.dt.highestPrice,
    b.dt.lowestPrice,
    b.dt.closePrice,
    b.dt.turnoverVol,
    b.dt.turnoverValue,
    b.dt.dealAmount,
    b.dt.turnoverRate,
    b.dt.accumAdjFactor,
    b.dt.negMarketValue,
    b.dt.marketValue,
    b.dt.PE,
    b.dt.PE1,
    b.dt.PB,
    b.dt.isOpen
    from sensitop.equd_json_tmp LATERAL VIEW explode(equd_json_tmp.data) b AS dt
    where dt.tradedate >= '2016-01-01' and dt.tradedate <= '2016-12-31';
     
     
    insert overwrite table sensitop.equd
    partition (year='2016')
    select secID,
    ticker,
    secShortName,
    exchangeCD,
    tradeDate,
    preClosePrice,
    actPreClosePrice,
    openPrice,
    highestPrice,
    lowestPrice,
    closePrice,
    turnoverVol,
    turnoverValue,
    dealAmount,
    turnoverRate,
    accumAdjFactor,
    negMarketValue,
    marketValue,
    PE,
    PE1,
    PB,
    isOpen
    from sensitop.equd_tmp dt
    where year = '2016';
     

    5. 用nifi实现动态插入数据

    NewImage
     
    这里有二个分支,左边一个是每天20:00更新当年的partion; 右边一个是更新1990 到 2015 年的数据,而且只需要更新一次。
     
    insert overwrite table sensitop.equd_h
    partition (year='${year}')
    select b.dt.secID,
    b.dt.ticker,
    b.dt.secShortName,
    b.dt.exchangeCD,
    b.dt.tradeDate,
    b.dt.preClosePrice,
    b.dt.actPreClosePrice,
    b.dt.openPrice,
    b.dt.highestPrice,
    b.dt.lowestPrice,
    b.dt.closePrice,
    b.dt.turnoverVol,
    b.dt.turnoverValue,
    b.dt.dealAmount,
    b.dt.turnoverRate,
    b.dt.accumAdjFactor,
    b.dt.negMarketValue,
    b.dt.marketValue,
    b.dt.PE,
    b.dt.PE1,
    b.dt.PB,
    b.dt.isOpen
    from sensitop.equd_json_tmp LATERAL VIEW explode(equd_json_tmp.data) b AS dt
    where dt.tradedate >= '${year}-01-01' and dt.tradedate <= '${year}-12-31'
     
     
    insert overwrite table sensitop.equd
    partition (year='${year}')
    select secID,
    ticker,
    secShortName,
    exchangeCD,
    tradeDate,
    preClosePrice,
    actPreClosePrice,
    openPrice,
    highestPrice,
    lowestPrice,
    closePrice,
    turnoverVol,
    turnoverValue,
    dealAmount,
    turnoverRate,
    accumAdjFactor,
    negMarketValue,
    marketValue,
    PE,
    PE1,
    PB,
    isOpen
    from sensitop.equd_tmp dt
    where year = '${year}' 
     

     

     

    NIFI 中国社区 QQ群:595034369

     

  • 相关阅读:
    hdu 2822 Dogs (BFS+优先队列)
    hdu 2757 Ocean Currents(BFS+DFS)
    hdu2844 Coins(普通的多重背包 + 二进制优化)
    hdu1495 && pku3414
    hdu1054 Strategic Game(树形DP)
    FckEditor V2.6 fckconfig.js中文注释
    数字文本控件
    统计在线用户列表 for .net WebForm
    智能客户端
    模拟Confirm的Web自定义控件
  • 原文地址:https://www.cnblogs.com/fengwenit/p/6022599.html
Copyright © 2011-2022 走看看