zoukankan      html  css  js  c++  java
  • datax二次开发

    从hive抽取数据,写入hbase

    一、datax插件hbase12xwriter开发   

    查看datax源码,发现有hbase094xwriter和hbase11xwriter源码,说明datax支持hbase写入,再查看测试和生产环境使用的hbase版本是:hbase-1.2.0-cdh5.8.4

    自己写一个hbase12xwriter插件包

    开发流程:

    1、搭建项目模块module

    datax-all项目上右击->New->other->Maven->Maven Module选中, Next

    Module Name:hbase12xwriter

    Parent Project:datax-all

    Next 下一步

    Next下一步:

    Finish完成,项目中就多了一个模块hbase12xwriter,并且 datax-all的pom.xml中<modules>结点最后会自动加一行<module>hbase12xwriter</module>

    2、把原来hbase11xwriter包中代码和pom文件拷贝到新建的模块中

    修改拷贝过来的pom.xml

    hbase版本号修改为:1.2.0   hadoop版本号修改为:2.6.0

    3.hbase12xwriter项目右击->build path->configure build path->source添加 Folder 

    src/main/resources

    将hbase11xwriter/src/main/resources中

    plugin_job_template.json

    plugin.json拷贝过来

    4.将src/main/下的assembly文件夹及package.xml拷贝过来,不然打包不成功

    拷贝过来修改里面内容为hbase12xwriter

    5.hbase12xwriter项目右击->run as->Maven build->install

    BUILD SUCCESS就表示编译成功

    得到jar包:hbase12xwriter-0.0.1-SNAPSHOT.jar

    6.拷贝doc/hbase11xwriter.md过来hbase12xwriter修改

    7.在hive/tmp库下建测试表:
    create table tmp.test_hive_to_hbase
    (
    id         int          comment 'ID',
    code    string     comment '订单编码',
    amt      float       comment '订单金额',
    time     string     comment '发货时间'
    )
    comment '订单表'  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE ; 

    插入数据到表:

    在目录下cd /home/zallds/users/wcy

    vi 000000_0

    写入2行数据:

    10001,order-001,189.25,2018-01-09
    10002,order-002,3900.80,2018-01-10

    进入hadoop命令目录:

        cd /data/hadoop-2.6.0-cdh5.8.4/bin

    从本地文件系统中复制单个或多个源路径到hdfs目标文件系统。

        hadoop fs -put /home/zallds/users/wcy/000000_0  /user/hive/warehouse/tmp.db/test_hive_to_hbase

    这个时候在hive里查表:select * from tmp.test_hive_to_hbase发现有数据了

    8.在hbase中创建表:

    create  'test_hbase_write',{NAME=>'cf'}

    create  'temp_pm_price',{NAME=>'cf'}

    9.构造json文件

    {
    "job": {
    "setting": {
    "speed": {
    "channel": 1
    }
    },
    "content": [{
    "reader": {
    "parameter": {
    "fileType": "text",
    "fieldDelimiter": ",",
    "column": [{
    "index": "0",
    "type": "int"
    }, {
    "index": "1",
    "type": "string"
    }, {
    "index": "2",
    "type": "float"
    }, {
    "index": "3",
    "type": "string"
    }],
    "encoding": "UTF-8",
    "path": "/user/hive/warehouse/tmp.db/test_hive_to_hbase",
    "defaultFS": "hdfs://cluster"
    },
    "name": "hdfsreader"
    },
    "writer": {
    "name": "hbase12xwriter",
    "parameter": {
    "hbaseConfig": {
    "hbase.rootdir": "hdfs://cluster/hbase",
    "hbase.cluster.distributed": "true",
    "hbase.zookeeper.quorum": "master"
    },
    "table": "test_hbase_write",
    "mode": "normal",
    "rowkeyColumn": [{
    "index": 0,
    "type": "id"
    }],
    "column": [{
    "index": 0,
    "name": "cf:id",
    "type": "int"
    },
    {
    "index": 1,
    "name": "cf:code",
    "type": "string"
    },
    {
    "index": 2,
    "name": "cf:amt",
    "type": "float"
    },
    {
    "index": 3,
    "name": "cf:time",
    "type": "string"
    }
    ],
    "encoding": "utf-8"
    }
    }
    }]
    }
    }
     
     

    其中hbaseConfig的配置要查看/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml里的配置项,

        "hbase.rootdir": "hdfs://cluster/hbase",
        "hbase.cluster.distributed": "true",
        "hbase.zookeeper.quorum": "master"

    上测试环境查看hbase配置文件:/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml

    在datax/job/下

    vi 290.json

    把json内容粘贴进去

    10.在/home/datax/plugin/writer下创建目录

    [root@master writer]$ mkdir hbase12xwriter

    把hbase12xwriter-0.0.1-SNAPSHOT.jar包上传到hbase12xwriter 目录下

    在hbase12xwriter目录下创建libs目录,将本地项目的jar包全部上传

    把plugin_job_template.json和plugin.json文件拷贝进去

    12.在datax/bin目录下执行任务

    python datax.py ../job/290.json

    没有报错信息,但数据没有进入hbase

    再细看日志,发现报了错:

    原来是hdfsreader读取数据时,用到了datax-common或plugin-unstructured-storage-util包里的工具类,没有定义int类型和IntColumn类型,把hdfsreader中配置字段为int的改成long,hbase12xreader对应的reader还是配置int

    就可以执行成功,并且hbase表中有数据,可都是缺少第一条数据

    从http接口取抽取数据,到hive

    问题梳理:

        1.http请求拿到返回结果,是json串

            输入参数:

                    1)http请求地址:URL   

                    2)http请求参数:parameters  eg:  name='zs' & 

            返回结果:json串,解析成一条条记录,

                将第条记录解析成一个个字段值,

                写入hive表:要知道 库,表,表的字段名

        2.tbl_transfer表结构已经不适合该需求了,离线平台任务配置也不适合该需求了,要重新开发页面来配置?表可否共用呢?扩展字段?

        target_db_type:hdfs  

        target_db_name:tmp

        target_table:tmp_user_http

        query_sql:不用填

        columns:同其他一样,不过列的顺序要按hdfs表中顺序一致,eg:name:name,type:string;name:age,type:int;name:mobile,type:string;name:address,type:string

        需要增加字段:

            1)类型:标识是http接口到hdfs抽取 ;  source_db_type:http

            2)http接口URL地址;

            3)http接口参数

        3.dataxJson如何修改:

            根据抽取类型:http接口  来生成对应的json,源相关的字段用不到,要用到新增加字段httpUrl和params,和列columns来生成json

        4.zeus上任务该如何配呢?

            同其他任务一样配置

    zeus调度任务配置:

    /home/bin/dump 内容:

    #! /usr/bin/env bash
    . /etc/profile
    export DATAX_HOME=/data/datax
    
    if [ $# == 2 ]; then
    today=$2
    else 
    today=`date -d -1days '+%Y-%m-%d'`
    fi
    
    java -jar /home/software/dataxJson_product.jar $1 $today
    if test $? -ne 0
    then 
    exit 11
    fi
    
    
    cd $DATAX_HOME/bin
    python datax.py ../job/$1.json

    /home/bin/hiveF 内容:

    #! /usr/bin/env bash
    . /etc/profile
    command=`java -jar /home/lib/hiveF.jar $*`
    echo "$command"
    hive -e "$command"

     搭建httpreader模块,加入datax-all的module中

    1、开发流程:

    2.部署流程:

    1)在datax/plugin/reader/目录下创建httpreader目录

    httpreader目录下四个文件如下:

    httpreader.jar包deploy拷贝进来,eclipse中libs下所有jar文件拷贝到libs目录,两个.json是模板文件,修改下

    3.开发column列配置

    4.考虑http到hdfs增量抽取,默认load_type为空,即是truncate,可选择update和partUpdate  测试环境需要把ssh2连接的包ganymed-ssh2-build210.jar上传到hdfswriter的libs下

    mysql到hdfs支持增量导入

    datax支持hive增量同步思想:

    1.truncate 全量

    2.复盖:

        全量复盖

        部分分区复盖

    3.更新

        全表更新

        分区更新

    4.追加

        

    分类:
        1.load_type==1 全量同步,无分区;(适用于小表,参数表,全删全插)--->对应我们的truncate
        2.load_type==2 增量或者全量同步合并全量,按天分区;(适用于类似用户访问明细表,新增不修改,直接插入)
        3.load_type==3 增量同步合并全量,无分区(适用于类似用户表,直接插入)--->对应我们的update
        4.load_type==4 增量同步,有分区,抽取数据为目标表多个分区全量数据;(适用于类似支付订单表,按照分区字段覆盖分区插入)
        5.load_type==5 增量同步,有分区;(适用于类似订单表,按创建时间分区,最后修改时间抽取,按照非分区字段覆盖分区插入)
     
     
     
            w_partition_name不为空    '''hive -e "use %s;alter table %s add if not exists partition(%s='%s') location '%s';"''' 
    load_data_cmd = load_data_cmd % (job_info["w_database"],job_info["w_table"],job_info["w_partition_name"],job_info["w_partition_value"],load_path)
    流程:
        1.创建hive分区
            1)计算分区,拿到分区值:分区默认按时间【年月日分区】 date=2018-06-05
            2)判断该分区是否已经存在,存在的先删除
            3)组装拼成sql,执行hive -e sql,创建分区表
         2.   删除创建临时表
            1)if(load_type>1)   即load_type=2,3,4,5时,
                a)先删除临时表:hive -e drop table if exists temp.temp_table;
                b)创建临时表:hive -e  create_sql
            create_sql=use r_database;

                               create table table_name(  

    1. col1 string,  
    2. col2 string,  
    3. col3 string,  
    4. col4 string,  
    5. col5 string)ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'   
    6. WITH SERDEPROPERTIES ( "input.regex" = "正则表达式" ) STORED AS TEXTFILE;  
        3.构建路径
            1)if(load_type==1) ,
                a)系统默认使用时间分区【年月日】,计算分区值,拼路径load_path=hdfs://SAD-HDP-003:9000/user/hive/warehouse/库名.db/表名/分区【date=2018-06-06】
                b)拼装datax.py -p命令
                    datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                        -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                        -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
            2)if(load_type>1),
                a)根据hive表名构建hdfs临时表路径
                    load_path = "hdfs://nameservice1/user/hive/warehouse/temp.db/" + table_name
                b)拼装datax.py -p命令
                    datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                        -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                        -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
        4.执行命令,抽取数据 datax etl
            1)替换datax.py -p命令中replace("$START_TIME",date)
            2)替换datax.py -p命令中replace("$YM_TEXT",ym)
            3)替换datax.py -p命令中replace("$LAST_YM_TEXT",last_ym)
            4)替换datax.py -p命令中replace("$END_TIME",date_after)
           然后执行命令
        5.后序处理:
            1)指定表的location
                if(load_type<=1),
                    a)默认分区,/date=年-月-日
                    b)添加一个分区:
                        hive -e use w_database;alter table w_table add if not exists partition(date='w_partition_value') location load_path;
                    c)执行命令
                if(load_type>1),
                    a)loady_type>1,即是2,3,4,5都是增量,
                        hive -e use temp; ALTER table %s  set  location '%s'; ("temp",job_info_map["temp_table"],load_path)
                    b)执行命令
            2)合并处理:merge
                a)if(load_type==2)
                    i)处理columns拼接上date:即id,name,password,date这样的
                    ii)拼装hive_sql:
                        hive -e set set hive.exec.dynamic.partition.mode=nonstrict;
                                    set hive.exec.max.dynamic.partitions.pernode=1000;
                                    INSERT OVERWRITE TABLE  w_database.w_table  PARTITION(w_partition_name)  select colums_str from temp.temp_table
                    iii)执行hive_sql命令
                b)if(load_type==3,4,5) 暂时不考虑
                    
                    
     
     
    mysql->hive
    1.默认writeMode:truncate  全量导入(二次开发:先删,后加,全量)
    2.writeMode:partAppend   分区增量更新
    3.writeMode:append                不分区增量追加(原始,不合并,不更新,只追加)
    4.writeMode:update                不分区增量更新(合并,更新,追加)
  • 相关阅读:
    ABAP 程序中的类 沧海
    ABAP类的方法(转载) 沧海
    More than 100 ABAP Interview Faq's(2) 沧海
    SAP and ABAP Memory总结 沧海
    ABAP Frequently Asked Question 沧海
    ABAP System Reports(Additional functions) 沧海
    ABAP Questions Commonly Asked 1 沧海
    ABAP Tips and Tricks 沧海
    ABAP System Fields 沧海
    ABAP 面试问题及答案(一):数据库更新及更改 SAP Standard (转) 沧海
  • 原文地址:https://www.cnblogs.com/ngy0217/p/10054319.html
Copyright © 2011-2022 走看看