zoukankan      html  css  js  c++  java
  • hadoop项目实战--ETL--(三)实现mysql表到HIVE表的全量导入与增量导入

    一 在HIVE中创建ETL数据库

      ->create database etl;

    二 在工程目录下新建MysqlToHive.py 和conf文件夹

      在conf文件夹下新建如下文件,最后的工程目录如下图

      

    三 源码

      Import.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <root>
    	<importtype>
    		<value>add</value>    <!-- 增量导入或者全导入 -->
    	</importtype>
    
    	<task type="all">
    		<table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
    		<table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
    	</task>
    	
    	<task type="add">
    		<table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
    		<table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
    	</task>
    	
    </root>
    

      oder_add.xml

    <?xml version="1.0" encoding="UTF-8"?>
    
    <root>
    	<sqoop-shell type="import">
    		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
    		<param key="username">root</param> <!-- 数据库用户名 -->
    		<param key="password">123456</param> <!-- 数据库密码 -->
    		<param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
    		<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
    		<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
    		<param key="hive-partition-value">$dt</param>
    		<param key="hive-import"></param>
    		<param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
    		<param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
    		<param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
    		<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
    		<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    	</sqoop-shell>
    </root>
    

      oder_all.xml

    <?xml version="1.0" encoding="UTF-8"?>
    
    <root>
    	<sqoop-shell type="import">
    		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
    		<param key="username">root</param><!-- 数据库用户名 -->
    		<param key="password">123456</param><!-- 数据库密码 -->
    		<param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
    		<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
    		<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
    		<param key="hive-partition-value">$dt</param>
    		<param key="hive-import"></param>                  
    		<param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
    		<param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
    		<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
    		<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    	</sqoop-shell>
    </root>
    

      user_add.xml

    <?xml version="1.0" encoding="UTF-8"?>
    
    <root>
    	<sqoop-shell type="import">
    		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
    		<param key="username">root</param> <!-- 数据库用户名 -->
    		<param key="password">123456</param> <!-- 数据库密码 -->
    		<param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
    		<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
    		<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
    		<param key="hive-partition-value">$dt</param>
    		<param key="hive-import"></param>
    		<param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
    		<param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
    		<param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
    		<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
    		<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    	</sqoop-shell>
    </root>
    

      user_all.xml

      

    <?xml version="1.0" encoding="UTF-8"?>
    
    <root>
    	<sqoop-shell type="import">
    		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
    		<param key="username">root</param><!-- 数据库用户名 -->
    		<param key="password">123456</param><!-- 数据库密码 -->
    		<param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
    		<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
    		<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
    		<param key="hive-partition-value">$dt</param>
    		<param key="hive-import"></param>                  
    		<param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
    		<param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
    		<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
    		<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
    	</sqoop-shell>
    </root>
    

      MysqlToHive.py

    # _*_ coding:UTF-8 _*_
    '''
    Created on 2016��12��1��
    
    @author: duking
    '''
    import datetime
    import os
    import xml.etree.ElementTree as ET
    import collections
    
    #获取昨天时间
    def getYesterday(): 
        today=datetime.date.today() 
        oneday=datetime.timedelta(days=1) 
        yesterday=today-oneday  
        return yesterday
    
    
    def Resolve_Conf(dt):
        
        #获取当前工程目录
        PROJECT_DIR = os.getcwd()
        #获得配置文件名
        conf_file = PROJECT_DIR + "confImport.xml"
        #解析配置文件
        xml_tree = ET.parse(conf_file)
        
        #提取出本次导入的类型  全导入或者增量导入  通过配置import.xml中的plan标签的value值设定
        import_types = xml_tree.findall('./importtype')
        for import_type in import_types:
            aim_types = import_type.findall('./value')
            for i in range(len(aim_types)):
                aim_type = aim_types[i].text
                
        #获得task元素
        tasks = xml_tree.findall('./task')
        
        #用来保存待执行的sqoop命令的集合
        cmds = []
        
        for task in tasks:
            #获得导入类型,增量导入或者全量导入
            import_type = task.attrib["type"]
            
            #如果task的标签导入类型与设定类型不同,结束本次循环
            if(import_type != aim_type):
                continue
    
            #获得表名集合
            tables = task.findall('./table')
            
            #迭代表名集合,解析表配置文件
            for i in range(len(tables)):
                #表名
                table_name = tables[i].text
                #表配置文件名
                table_conf_file = PROJECT_DIR + "conf\" + table_name + ".xml"
                
                #解析表配置文件
                xmlTree = ET.parse(table_conf_file)
                
                #获取sqoop-shell 节点
                sqoopNodes = xmlTree.findall("./sqoop-shell")
                #获取sqoop 命令类型
                sqoop_cmd_type = sqoopNodes[0].attrib["type"]
                
                #首先组装成sqoop命令头
                command = "sqoop " + sqoop_cmd_type
                    
                #获取
                praNodes = sqoopNodes[0].findall("./param")
                
                #用来保存param的信息的有序字典
                cmap = collections.OrderedDict()            
                #将所有param中的key-value存入字典中
                for i in range(len(praNodes)):
                    #获取key的属性值
                    key = praNodes[i].attrib["key"]
                    #获取param标签中的值
                    value = praNodes[i].text
                    #保存到字典中
                    cmap[key] = value
                 
                #迭代字典将param的信息拼装成字符串
                for key in cmap:
                      
                    value = cmap[key]
                     
                    #如果不是键值对形式的命令 或者值为空,跳出此次循环
                    if(value == None or value == "" or value == " "):
                        value = ""
                        
                    if(key == "hive-partition-value"):
                        value = value.replace('$dt',str(dt))  
                    #合成前一天的时间
                    if(key == "last-value"):
                        value = '"' + str(dt) + " " + value + '"'
                            
                    #拼装为命令
                    command += " --" + key + " " + value + " " 
                        
                #将命令加入至待执行的命令集合
                cmds.append(command)
            
        return cmds   
            
    
    #python 模块的入口:main函数
    if __name__ == '__main__':
        
        dt = getYesterday();
        
        #解析配置文件,生成相应的HQL语句
        cmds = Resolve_Conf(dt)
        
        #迭代集合,执行命令
        for i in range(len(cmds)):
            cmd = cmds[i]
            print cmd
            #执行导入过秤
            os.system(cmd)
    

      

  • 相关阅读:
    Redis缓存穿透和雪崩
    Redis主从复制
    Redis发布订阅
    IO多路复用
    Synchronized解读
    日志导致jvm内存溢出相关问题
    tomcat及springboot实现Filter、Servlet、Listener
    MySQL主从复制针对trigger的特殊处理
    二、变量/常量/数据类型
    Ubuntu21.04 / Linux Mint20.2 安装 TradingView分析软件
  • 原文地址:https://www.cnblogs.com/duking1991/p/6123047.html
Copyright © 2011-2022 走看看