  • DataX

    1. What is DataX

    DataX is an open-source offline data synchronization tool from Alibaba for heterogeneous data sources. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.

      https://github.com/kris-2018/DataX

    2. The design of DataX

    To solve the problem of synchronizing heterogeneous data sources, DataX replaces the complex mesh of point-to-point synchronization links with a star-shaped topology: DataX sits in the middle as the transport hub that connects every data source. When a new data source needs to be supported, it only has to be connected to DataX once, and it can then synchronize data seamlessly with every source that is already integrated.


    3. Framework design


    As an offline data synchronization framework, DataX itself is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted into Reader and Writer plugins, which are plugged into the synchronization framework.

    Reader: the data-collection module. It reads data from the source and hands it to the Framework.

    Writer: the data-writing module. It continuously pulls data from the Framework and writes it to the destination.

    Framework: connects the Reader and the Writer, serves as the data-transfer channel between them, and handles the core concerns of buffering, flow control, concurrency, and data conversion.

    4. How DataX runs


    1) A single data synchronization run in DataX is called a Job. When DataX receives a Job, it starts one process to carry out the whole synchronization. The DataX Job module is the central coordinator of a single job: it handles data cleanup, splitting the job into sub-tasks (turning one job into multiple Tasks), TaskGroup management, and so on.

    2) After the DataX Job starts, it splits the Job into many small Tasks (sub-tasks) according to a source-specific splitting strategy, so that they can run concurrently. A Task is the smallest unit of a DataX job; each Task is responsible for synchronizing a portion of the data.

    3) After the split, the DataX Job calls the Scheduler module, which, based on the configured concurrency (channel count), regroups the Tasks into TaskGroups. Each TaskGroup runs all of the Tasks assigned to it with a fixed degree of concurrency; by default a single TaskGroup runs 5 Tasks concurrently (a worked example follows this list).

    4) Every Task is started by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer pipeline of threads to perform the actual synchronization.

    5) Once the job is running, the Job module monitors and waits for all TaskGroups to finish. When every TaskGroup has completed, the Job exits successfully; otherwise it exits abnormally with a non-zero exit code.
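    A worked example of the arithmetic in step 3 (the numbers are illustrative, not from this deployment): suppose a job is configured with 20 concurrent channels and the reader splits it into 100 Tasks. With the default TaskGroup concurrency of 5, the Scheduler creates 20 / 5 = 4 TaskGroups and spreads the 100 Tasks across them, so each TaskGroup holds about 25 Tasks and runs them 5 at a time.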

     5. Quick start

      Prerequisites
        - Linux
        - JDK (1.8 or above; 1.8 recommended)
        - Python (Python 2.6.x recommended)
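        A quick way to confirm the JDK and Python prerequisites on the target machine (standard version checks, nothing DataX-specific):
          [kris@hadoop101 ~]$ java -version
          [kris@hadoop101 ~]$ python -V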
       Installation
        1) Upload the downloaded datax.tar.gz to /opt/software on hadoop101
          [kris@hadoop101 software]$ ls
            datax.tar.gz
        2) Extract datax.tar.gz into /opt/module
          [kris@hadoop101 software]$ tar -zxvf datax.tar.gz -C /opt/module/
        3) Run the self-check job
          [kris@hadoop101 bin]$ cd /opt/module/datax/bin/
          [kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json

    [kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json 
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
    2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  => 
    
            osInfo: Oracle Corporation 1.8 25.144-b01
            jvmInfo:        Linux amd64 2.6.32-642.el6.x86_64
            cpu num:        8
    
            totalPhysicalMemory:    -0.00G
            freePhysicalMemory:     -0.00G
            maxFileDescriptorCount: -1
            currentOpenFileDescriptorCount: -1
    
            GC Names        [PS MarkSweep, PS Scavenge]
    
            MEMORY_NAME                    | allocation_size                | init_size                      
            PS Eden Space                  | 256.00MB                       | 256.00MB                       
            Code Cache                     | 240.00MB                       | 2.44MB                         
            Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
            PS Survivor Space              | 42.50MB                        | 42.50MB                        
            PS Old Gen                     | 683.00MB                       | 683.00MB                       
            Metaspace                      | -0.00MB                        | 0.00MB                         
    
    
    2019-07-14 07:51:25.708 [main] INFO  Engine - 
    {
            "content":[
                    {
                            "reader":{
                                    "name":"streamreader",
                                    "parameter":{
                                            "column":[
                                                    {
                                                            "type":"string",
                                                            "value":"DataX"
                                                    },
                                                    {
                                                            "type":"long",
                                                            "value":19890604
                                                    },
                                                    {
                                                            "type":"date",
                                                            "value":"1989-06-04 00:00:00"
                                                    },
                                                    {
                                                            "type":"bool",
                                                            "value":true
                                                    },
                                                    {
                                                            "type":"bytes",
                                                            "value":"test"
                                                    }
                                            ],
                                            "sliceRecordCount":100000
                                    }
                            },
                            "writer":{
                                    "name":"streamwriter",
                                    "parameter":{
                                            "encoding":"UTF-8",
                                            "print":false
                                    }
                            }
                    }
            ],
            "setting":{
                    "errorLimit":{
                            "percentage":0.02,
                            "record":0
                    },
                    "speed":{
                            "byte":10485760
                    }
            }
    }
    
    
    2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
    2019-07-14 07:51:35.896 [job-0] INFO  JobContainer - 
             [total cpu info] => 
                    averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                    -1.00%                         | -1.00%                         | -1.00%
                            
    
             [total gc info] => 
                     NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                     PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                     PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
    
    2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
    2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.018s |  All Task WaitReaderTime 0.047s | Percentage 100.00%
    2019-07-14 07:51:35.899 [job-0] INFO  JobContainer - 
    任务启动时刻                    : 2019-07-14 07:51:25
    任务结束时刻                    : 2019-07-14 07:51:35
    任务总计耗时                    :                 10s
    任务平均流量                    :          253.91KB/s
    记录写入速度                    :          10000rec/s
    读出记录总数                    :              100000
    读写失败总数                    :                   0

    6. Usage examples

    ① Read data from a stream and print it to the console

    1) View the configuration template

    [kris@hadoop101 bin]$ python datax.py -r streamreader -w streamwriter
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    Please refer to the streamreader document:
         https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 
    
    Please refer to the streamwriter document:
         https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader", 
                        "parameter": {
                            "column": [], 
                            "sliceRecordCount": ""
                        }
                    }, 
                    "writer": {
                        "name": "streamwriter", 
                        "parameter": {
                            "encoding": "", 
                            "print": true
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
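    Filling in the template gives a runnable job. A minimal sketch, assuming it is saved as stream2stream.json in the job directory (the file name and the column values are illustrative; every field comes from the template above):

    [kris@hadoop101 job]$ vim stream2stream.json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader",
                        "parameter": {
                            "column": [
                                {"type": "string", "value": "hello,DataX"},
                                {"type": "long", "value": 10}
                            ],
                            "sliceRecordCount": 10
                        }
                    },
                    "writer": {
                        "name": "streamwriter",
                        "parameter": {
                            "encoding": "UTF-8",
                            "print": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }
    [kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py stream2stream.json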

    ② Import and export with MySQL

      ②.1 Read data from MySQL and write it to HDFS

        View the official template

    [kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    Please refer to the mysqlreader document:
         https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 
    
    Please refer to the hdfswriter document:
         https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [], 
                            "connection": [
                                {
                                    "jdbcUrl": [], 
                                    "table": []
                                }
                            ], 
                            "password": "", 
                            "username": "", 
                            "where": ""
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [], 
                            "compress": "", 
                            "defaultFS": "", 
                            "fieldDelimiter": "", 
                            "fileName": "", 
                            "fileType": "", 
                            "path": "", 
                            "writeMode": ""
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
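    The job below reads columns id and name from table stu1 in the test database on hadoop101. If that table does not exist yet, it can be created and seeded roughly like this (the statements and sample values are illustrative; the three rows match the "Total 3 records" line in the log further down):

    mysql> use test;
    mysql> create table stu1(id int, name varchar(20));
    mysql> insert into stu1 values (1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');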

     Write the job configuration file and run it:

    [kris@hadoop101 job]$ vim mysql2hdfs 
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [
                                "id",
                                "name"
                            ], 
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://hadoop101:3306/test"
                                    ], 
                                    "table": [
                                        "stu1"
                                    ]
                                }
                            ], 
                            "username": "root", 
                            "password": "123456"
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [
                                {
                                    "name": "id",
                                    "type": "INT"
                                },
                                {
                                    "name": "name",
                                    "type": "STRING"
                                }
                            ],  
                            "defaultFS": "hdfs://hadoop101:9000", 
                            "fieldDelimiter": "\t", 
                            "fileName": "student.txt", 
                            "fileType": "text", 
                            "path": "/", 
                            "writeMode": "append"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
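    The job can then be executed; the invocation below assumes the file was saved as mysql2hdfs in the job directory, as in the vim command above:

    [kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py mysql2hdfs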
    2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - 
             [total cpu info] => 
                    averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                    -1.00%                         | -1.00%                         | -1.00%
                            
    
             [total gc info] => 
                     NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                     PS MarkSweep         | 1                  | 1                  | 1                  | 0.071s             | 0.071s             | 0.071s             
                     PS Scavenge          | 1                  | 1                  | 1                  | 0.072s             | 0.072s             | 0.072s             
    
    2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
    2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
    2019-07-14 09:02:55.927 [job-0] INFO  JobContainer - 
    任务启动时刻                    : 2019-07-14 09:02:43
    任务结束时刻                    : 2019-07-14 09:02:55
    任务总计耗时                    :                 12s
    任务平均流量                    :                3B/s
    记录写入速度                    :              0rec/s
    读出记录总数                    :                   3
    读写失败总数                    :                   0

         Result: the data is successfully written from MySQL into HDFS.

    Note: when HdfsWriter actually runs, it appends a random suffix to the configured fileName; each writer thread uses the suffixed name as the file it really writes.
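    A quick way to inspect the result on HDFS (assuming the Hadoop client is available, as in the hadoop fs -mv step later; the wildcard accounts for the random suffix):

    [kris@hadoop101 job]$ hadoop fs -ls /
    [kris@hadoop101 job]$ hadoop fs -cat /student.txt*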

    For reference, this is the official mongodbreader -> hdfswriter template; it is filled in under ③.1 below:

    [kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    Please refer to the mongodbreader document:
         https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 
    
    Please refer to the hdfswriter document:
         https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader", 
                        "parameter": {
                            "address": [], 
                            "collectionName": "", 
                            "column": [], 
                            "dbName": "", 
                            "userName": "", 
                            "userPassword": ""
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [], 
                            "compress": "", 
                            "defaultFS": "", 
                            "fieldDelimiter": "", 
                            "fileName": "", 
                            "fileType": "", 
                            "path": "", 
                            "writeMode": ""
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }

      ②.2 Read data from HDFS and write it to MySQL

      Rename the file uploaded in the previous example:

      [kris@hadoop101 datax]$ hadoop fs -mv /student.txt* /student.txt

      View the official template:

      [kris@hadoop101 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

      Create the job configuration file (it is run below as job/hdfs2mysql.json):

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "hdfsreader", 
                        "parameter": {
                            "column": ["*"], 
                            "defaultFS": "hdfs://hadoop102:9000", 
                            "encoding": "UTF-8", 
                            "fieldDelimiter": "\t", 
                            "fileType": "text", 
                            "path": "/student.txt"
                        }
                    }, 
                    "writer": {
                        "name": "mysqlwriter", 
                        "parameter": {
                            "column": [
                                "id",
                                "name"
                            ], 
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                    "table": ["student2"]
                                }
                            ], 
                            "password": "000000", 
                            "username": "root", 
                            "writeMode": "insert"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }

      Create the target table student2 in the MySQL database datax:

      mysql> use datax;

      mysql> create table student2(id int, name varchar(20));

     Run the job, then check that the records have been written to the database:

      [kris@hadoop101 datax]$ bin/datax.py job/hdfs2mysql.json
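      One way to verify the import (the database and table names follow the writer configuration above):

      mysql> use datax;
      mysql> select * from student2;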

    ③ DataX import/export examples with MongoDB

      ③.1 Read data from MongoDB and import it into HDFS
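      The reader below pulls from the collection atguigu in the MongoDB database test at 127.0.0.1:27017. Assuming MongoDB is running locally, the source collection could be seeded from the mongo shell like this (the documents are illustrative):

      > use test
      > db.atguigu.insert({"name":"atguigu","url":"www.atguigu.com"})
      > db.atguigu.insert({"name":"baidu","url":"www.baidu.com"})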

      Write the job configuration file

    [kris@hadoop101 datax]$ vim job/mongdb2hdfs.json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader", 
                        "parameter": {
                            "address": ["127.0.0.1:27017"], 
                            "collectionName": "atguigu", 
                            "column": [
                                {
                                    "name":"name",
                                    "type":"string"
                                },
                                {
                                    "name":"url",
                                    "type":"string"
                                }
                            ], 
                            "dbName": "test"
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [
                                {
                                    "name":"name",
                                    "type":"string"
                                },
                                {
                                    "name":"url",
                                    "type":"string"
                                }
                            ], 
                            "defaultFS": "hdfs://hadoop102:9000", 
                            "fieldDelimiter": "\t", 
                            "fileName": "mongo.txt", 
                            "fileType": "text", 
                            "path": "/", 
                            "writeMode": "append"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }

    mongodbreader parameters

      address: the address of the MongoDB data source. Because MongoDB may be a cluster, the ip:port entries must be given as a JSON array. [required]

      userName: the MongoDB user name. [optional]

      userPassword: the MongoDB password. [optional]

      collectionName: the MongoDB collection name. [required]

      column: the column (field) names of the MongoDB documents. [required]

      name: the name of a column. [required]

      type: the type of a column. [optional]

      splitter: MongoDB supports array types but the DataX framework itself does not, so array values read from MongoDB are joined into a single string using this separator. [optional] (see the example below)
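      As an illustration of splitter, a column entry for an array field might look like the sketch below; the field name tags is hypothetical, and the exact type string accepted for array fields should be checked against the mongodbreader document linked above:

      {
          "name": "tags",
          "type": "array",
          "splitter": ","
      }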

    Run it:
    [kris@hadoop101 datax]$ bin/datax.py job/mongdb2hdfs.json

      ③.2 Read data from MongoDB and import it into MySQL

      Create the target table in MySQL:  mysql> create table kris(name varchar(20),url varchar(20));

    Write the DataX job configuration file

      [kris@hadoop101 datax]$ vim job/mongodb2mysql.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader", 
                        "parameter": {
                            "address": ["127.0.0.1:27017"], 
                            "collectionName": "kris", 
                            "column": [
                                {
                                    "name":"name",
                                    "type":"string"
                                },
                                {
                                    "name":"url",
                                    "type":"string"
                                }
                            ], 
                            "dbName": "test"
                        }
                    }, 
                    "writer": {
                        "name": "mysqlwriter", 
                        "parameter": {
                            "column": ["*"], 
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://hadoop101:3306/test", 
                                    "table": ["kris"]
                                }
                            ], 
                            "password": "123456", 
                            "username": "root", 
                            "writeMode": "insert"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }

      Run it:

    [kris@hadoop101 datax]$ bin/datax.py job/mongodb2mysql.json

         Check the result:

    mysql> select * from kris;

       ③.3 Read data from MongoDB and import it into HBase

       vim mongodb2hbase.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["x.x.x.x:28888"],
                            "userName": "root",
                            "userPassword": "xxxxt",
                            "collectionName": "shop",
                            "column": [
                                {
                                    "name":"_id",
                                    "type":"string"
                                },
                                {
                                    "name":"shopNo",
                                    "type":"string"
                                }
                            ],
                            "dbName": "posB"
                        }
                    },
                    "writer": {
                        "name": "hbase11xwriter",
                        "parameter": {
                            "hbaseConfig": {
                                "hbase.rootdir": "hdfs://hadoop101:9000/hbase",
                                "hbase.cluster.distributed": "true",
                                "hbase.zookeeper.quorum": "hadoop101,hadoop102,hadoop103"
                            },
                            "table": "NS:shop",
                            "mode": "normal",
                            "rowkeyColumn": [
                               {
                                  "index":0,
                                  "type":"string"
                                }
                            ],
                            "column": [
                               {
                                "index":0,
                                "name": "cf1:id",
                                "type": "string"
                              },
                              {
                                "index":1,
                                "name": "cf1:shopNo",
                                "type": "string"
                              }
                            ],
                            "versionColumn":{
                                "index": "-1",
                                "value":"123456789"
                            },
                            "encoding": "utf-8"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
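    Before running this job, the namespace, table, and column family referenced by the writer (NS, NS:shop, cf1) must already exist in HBase. A minimal sketch of preparing them from the hbase shell and then launching the job (assuming the json file was saved under the job directory, matching the vim command above):

    hbase> create_namespace 'NS'
    hbase> create 'NS:shop', 'cf1'
    [kris@hadoop101 datax]$ bin/datax.py job/mongodb2hbase.json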