zoukankan      html  css  js  c++  java
  • datax二次开发

    从hive抽取数据,写入hbase

    一、datax插件hbase12xwriter开发   

    查看datax源码,发现有hbase094xwriter和hbase11xwriter源码,说明datax支持hbase写入,再查看测试和生产环境使用的hbase版本是:hbase-1.2.0-cdh5.8.4

    自己写一个hbase12xwriter插件包

    开发流程:

    1、搭建项目模块module

    datax-all项目上右击->New->other->Maven->Maven Module选中, Next

    Module Name:hbase12xwriter

    Parent Project:datax-all

    Next 下一步

    Next下一步:

    Finish完成,项目中就多了一个模块hbase12xwriter,并且 datax-all的pom.xml中<modules>结点最后会自动加一行<module>hbase12xwriter</module>

    2、把原来hbase11xwriter包中代码和pom文件拷贝到新建的模块中

    修改拷贝过来的pom.xml

    hbase版本号修改为:1.2.0   hadoop版本号修改为:2.6.0

    3.hbase12xwriter项目右击->build path->configure build path->source添加 Folder 

    src/main/resources

    将hbase11xwriter/src/main/resources中

    plugin_job_template.json

    plugin.json拷贝过来

    4.将src/main/下的assembly文件夹及package.xml拷贝过来,不然打包不成功

    拷贝过来修改里面内容为hbase12xwriter

    5.hbase12xwriter项目右击->run as->Maven build->install

    BUILD SUCCESS就表示编译成功

    得到jar包:hbase12xwriter-0.0.1-SNAPSHOT.jar

    6.拷贝doc/hbase11xwriter.md过来hbase12xwriter修改

    7.在hive/tmp库下建测试表:
    create table tmp.test_hive_to_hbase
    (
    id         int          comment 'ID',
    code    string     comment '订单编码',
    amt      float       comment '订单金额',
    time     string     comment '发货时间'
    )
    comment '订单表'  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE ; 

    插入数据到表:

    在目录下cd /home/zallds/users/wcy

    vi 000000_0

    写入2行数据:

    10001,order-001,189.25,2018-01-09
    10002,order-002,3900.80,2018-01-10

    进入hadoop命令目录:

        cd /data/hadoop-2.6.0-cdh5.8.4/bin

    从本地文件系统中复制单个或多个源路径到hdfs目标文件系统。

        hadoop fs -put /home/zallds/users/wcy/000000_0  /user/hive/warehouse/tmp.db/test_hive_to_hbase

    这个时候在hive里查表:select * from tmp.test_hive_to_hbase发现有数据了

    8.在hbase中创建表:

    create  'test_hbase_write',{NAME=>'cf'}

    create  'temp_pm_price',{NAME=>'cf'}

    9.构造json文件

    {
    "job": {
    "setting": {
    "speed": {
    "channel": 1
    }
    },
    "content": [{
    "reader": {
    "parameter": {
    "fileType": "text",
    "fieldDelimiter": ",",
    "column": [{
    "index": "0",
    "type": "int"
    }, {
    "index": "1",
    "type": "string"
    }, {
    "index": "2",
    "type": "float"
    }, {
    "index": "3",
    "type": "string"
    }],
    "encoding": "UTF-8",
    "path": "/user/hive/warehouse/tmp.db/test_hive_to_hbase",
    "defaultFS": "hdfs://cluster"
    },
    "name": "hdfsreader"
    },
    "writer": {
    "name": "hbase12xwriter",
    "parameter": {
    "hbaseConfig": {
    "hbase.rootdir": "hdfs://cluster/hbase",
    "hbase.cluster.distributed": "true",
    "hbase.zookeeper.quorum": "master"
    },
    "table": "test_hbase_write",
    "mode": "normal",
    "rowkeyColumn": [{
    "index": 0,
    "type": "id"
    }],
    "column": [{
    "index": 0,
    "name": "cf:id",
    "type": "int"
    },
    {
    "index": 1,
    "name": "cf:code",
    "type": "string"
    },
    {
    "index": 2,
    "name": "cf:amt",
    "type": "float"
    },
    {
    "index": 3,
    "name": "cf:time",
    "type": "string"
    }
    ],
    "encoding": "utf-8"
    }
    }
    }]
    }
    }
     
     

    其中hbaseConfig的配置要查看/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml里的配置项,

        "hbase.rootdir": "hdfs://cluster/hbase",
        "hbase.cluster.distributed": "true",
        "hbase.zookeeper.quorum": "master"

    上测试环境查看hbase配置文件:/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml

    在datax/job/下

    vi 290.json

    把json内容粘贴进去

    10.在/home/datax/plugin/writer下创建目录

    [root@master writer]$ mkdir hbase12xwriter

    把hbase12xwriter-0.0.1-SNAPSHOT.jar包上传到hbase12xwriter 目录下

    在hbase12xwriter目录下创建libs目录,将本地项目的jar包全部上传

    把plugin_job_template.json和plugin.json文件拷贝进去

    12.在datax/bin目录下执行任务

    python datax.py ../job/290.json

    没有报错信息,但数据没有进入hbase

    再细看日志,发现报了错:

    原来是hdfsreader读取数据时,用到了datax-common或plugin-unstructured-storage-util包里的工具类,没有定义int类型和IntColumn类型,把hdfsreader中配置字段为int的改成long,hbase12xreader对应的reader还是配置int

    就可以执行成功,并且hbase表中有数据,可都是缺少第一条数据

    从http接口取抽取数据,到hive

    问题梳理:

        1.http请求拿到返回结果,是json串

            输入参数:

                    1)http请求地址:URL   

                    2)http请求参数:parameters  eg:  name='zs' & 

            返回结果:json串,解析成一条条记录,

                将第条记录解析成一个个字段值,

                写入hive表:要知道 库,表,表的字段名

        2.tbl_transfer表结构已经不适合该需求了,离线平台任务配置也不适合该需求了,要重新开发页面来配置?表可否共用呢?扩展字段?

        target_db_type:hdfs  

        target_db_name:tmp

        target_table:tmp_user_http

        query_sql:不用填

        columns:同其他一样,不过列的顺序要按hdfs表中顺序一致,eg:name:name,type:string;name:age,type:int;name:mobile,type:string;name:address,type:string

        需要增加字段:

            1)类型:标识是http接口到hdfs抽取 ;  source_db_type:http

            2)http接口URL地址;

            3)http接口参数

        3.dataxJson如何修改:

            根据抽取类型:http接口  来生成对应的json,源相关的字段用不到,要用到新增加字段httpUrl和params,和列columns来生成json

        4.zeus上任务该如何配呢?

            同其他任务一样配置

    zeus调度任务配置:

    /home/bin/dump 内容:

    #! /usr/bin/env bash
    . /etc/profile
    export DATAX_HOME=/data/datax
    
    if [ $# == 2 ]; then
    today=$2
    else 
    today=`date -d -1days '+%Y-%m-%d'`
    fi
    
    java -jar /home/software/dataxJson_product.jar $1 $today
    if test $? -ne 0
    then 
    exit 11
    fi
    
    
    cd $DATAX_HOME/bin
    python datax.py ../job/$1.json

    /home/bin/hiveF 内容:

    #! /usr/bin/env bash
    . /etc/profile
    command=`java -jar /home/lib/hiveF.jar $*`
    echo "$command"
    hive -e "$command"

     搭建httpreader模块,加入datax-all的module中

    1、开发流程:

    2.部署流程:

    1)在datax/plugin/reader/目录下创建httpreader目录

    httpreader目录下四个文件如下:

    httpreader.jar包deploy拷贝进来,eclipse中libs下所有jar文件拷贝到libs目录,两个.json是模板文件,修改下

    3.开发column列配置

    4.考虑http到hdfs增量抽取,默认load_type为空,即是truncate,可选择update和partUpdate  测试环境需要把ssh2连接的包ganymed-ssh2-build210.jar上传到hdfswriter的libs下

    mysql到hdfs支持增量导入

    datax支持hive增量同步思想:

    1.truncate 全量

    2.复盖:

        全量复盖

        部分分区复盖

    3.更新

        全表更新

        分区更新

    4.追加

        

    分类:
        1.load_type==1 全量同步,无分区;(适用于小表,参数表,全删全插)--->对应我们的truncate
        2.load_type==2 增量或者全量同步合并全量,按天分区;(适用于类似用户访问明细表,新增不修改,直接插入)
        3.load_type==3 增量同步合并全量,无分区(适用于类似用户表,直接插入)--->对应我们的update
        4.load_type==4 增量同步,有分区,抽取数据为目标表多个分区全量数据;(适用于类似支付订单表,按照分区字段覆盖分区插入)
        5.load_type==5 增量同步,有分区;(适用于类似订单表,按创建时间分区,最后修改时间抽取,按照非分区字段覆盖分区插入)
     
     
     
            w_partition_name不为空    '''hive -e "use %s;alter table %s add if not exists partition(%s='%s') location '%s';"''' 
    load_data_cmd = load_data_cmd % (job_info["w_database"],job_info["w_table"],job_info["w_partition_name"],job_info["w_partition_value"],load_path)
    流程:
        1.创建hive分区
            1)计算分区,拿到分区值:分区默认按时间【年月日分区】 date=2018-06-05
            2)判断该分区是否已经存在,存在的先删除
            3)组装拼成sql,执行hive -e sql,创建分区表
         2.   删除创建临时表
            1)if(load_type>1)   即load_type=2,3,4,5时,
                a)先删除临时表:hive -e drop table if exists temp.temp_table;
                b)创建临时表:hive -e  create_sql
            create_sql=use r_database;

                               create table table_name(  

    1. col1 string,  
    2. col2 string,  
    3. col3 string,  
    4. col4 string,  
    5. col5 string)ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'   
    6. WITH SERDEPROPERTIES ( "input.regex" = "正则表达式" ) STORED AS TEXTFILE;  
        3.构建路径
            1)if(load_type==1) ,
                a)系统默认使用时间分区【年月日】,计算分区值,拼路径load_path=hdfs://SAD-HDP-003:9000/user/hive/warehouse/库名.db/表名/分区【date=2018-06-06】
                b)拼装datax.py -p命令
                    datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                        -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                        -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
            2)if(load_type>1),
                a)根据hive表名构建hdfs临时表路径
                    load_path = "hdfs://nameservice1/user/hive/warehouse/temp.db/" + table_name
                b)拼装datax.py -p命令
                    datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                        -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                        -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
        4.执行命令,抽取数据 datax etl
            1)替换datax.py -p命令中replace("$START_TIME",date)
            2)替换datax.py -p命令中replace("$YM_TEXT",ym)
            3)替换datax.py -p命令中replace("$LAST_YM_TEXT",last_ym)
            4)替换datax.py -p命令中replace("$END_TIME",date_after)
           然后执行命令
        5.后序处理:
            1)指定表的location
                if(load_type<=1),
                    a)默认分区,/date=年-月-日
                    b)添加一个分区:
                        hive -e use w_database;alter table w_table add if not exists partition(date='w_partition_value') location load_path;
                    c)执行命令
                if(load_type>1),
                    a)loady_type>1,即是2,3,4,5都是增量,
                        hive -e use temp; ALTER table %s  set  location '%s'; ("temp",job_info_map["temp_table"],load_path)
                    b)执行命令
            2)合并处理:merge
                a)if(load_type==2)
                    i)处理columns拼接上date:即id,name,password,date这样的
                    ii)拼装hive_sql:
                        hive -e set set hive.exec.dynamic.partition.mode=nonstrict;
                                    set hive.exec.max.dynamic.partitions.pernode=1000;
                                    INSERT OVERWRITE TABLE  w_database.w_table  PARTITION(w_partition_name)  select colums_str from temp.temp_table
                    iii)执行hive_sql命令
                b)if(load_type==3,4,5) 暂时不考虑
                    
                    
     
     
    mysql->hive
    1.默认writeMode:truncate  全量导入(二次开发:先删,后加,全量)
    2.writeMode:partAppend   分区增量更新
    3.writeMode:append                不分区增量追加(原始,不合并,不更新,只追加)
    4.writeMode:update                不分区增量更新(合并,更新,追加)
  • 相关阅读:
    c# 泛型(知识整理)
    [VC++]C\C++中结构体知识点强化
    [VC++]CString转化成char
    [VC++]C++中类的多态与虚函数的使用
    [C#]关于自己编写MesasgeBox
    [C#]给DataGridView里的ComboBoxCol添加SelectIndexChange事件
    [C#]用代码触发一个事件
    [C#]序列化例子
    [VC++]怎么使对话框中的按钮DISABLE和ENABLE
    [VC++]控制台程序窗口隐藏
  • 原文地址:https://www.cnblogs.com/ngy0217/p/10054319.html
Copyright © 2011-2022 走看看