  airbnb's open-source reAir tool: usage and source-code walkthrough (Part 1)

    reAir provides both batch replication and incremental replication. Today we'll look at batch replication first.

    How to run a batch replication:

    cd reair
    ./gradlew shadowjar -p main -x test
    # A local table list must be prefixed with file:///; if you pass --table-list ~/reair/local_table_list directly, the file must be on HDFS!
    hadoop jar main/build/libs/airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.batch.hive.MetastoreReplicationJob --config-files my_config_file.xml --table-list file:///reair/local_table_list
    
    

    1. table_list contents: one db_name.table_name per line, naming each table to replicate

    db_name1.table_name1
    db_name1.table_name2
    db_name2.table_name3
    ...
    
    
    2. my_config_file.xml configuration
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <configuration>
      <property>
        <name>airbnb.reair.clusters.src.name</name>
        <value>ns8</value>
        <comment>
          Name of the source cluster. It can be an arbitrary string and is used in
          logs, tags, etc.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.src.metastore.url</name>
        <value>thrift://192.168.200.140:9083</value>
        <comment>Source metastore Thrift URL.</comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.src.hdfs.root</name>
        <value>hdfs://ns8/</value>
        <comment>Source cluster HDFS root. Note trailing slash.</comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.src.hdfs.tmp</name>
        <value>hdfs://ns8/tmp</value>
        <comment>
          Directory for temporary files on the source cluster.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.dest.name</name>
        <value>ns1</value>
        <comment>
          Name of the destination cluster. It can be an arbitrary string and is used in
          logs, tags, etc.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.dest.metastore.url</name>
        <value>thrift://dev04:9083</value>
        <comment>Destination metastore Thrift URL.</comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.dest.hdfs.root</name>
        <value>hdfs://ns1/</value>
        <comment>Destination cluster HDFS root. Note trailing slash.</comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.dest.hdfs.tmp</name>
        <value>hdfs://ns1/tmp</value>
        <comment>
          Directory for temporary files on the destination cluster. Table / partition
          data is copied to this location before it is moved to the final location,
          so it should be on the same filesystem as the final location.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.batch.output.dir</name>
        <value>/tmp/replica</value>
        <comment>
          This configuration must be provided. It gives the location where each
          stage's MR job output is stored.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.clusters.batch.metastore.blacklist</name>
        <value>testdb:test.*,tmp_.*:.*</value>
        <comment>
          Comma separated regex blacklist. dbname_regex:tablename_regex,...
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.batch.metastore.parallelism</name>
        <value>5</value>
        <comment>
          The parallelism to use for jobs requiring metastore calls. This translates to the number of
          mappers or reducers in the relevant jobs.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.batch.copy.parallelism</name>
        <value>5</value>
        <comment>
          The parallelism to use for jobs that copy files. This translates to the number of reducers
          in the relevant jobs.
        </comment>
      </property>
    
      <property>
        <name>airbnb.reair.batch.overwrite.newer</name>
        <value>true</value>
        <comment>
          Whether the batch job will overwrite newer tables/partitions on the destination. Default is true.
        </comment>
      </property>
     
      <property>
        <name>mapreduce.map.speculative</name>
        <value>false</value>
        <comment>
          Speculative execution is currently not supported for batch replication.
        </comment>
      </property>
    
      <property>
        <name>mapreduce.reduce.speculative</name>
        <value>false</value>
        <comment>
          Speculative execution is currently not supported for batch replication.
        </comment>
      </property>
    
    </configuration>
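
    To make the blacklist semantics concrete, here is a minimal sketch of how a
    dbname_regex:tablename_regex list can be applied. BlacklistFilter is a
    hypothetical helper written for illustration, not reAir's actual class:

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    // Hypothetical helper illustrating the documented blacklist format:
    // comma-separated entries of the form dbname_regex:tablename_regex.
    public class BlacklistFilter {
        private final List<Pattern[]> entries;

        public BlacklistFilter(String blacklist) {
            this.entries = Arrays.stream(blacklist.split(","))
                .map(e -> e.split(":", 2))
                .map(p -> new Pattern[] { Pattern.compile(p[0]), Pattern.compile(p[1]) })
                .collect(Collectors.toList());
        }

        // A table is skipped if any entry matches both its db name and table name.
        public boolean isBlacklisted(String db, String table) {
            return entries.stream().anyMatch(
                p -> p[0].matcher(db).matches() && p[1].matcher(table).matches());
        }

        public static void main(String[] args) {
            BlacklistFilter f = new BlacklistFilter("testdb:test.*,tmp_.*:.*");
            System.out.println(f.isBlacklisted("testdb", "test_users"));    // true
            System.out.println(f.isBlacklisted("tmp_scratch", "anything")); // true
            System.out.println(f.isBlacklisted("default", "users"));       // false
        }
    }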
    

    reAir batch replication proceeds in three stages:

    Stage 1

    Mapper:
      1. Read the user-supplied .xml config files (in the setup method)
      2. Read the entries in the user-supplied table-list
        1). Filter out table names that match the blacklist
        2). Build a TaskEstimate object, which wraps a TaskEstimate.TaskType.xxx action chosen by several strategies
        3). If src is a partitioned table, CREATE/ALTER/NOOP the corresponding table via the dest metastore Thrift interface
        4). For a partitioned table, also add CHECK_PARTITION tasks (one per partition, e.g. dt=20180801)
        5). Return a List<String> result whose fields are tab (\t) separated (the record layout is sketched after the Stage 1 Reduce section)
            result layout:         TaskType isUpdateMeta isUpdateData srcPath destPath db_name table_name partition_name
            TaskEstimate object:   TaskType isUpdateMeta isUpdateData srcPath destPath
            HiveObjectSpec object: db_name table_name partition_name
        6). Map output: <key: result.hashCode(), value: new Text(result)>
       
      
    Additional notes on mapper steps 2.2)-2.3) — the possible task types:
        1). COPY_UNPARTITIONED_TABLE
        2). NO_OP
        3). COPY_PARTITIONED_TABLE
            1)). CREATE (if dest has no corresponding table); ALTER (if dest has an existing table (ExistingTable) that differs from the expectTable we generate from srcTable); NO_OP (the default: do nothing). A sketch of this decision follows this list.
            2)). CHECK_PARTITION (enumerate every partition of the partitioned table, e.g. dt=20180101, dt=20180102, ...)
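
    Here is a simplified sketch of that CREATE/ALTER/NO_OP decision. The real
    estimator in reAir compares richer metadata; this only captures the branching
    described above (TableActionChooser is a name invented for illustration):

    import org.apache.hadoop.hive.metastore.api.Table;

    // Illustrative only: the table-level decision described in note 3.1)).
    class TableActionChooser {
        enum TableAction { CREATE, ALTER, NO_OP }

        // expected: the Table we would generate on dest from srcTable;
        // existing: the Table currently on dest, or null if absent.
        static TableAction choose(Table expected, Table existing) {
            if (existing == null) {
                return TableAction.CREATE;  // dest has no corresponding table
            }
            if (!existing.equals(expected)) {
                return TableAction.ALTER;   // dest table differs from what we expect
            }
            return TableAction.NO_OP;       // identical: nothing to do
        }
    }
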
    Reduce:
        1. Deserialize the map value into a Pair<TaskEstimate, HiveObjectSpec>
        2. If taskType == CHECK_PARTITION, re-run the task-type estimate for the spec (repeating the map-side check logic)
        3. Output the same result string as the map value:
           <new Text(result), Exception.toString()>
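
    The tab-separated record passed between stages can be pictured like this.
    StageOneRecord is a simplified stand-in for reAir's actual serialization,
    with field names following the layout listed in mapper step 5):

    // Simplified stand-in for the tab-separated Stage 1 record described above.
    class StageOneRecord {
        String taskType;        // e.g. COPY_PARTITIONED_TABLE, CHECK_PARTITION, NO_OP
        boolean updateMetadata; // isUpdateMeta
        boolean updateData;     // isUpdateData
        String srcPath;
        String destPath;
        String dbName;
        String tableName;
        String partitionName;   // empty for unpartitioned tables

        String serialize() {
            return String.join("\t", taskType,
                Boolean.toString(updateMetadata), Boolean.toString(updateData),
                srcPath, destPath, dbName, tableName, partitionName);
        }

        static StageOneRecord deserialize(String line) {
            String[] f = line.split("\t", -1);  // -1 keeps trailing empty fields
            StageOneRecord r = new StageOneRecord();
            r.taskType = f[0];
            r.updateMetadata = Boolean.parseBoolean(f[1]);
            r.updateData = Boolean.parseBoolean(f[2]);
            r.srcPath = f[3];
            r.destPath = f[4];
            r.dbName = f[5];
            r.tableName = f[6];
            r.partitionName = f[7];
            return r;
        }
    }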
    

    Stage 2

    Mapper:
        1. Deserialize stage 1's result; if estimate.TaskType == COPY_PARTITIONED_TABLE or COPY_UNPARTITIONED_TABLE and the data needs updating, call updateDir() (sketched below), otherwise NO_OP
        2. updateDir():
            1). Delete destPath and recreate it
            2). Recursively list all non-hidden files under srcPath
        3. Output: <key: (srcFile.size + fileModificationTime).hashCode(), value: (srcFileName \t destPath \t srcFileSize)>
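
    A minimal sketch of what updateDir() does, per the description above. It uses
    the standard Hadoop FileSystem API, but DirUpdater and its exact hidden-file
    rule (skipping names starting with "." or "_") are assumptions for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch: reset the destination directory, then recursively collect all
    // non-hidden files under srcPath.
    class DirUpdater {
        static List<FileStatus> updateDir(Configuration conf, Path srcPath, Path destPath)
                throws Exception {
            FileSystem destFs = destPath.getFileSystem(conf);
            destFs.delete(destPath, true);  // clean destPath...
            destFs.mkdirs(destPath);        // ...and recreate it
            List<FileStatus> files = new ArrayList<>();
            collect(srcPath.getFileSystem(conf), srcPath, files);
            return files;
        }

        private static void collect(FileSystem fs, Path dir, List<FileStatus> out)
                throws Exception {
            for (FileStatus status : fs.listStatus(dir)) {
                String name = status.getPath().getName();
                if (name.startsWith(".") || name.startsWith("_")) {
                    continue;  // assumed hidden-file convention
                }
                if (status.isDirectory()) {
                    collect(fs, status.getPath(), out);
                } else {
                    out.add(status);
                }
            }
        }
    }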
    
    Reduce:
        1. Deserialize the value
        2. Verify and copy, retrying up to three times (sketched below)
            1). Success log: COPIED srcPathFileName destPath srcFileSize "" currentSystemTime
            2). Failure (or failed verification) log: SKIPPED srcPathFileName destPath srcFileSize Exception.toString() currentSystemTime
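
    A hedged sketch of that verify-then-copy loop. The retry count and the log
    line layout follow the description above; the FileSystem/FileUtil calls are
    standard Hadoop API, but this is not reAir's exact code (a size check stands
    in for its verification):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    // Sketch of the Stage 2 reducer's copy-with-retry.
    class CopyWithRetry {
        static String copy(Configuration conf, Path src, Path destDir, long expectedSize) {
            String error = "";
            for (int attempt = 1; attempt <= 3; attempt++) {
                try {
                    FileSystem srcFs = src.getFileSystem(conf);
                    FileSystem destFs = destDir.getFileSystem(conf);
                    FileUtil.copy(srcFs, src, destFs, destDir, false, true, conf);
                    FileStatus copied = destFs.getFileStatus(new Path(destDir, src.getName()));
                    if (copied.getLen() == expectedSize) {  // verification: size match
                        return String.join("\t", "COPIED", src.getName(),
                            destDir.toString(), Long.toString(expectedSize), "",
                            Long.toString(System.currentTimeMillis()));
                    }
                } catch (Exception e) {
                    error = e.toString();  // remember the last failure and retry
                }
            }
            return String.join("\t", "SKIPPED", src.getName(), destDir.toString(),
                Long.toString(expectedSize), error, Long.toString(System.currentTimeMillis()));
        }
    }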
            
    

    Stage 3

    This stage has only a mapper.
    Mapper:
        1. Input is Stage 1's output (<new Text(result), Exception.toString()>)
        2. Deserialize Stage 1's result to get Estimate.TaskType, verify the src/dest metadata, and apply the appropriate CREATE/ALTER/NOOP operation according to the strategy (sketched below)
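
    A sketch of that metadata commit using the standard Hive metastore client.
    reAir wraps the metastore in its own client class, so treat this as an
    illustration of the CREATE/ALTER/NOOP strategy rather than its actual code:

    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;

    // Sketch of the Stage 3 commit: re-check dest metadata, then apply
    // CREATE/ALTER/NOOP against the destination metastore.
    class MetadataCommitter {
        static void commit(HiveMetaStoreClient destClient, Table expected) throws Exception {
            String db = expected.getDbName();
            String tbl = expected.getTableName();
            if (!destClient.tableExists(db, tbl)) {
                destClient.createTable(expected);           // CREATE
            } else if (!destClient.getTable(db, tbl).equals(expected)) {
                destClient.alter_table(db, tbl, expected);  // ALTER
            }
            // otherwise NOOP: dest already matches the expected definition
        }
    }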
    
    

    Official diagram for reference (image not reproduced here).

  Original article: https://www.cnblogs.com/jiangxiaoxian/p/9917790.html