  • DataX Operation Guide

    1. Introduction to DataX

    DataX

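    DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.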

    Features

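    As a data synchronization framework, DataX abstracts synchronization between data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In principle, the framework can therefore support synchronization for any type of data source. The plugin system also works as an ecosystem: each newly added data source can immediately exchange data with every data source already supported.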
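
    The Reader/Writer split described above can be illustrated with a small sketch. This is not DataX code: the class names ListReader and ListWriter, the sync function, and the queue-based channel are all hypothetical, chosen only to show why any reader can be paired with any writer once both sides agree on a record format.

```python
from queue import Queue
from typing import Iterable, Iterator, List

Record = List[str]
_END = object()  # sentinel that marks the end of the record stream


class ListReader:
    """Hypothetical reader plugin: yields records from an in-memory list."""

    def __init__(self, rows: Iterable[Record]) -> None:
        self.rows = list(rows)

    def read(self) -> Iterator[Record]:
        yield from self.rows


class ListWriter:
    """Hypothetical writer plugin: collects records into a list."""

    def __init__(self) -> None:
        self.written: List[Record] = []

    def write(self, record: Record) -> None:
        self.written.append(record)


def sync(reader, writer) -> int:
    """Move every record from reader to writer through a channel (a queue)."""
    channel: Queue = Queue()
    for record in reader.read():
        channel.put(record)
    channel.put(_END)

    count = 0
    while True:
        item = channel.get()
        if item is _END:
            break
        writer.write(item)
        count += 1
    return count


reader = ListReader([["row1", "lxw"], ["row2", "lxw2"]])
writer = ListWriter()
moved = sync(reader, writer)
print(moved, writer.written)
```

    Because sync only depends on read() and write(), adding a new source or target means writing one new class; it then works with every existing counterpart.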

    Installation

    Download DataX (download link).

    After extracting the archive, DataX is ready to use. Run a job with the following command:

    python27 datax.py ..\job\test.json
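
    When jobs are launched from a script rather than by hand, the same command can be assembled in Python. The sketch below is an assumption-laden convenience wrapper: the function names are hypothetical, python27 and datax.py are taken from the command above, and an exit code of 0 is assumed to mean success.

```python
import subprocess
from typing import List


def build_datax_command(job_file: str,
                        python_exe: str = "python27",
                        datax_script: str = "datax.py") -> List[str]:
    """Return the argument list for launching one DataX job."""
    return [python_exe, datax_script, job_file]


def run_job(job_file: str) -> int:
    """Run the job and return the process exit code (0 is assumed to mean success)."""
    completed = subprocess.run(build_datax_command(job_file))
    return completed.returncode


# Only build and print the command here; run_job() needs a real DataX install.
command = build_datax_command("../job/job_txt_to_oracle.json")
print(command)
```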

    2. Data Synchronization with DataX

    2.1 MySQL to MySQL

    Table creation statement

    DROP TABLE IF EXISTS `tb_dmp_requser`;
    CREATE TABLE `tb_dmp_requser` (
      `reqid` varchar(50) NOT NULL COMMENT 'Activity ID',
      `exetype` varchar(50) DEFAULT NULL COMMENT 'Execution type',
      `allnum` varchar(11) DEFAULT NULL COMMENT 'Total number of target users',
      `exenum` varchar(11) DEFAULT NULL COMMENT 'Number of target users executed',
      `resv` varchar(50) DEFAULT NULL,
      `createtime` datetime DEFAULT NULL
    );

    Copy the tb_dmp_requser table from the dmp database to the tb_dmp_requser table in the dota2_databank database.

    job_mysql_to_mysql.json is as follows:

    {
        "job": {
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "allnum", "reqid"
                        ],
                        "connection": [{
                            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/dmp"],
                            "table": ["tb_dmp_requser"]
                        }],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "allnum", "reqid"
                        ],
                        "preSql": [
                            "delete from tb_dmp_requser"
                        ],
                        "connection": [{
                            "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/dota2_databank",
                            "table": ["tb_dmp_requser"]
                        }],
                        "password": "123456",
                        "username": "root",
                        "writeMode": "replace"
                    }
                }
            }],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
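
    Job files like the one above differ mostly in a handful of values, so they can be generated instead of copied by hand. The following is a minimal sketch, not an official DataX utility; the function name mysql_to_mysql_job and its parameters are hypothetical. It mirrors the structure shown above, including the detail that the reader's jdbcUrl is a list while the writer's is a plain string.

```python
import json
from typing import Dict, Sequence


def mysql_to_mysql_job(columns: Sequence[str], table: str,
                       source_url: str, target_url: str,
                       username: str, password: str,
                       channel: int = 2) -> Dict:
    """Build a job dict with the same shape as job_mysql_to_mysql.json."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "column": list(columns),
            # In the job above the reader takes a list of JDBC URLs.
            "connection": [{"jdbcUrl": [source_url], "table": [table]}],
            "username": username,
            "password": password,
        },
    }
    writer = {
        "name": "mysqlwriter",
        "parameter": {
            "column": list(columns),
            "preSql": ["delete from " + table],
            # In the job above the writer takes a single JDBC URL string.
            "connection": [{"jdbcUrl": target_url, "table": [table]}],
            "username": username,
            "password": password,
            "writeMode": "replace",
        },
    }
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {"speed": {"channel": channel}},
        }
    }


job = mysql_to_mysql_job(
    ["allnum", "reqid"], "tb_dmp_requser",
    "jdbc:mysql://127.0.0.1:3306/dmp",
    "jdbc:mysql://127.0.0.1:3306/dota2_databank",
    "root", "123456",
)
print(json.dumps(job, indent=4))
```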

    2.2 Oracle to Oracle

    Copy the test table owned by the scott user to the test table owned by the test user.

    Table creation statement

    drop table TEST;
    
    CREATE TABLE TEST (
    ID NUMBER(32) NULL,
    NAME VARCHAR2(255 BYTE) NULL
    )
    LOGGING
    NOCOMPRESS
    NOCACHE;

    job_oracle_oracle.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "oraclereader",
                        "parameter": {
                            "column": ["id","name"],
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:oracle:thin:@localhost:1521:ORCL"],
                                    "table": ["test"]
                                }
                            ],
                            "password": "tiger",
                            "username": "scott",
                            "where":"rownum < 1000"
                        }
                    },
                    "writer": {
                        "name": "oraclewriter",
                        "parameter": {
                            "column": ["id","name"],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                                    "table": ["test"]
                                }
                            ],
                            "password": "test",
                            "username": "test"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 6
                }
            }
        }
    }

    2.3 HBase to Local File

    Copy the HBase table "LXW" to the local path ../job/datax_hbase.

    Create the table and insert two rows:

    hbase(main):046:0> create 'LXW','CF'
    0 row(s) in 1.2120 seconds
    
    => Hbase::Table - LXW
    hbase(main):047:0> put 'LXW','row1','CF:NAME','lxw'
    0 row(s) in 0.0120 seconds
    
    hbase(main):048:0> put 'LXW','row1','CF:AGE','18'
    0 row(s) in 0.0080 seconds
    
    hbase(main):049:0> put 'LXW','row1','CF:ADDRESS','BeijingYiZhuang'
    0 row(s) in 0.0070 seconds
    
    hbase(main):050:0> put 'LXW','row2','CF:ADDRESS','BeijingYiZhuang2'
    0 row(s) in 0.0060 seconds
    
    hbase(main):051:0> put 'LXW','row2','CF:AGE','18'
    0 row(s) in 0.0050 seconds
    
    hbase(main):052:0> put 'LXW','row2','CF:NAME','lxw2'
    0 row(s) in 0.0040 seconds
    
    hbase(main):053:0> exit

    job_hbase_to_local.json

    For the HBase high-availability cluster configuration, see https://www.cnblogs.com/Java-Starter/p/10756647.html

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "hbase11xreader", 
                        "parameter": {
                           "hbaseConfig": {
                                "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
                            },
                            "table": "LXW",
                            "encoding": "utf-8",
                            "mode": "normal",
                            "column": [
                            {
                              "name":"rowkey",
                              "type":"string"
                            },
                            {
                              "name":"CF:NAME",
                              "type":"string"
                            },
                            {
                              "name":"CF:AGE",
                              "type":"string"
                            },
                            {
                              "name":"CF:ADDRESS",
                              "type":"string"
                            }
    
                            ], 
                          
                            "range": {
                                "endRowkey": "", 
                                "isBinaryRowkey": false, 
                                "startRowkey": ""
                            }
                      
                        }
                    }, 
                    "writer": {
                        "name": "txtfilewriter", 
                        "parameter": {
                            "dateFormat": "yyyy-MM-dd", 
                        "fieldDelimiter": "\t", 
                            "fileName": "LXW", 
                            "path": "../job/datax_hbase", 
                            "writeMode": "truncate"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": 5
                }
            }
        }
    }

    The file LXW__e647d969_d2c6_47ad_9534_15c90d696099 is generated under ../job/datax_hbase.

    Its contents are as follows:

    row1    lxw    18    BeijingYiZhuang
    row2    lxw2    18    BeijingYiZhuang2
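
    The exported file can be read back with a few lines of Python. This sketch assumes the fields are separated by a tab, as set by fieldDelimiter in the writer, and that the columns arrive in the order listed in the reader (rowkey, CF:NAME, CF:AGE, CF:ADDRESS). The field names and the parse_export function are hypothetical.

```python
import csv
import io
from typing import Dict, List

# Column order assumed to match the reader's "column" list in the job file.
FIELDS = ["rowkey", "name", "age", "address"]


def parse_export(text: str) -> List[Dict[str, str]]:
    """Turn tab-delimited export text into a list of dicts, skipping blank lines."""
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    return [dict(zip(FIELDS, row)) for row in reader if row]


sample = "row1\tlxw\t18\tBeijingYiZhuang\nrow2\tlxw2\t18\tBeijingYiZhuang2\n"
rows = parse_export(sample)
print(rows[0]["name"], rows[1]["address"])  # prints: lxw BeijingYiZhuang2
```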

    2.4 Local File to HBase

    Import a local file into the HBase table LXW.

    Source data, source.txt:

    row3,jjj1,150,BeijingYiZhuang3
    row4,jjj2,150,BeijingYiZhuang4

    job_local_to_hbase.json

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 5
          }
        },
        "content": [
          {
            "reader": {
              "name": "txtfilereader",
              "parameter": {
                "path": "../job/datax_hbase/source.txt",
                "encoding": "UTF-8",
                "column": [
                  {
                    "index": 0,
                    "type": "string"
                  },
                  {
                    "index": 1,
                    "type": "string"
                  },
                  {
                    "index": 2,
                    "type": "string"
                  },
                  {
                    "index": 3,
                    "type": "string"
                  }
                ],
                "fieldDelimiter": ","
              }
            },
            "writer": {
              "name": "hbase11xwriter",
              "parameter": {
                "hbaseConfig": {
                  "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
                },
                "table": "LXW",
                "mode": "normal",
                "rowkeyColumn": [
                    {
                      "index":0,
                      "type":"string"
                    }
                ],
                "column": [
                    {
                      "index":1,
                      "name":"CF:NAME",
                      "type":"string"
                    },
                    {
                      "index":2,
                      "name":"CF:AGE",
                      "type":"string"
                    },
                    {
                      "index":3,
                      "name":"CF:ADDRESS",
                      "type":"string"
                    }
                ],
                "versionColumn":{
                  "index": -1,
                  "value":"123456789"
                },
                "encoding": "utf-8"
              }
            }
          }
        ]
      }
    }

    After the import, the new rows are visible:

    hbase(main):241:0* get 'LXW','row3'
    COLUMN                    CELL                                                                  
     CF:ADDRESS               timestamp=123456789, value=BeijingYiZhuang3                           
     CF:AGE                   timestamp=123456789, value=150                                        
     CF:NAME                  timestamp=123456789, value=jjj1 
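
    The index-based mapping in the job file can be hard to follow at first, so here is a plain-Python illustration of what it expresses. This is not how DataX implements the writer; line_to_cells and the constants are hypothetical. Field 0 becomes the rowkey, fields 1 to 3 become the CF:NAME, CF:AGE and CF:ADDRESS cells, and every cell gets the fixed version 123456789 from versionColumn.

```python
from typing import List, Tuple

ROWKEY_INDEX = 0                                         # from "rowkeyColumn"
COLUMN_MAP = {1: "CF:NAME", 2: "CF:AGE", 3: "CF:ADDRESS"}  # from "column"
VERSION = 123456789                                      # from "versionColumn"

Cell = Tuple[str, str, int, str]  # (rowkey, column, timestamp, value)


def line_to_cells(line: str, delimiter: str = ",") -> List[Cell]:
    """Split one source line and map its fields to HBase-style cells."""
    fields = line.rstrip("\n").split(delimiter)
    rowkey = fields[ROWKEY_INDEX]
    return [(rowkey, name, VERSION, fields[index])
            for index, name in sorted(COLUMN_MAP.items())]


cells = line_to_cells("row3,jjj1,150,BeijingYiZhuang3")
for cell in cells:
    print(cell)
```

    The printed tuples line up with the get 'LXW','row3' output above: same column names, same timestamp, same values.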

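    2.5 Local File to HDFS/Hive

    Exporting from HDFS to a local file does not support high availability (HA), so that direction is not tested here.

    For the Hive high-availability configuration, see https://www.cnblogs.com/Java-Starter/p/10756528.html

    Import a local data file into HDFS/Hive. The table must be created in Hive before the import can run.

    Because of path issues, this can only be done on Linux.

    Source data, source.txt: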

    3,1,150,33
    4,2,150,44

    Table creation statement

    create table datax_test(
      col1 varchar(10),
      col2 varchar(10),
      col3 varchar(10),
      col4 varchar(10)
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS ORC;

    Set fileType to orc. With the text type, compression is required and the output may come out garbled.

    job_local_to_hdfs.json

    {
        "setting": {},
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "txtfilereader",
                        "parameter": {
                            "path": ["../job/datax_hbase/source.txt"],
                            "encoding": "UTF-8",
                            "column": [
                                {
                                    "index": 0,
                                    "type": "String"
                                },
                                {
                                    "index": 1,
                                    "type": "String"
                                },
                                {
                                    "index": 2,
                                    "type": "String"
                                },
                                {
                                    "index": 3,
                                    "type": "String"
                                }
                            ],
                            "fieldDelimiter": ","
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "defaultFS": "hdfs://ns1/",
                            "hadoopConfig": {
                                "dfs.nameservices": "ns1",
                                "dfs.ha.namenodes.ns1": "nn1,nn2",
                                "dfs.namenode.rpc-address.ns1.nn1": "CentOS7One:9000",
                                "dfs.namenode.rpc-address.ns1.nn2": "CentOS7Two:9000",
                                "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                            },
                            "fileType": "orc",
                            "path": "/user/hive/warehouse/datax_test",
                            "fileName": "datax_test",
                            "column": [
                                {
                                    "name": "col1",
                                    "type": "VARCHAR"
                                },
                                {
                                    "name": "col2",
                                    "type": "VARCHAR"
                                },
                                {
                                    "name": "col3",
                                    "type": "VARCHAR"
                                },
                                {
                                    "name": "col4",
                                    "type": "VARCHAR"
                                }
                            ],
                            "writeMode": "append",
                            "fieldDelimiter": ",",
                            "compress":"NONE"
                        }
                    }
                }
            ]
        }
    }
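
    The hadoopConfig keys for an HA cluster follow a fixed naming pattern built from the nameservice name and the NameNode ids. The sketch below generates that block from two inputs; ha_hadoop_config is a hypothetical helper, and the host names are the ones used in the job above.

```python
from typing import Dict

FAILOVER_PROVIDER = (
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)


def ha_hadoop_config(nameservice: str, namenodes: Dict[str, str]) -> Dict[str, str]:
    """Build the hadoopConfig dict; namenodes maps NameNode id to host:port."""
    config = {
        "dfs.nameservices": nameservice,
        # Joining a dict joins its keys, in insertion order.
        "dfs.ha.namenodes." + nameservice: ",".join(namenodes),
        "dfs.client.failover.proxy.provider." + nameservice: FAILOVER_PROVIDER,
    }
    for nn_id, address in namenodes.items():
        config["dfs.namenode.rpc-address.%s.%s" % (nameservice, nn_id)] = address
    return config


hadoop_config = ha_hadoop_config(
    "ns1", {"nn1": "CentOS7One:9000", "nn2": "CentOS7Two:9000"}
)
for key, value in hadoop_config.items():
    print(key, "=", value)
```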

    After the import completes, check the result in Hive:

    hive> select * from datax_test;
    OK
    3       1       150     33
    4       2       150     44
    Time taken: 0.085 seconds, Fetched: 2 row(s)

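    2.6 Text File to Oracle

    Formats such as txt, dat, and csv all work. The dat file used here is 16 GB and contains 180 million records.

    Table creation statement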

    CREATE TABLE T_CJYX_HOMECOUNT (
    "ACYC_ID" VARCHAR2(4000 BYTE) NULL ,
    "ADDRESS_ID" VARCHAR2(4000 BYTE) NULL ,
    "ADDRESS_NAME" VARCHAR2(4000 BYTE) NULL ,
    "ADDRESS_LEVEL" VARCHAR2(4000 BYTE) NULL ,
    "CHECK_TARGET_NUM" VARCHAR2(4000 BYTE) NULL ,
    "CHECK_VALUE" VARCHAR2(4000 BYTE) NULL ,
    "TARGET_PHONE" VARCHAR2(4000 BYTE) NULL ,
    "NOTARGET_PHONE" VARCHAR2(4000 BYTE) NULL ,
    "PARENT_ID" VARCHAR2(4000 BYTE) NULL ,
    "BCYC_ID" VARCHAR2(4000 BYTE) NULL 
    )

    job_txt_to_oracle.json is as follows:

    {
        "setting": {},
        "job": {
            "setting": {
                "speed": {
                    "channel": 11
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "txtfilereader",
                        "parameter": {
                            "path": ["E:/opt/srcbigdata2/di_00121_20190427.dat"],
                            "encoding": "UTF-8",
                            "nullFormat": "",
                            "column": [
                                {
                                    "index": 0,
                                    "type": "string"
                                },
                                {
                                    "index": 1,
                                    "type": "string"
                                },
                                {
                                    "index": 2,
                                    "type": "string"
                                },
                                {
                                    "index": 3,
                                    "type": "string"
                                },
                                {
                                    "index": 4,
                                    "type": "string"
                                },
                                {
                                    "index": 5,
                                    "type": "string"
                                },
                                {
                                    "index": 6,
                                    "type": "string"
                                },
                                {
                                    "index": 7,
                                    "type": "string"
                                },
                                {
                                    "index": 8,
                                    "type": "string"
                                },
                                {
                                    "index": 9,
                                    "type": "string"
                                }
                            ],
                            "fieldDelimiter": "$"
                        }
                    },
                    "writer": {
                        "name": "oraclewriter",
                        "parameter": {
                            "column": ["acyc_id","address_id","address_name","address_level","check_target_num","check_value","target_phone","notarget_phone","parent_id","bcyc_id"],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                                    "table": ["T_CJYX_HOMECOUNT"]
                                }
                            ],
                            "password": "test",
                            "username": "test"
                        }
                    }
                }
            ]
        }
    }

    Run command

    python27 datax.py ../job/job_txt_to_oracle.json

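    This is much faster than sqlldr, the loader bundled with Oracle: DataX imported the 180 million records in only 117 minutes, whereas sqlldr needed 41 hours.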
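
    To put those figures in perspective, the throughput and speedup can be worked out directly. The calculation below uses only the numbers quoted above and treats "180 million" as an exact count, which is an approximation.

```python
RECORDS = 180_000_000   # "180 million", treated as exact for this estimate
DATAX_MINUTES = 117
SQLLDR_HOURS = 41

datax_seconds = DATAX_MINUTES * 60
sqlldr_seconds = SQLLDR_HOURS * 3600

datax_rate = RECORDS / datax_seconds      # records per second with DataX
sqlldr_rate = RECORDS / sqlldr_seconds    # records per second with sqlldr
speedup = sqlldr_seconds / datax_seconds  # how many times faster DataX was

print(round(datax_rate), round(sqlldr_rate), round(speedup, 1))
# prints: 25641 1220 21.0
```

    That is roughly 25,600 records per second against about 1,200, or a speedup of around 21 times in this particular test.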

    2.7 Text File to Text File

    job_txt_to_txt.json is as follows:

    {
        "setting": {},
        "job": {
            "setting": {
                "speed": {
                    "channel": 2
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "txtfilereader",
                        "parameter": {
                            "path": ["../job/data_txt/a.txt"],
                            "encoding": "UTF-8",
                            "column": [
                                {
                                    "index": 0,
                                    "type": "string"
                                },
                                {
                                    "index": 1,
                                    "type": "string"
                                }
                            ],
                            "fieldDelimiter": "$"
                        }
                    },
                    "writer": {
                        "name": "txtfilewriter",
                        "parameter": {
                            "path": "../job/data_txt/",
                            "fileName": "luohw",
                            "writeMode": "truncate",
                            "format": "yyyy-MM-dd"
                        }
                    }
                }
            ]
        }
    }

    After the job completes, an output file whose name starts with luohw is generated under ../job/data_txt/.
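
    Hand-edited job files easily pick up small JSON mistakes, such as a trailing comma after the last element of a list. A strict parser rejects these, whether or not DataX's own parser happens to tolerate them, so it can be worth checking a job file before running it. The sketch below is one way to do that with Python's standard json module; check_job and the two sample strings are hypothetical.

```python
import json
from typing import Tuple


def check_job(text: str) -> Tuple[str, str]:
    """Parse a job file strictly and return (reader name, writer name)."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(
            "invalid JSON at line %d, column %d: %s"
            % (exc.lineno, exc.colno, exc.msg)
        ) from exc
    first = config["job"]["content"][0]
    return first["reader"]["name"], first["writer"]["name"]


GOOD = ('{"job": {"content": [{"reader": {"name": "txtfilereader"}, '
        '"writer": {"name": "txtfilewriter"}}]}}')
# Same idea, but with a trailing comma after the last list element.
BAD = '{"job": {"content": [{"index": 0, "type": "string"},]}}'

print(check_job(GOOD))
try:
    check_job(BAD)
except ValueError as err:
    print("rejected:", err)
```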

  • Original article: https://www.cnblogs.com/Java-Starter/p/10869324.html