zoukankan      html  css  js  c++  java
  • 推荐引擎数据导入导出模块的实现

     

    毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步,以及将新产生的推荐结果重新写回数据库,这样的话就与实际应用一致了

     

    在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

     

    模块的环境变量:

    # -*- coding:UTF-8 -*-
    #
    !/usr/bin/python # FileName:pro_env.py

    #*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

     

    配置文件:

    导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

    <?xml version="1.0" encoding="UTF-8"?>
    <root>
        <task type="add">
            <table>ModifyRecords</table>
        </task>
    </root>

    需要对每张表进行更细一步的配置,新建ModifyRecords.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <root>
        <sqoop-shell type="import">
            <param key="connect">jdbc:mysql://localhost:3306/Recommend</param>
            <param key="username">${username}</param>
            <param key="password">${password}</param>
            <param key="target-dir">/user/hadoop/Recommend/$dt</param>
            <param key="query">'select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"$CONDITIONS" and $CONDITIONS'</param>
            <param key="m">1</param>
            <param key="fields-terminated-by">','</param>
        </sqoop-shell>
    </root>

    导出模块的配置文件类似,不贴了

     

    剩下的工作就是解析配置文件,下面是导入模块部分,导出模块也类似,这儿不贴:

    # -*- coding:UTF-8 -*-
    #!/usr/bin/python
    # FileName:sqoop_import.py
    from com.utls.pro_env import PROJECT_CONF_DIR import xml.etree.ElementTree as ET class sqoop_import(object): def __init__(self): pass @staticmethod # 其中dt为昨天的日期,将由调度模块传入 def resolve_conf(dt): # 获得配置文件名 conf_file = PROJECT_CONF_DIR + "Import.xml" # 解析配置文件 xml_tree = ET.parse(conf_file) # 获得task元素 tasks = xml_tree.findall('./task') for task in tasks: # 获得导入类型,增量导入或者全量导入 import_type = task.attrib["type"] # 获得表名集合 tables = task.findall('./table') # 用来保存待执行的Sqoop命令的集合 cmds = [] # 迭代表名集合,解析表配置文件 for i in range(len(tables)): # 表名 table_name = tables[i].text # 表配置文件名 table_conf_file = PROJECT_CONF_DIR + table_name + ".xml" # 解析表配置文件 xmlTree = ET.parse(table_conf_file) # 获取sqoop-shell节点 sqoopNodes = xmlTree.findall("./sqoop-shell") # 获取sqoop-shell节点 sqoop_cmd_type = sqoopNodes[0].attrib["type"] # 获取 praNodes = sqoopNodes[0].findall("./param") # 用来保存param信息的字典 cmap = {} for i in range(len(praNodes)): # 获得key属性的值 key = praNodes[i].attrib["key"] # 获得param标签中间的值 value = praNodes[i].text # 保存到字典中 cmap[key] = value # 首先组装成sqoop命令头 command = "sqoop " + sqoop_cmd_type # 如果为全量导入 if(import_type == "all"): # query的查询条件为<dt import_condition = dt flag = "<" # 如果为增量导入 elif (import_type == "add"): # query的查询条件为=dt import_condition = dt flag = "=" else: raise Exception # #迭代字典将param的信息拼装成字符串 for key in cmap.keys(): value = cmap[key] # 如果不是键值对形式的命令选项 if(value == None or value == "" or value == " "): value = "" # 将query的CONDITIONS替换为查询条件 if(key == "query"): value = value.replace("$CONDITIONS", import_condition) value = value.replace("$flag", flag) # 将导入分区替换为传入的时间 if(key == "target-dir"): value = value.replace("$dt", dt) # 拼装为命令 if key == "fields-terminated-by": command += " --" + key + " " + value else: command += " --" + key + " " + value + "\" + " " # 将命令加入至待执行的命令集合 cmds.append(command) return cmds

     

    拼装出来的命令如下:

    sqoop import --username xxxx
     --target-dir /user/hadoop/Recommend/2015-04-26
     --m 1
     --connect jdbc:mysql://localhost:3306/Recommend
     --query 'select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS'
     --password xxxx
     --fields-terminated-by ','

     

    然后新建一个模块,编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

    #!/usr/bin/python
    # FileName sqoop.py
    # -*- coding:UTF-8 -*-
    import os class SqoopUtil(object): ''' sqoop operation ''' def __init__(self): pass @staticmethod def execute_shell(shell): print shell os.system(shell)

     

    最后实现程序的入口:

    # -*- coding:UTF-8 -*-
    #!/usr/bin/python
    # FileName:main.py
    
    from com.cal.sqoop_export import sqoop_export
    from com.cal.sqoop_import import sqoop_import
    from com.utls.pro_env import PROJECT_LIB_DIR, PROJECT_DIR
    from com.utls.sqoop import SqoopUtil
    import os
    import time

    # 一共分为5个步骤: # 1.将数据库中新产生的数据导入HDFS # 2.把刚才导入到HDFS中的数据拷贝到相应的位置 # 3.推荐引擎进行新一轮的计算 # 4.把数据库中原来存储的推荐结果表清空 # 5.从HDFS导入新计算出来的结果 def execute(): # 调度模块将昨天的时间传入 now = time.time() n = 1 before = now - n * 24 * 3600 # 可以改变n 的值计算n天前的 dt = time.strftime("%Y-%m-%d", time.localtime(before)) #####Step1##### # 解析配置文件,获得sqoop命令集合 cmds = sqoop_import.resolve_conf(dt) # 迭代集合,执行命令 for i in range(len(cmds)): cmd = cmds[i] # 执行导入过程 # print cmd SqoopUtil.execute_shell(cmd) #####Step2##### shell = "hadoop fs -cp /user/hadoop/" + dt + "/part-m-00000 /user/hadoop/Recommend/InputData/" + dt # print shell SqoopUtil.execute_shell(shell) #####Step3##### shell = "hadoop jar " + PROJECT_LIB_DIR + "/MovieRecommend.jar main.Recommend" # print shell SqoopUtil.execute_shell(shell) #####Step4##### # 执行脚本:清空表格 os.system(PROJECT_DIR + '/lib/export.sh') #####Step5##### # 迭代集合,执行命令 cmds = sqoop_export.resolve_conf() for i in range(len(cmds)): cmd = cmds[i] # 执行导出过程 # print cmd SqoopUtil.execute_shell(cmd) # Python模块的入口:main函数 if __name__ == '__main__': while True: execute() time.sleep(600)

     

    综上所述是整个大模块最核心的部分,剩下的比如shell下执行数据库命令、Hadoop程序的运行就不详细描述了

     

    整个过程即为:用户在web端产生新数据→导入到数据库→通过Sqoop导入HDFS→推荐引擎计算分析得出推荐结果→将推荐结果重新写回数据库

     

  • 相关阅读:
    .net 调用命令行进行解压缩
    《jQuery实战》第1章 引荐JQuery
    将指定URL文件下载到指定服务器
    Oracle学习笔记
    Spring3 AOP的使用
    Field类使用以及getDeclaredFields方法
    spring 3的使用
    Json数据类型
    RotateAnimation详解
    GsonJava对象生成Json串
  • 原文地址:https://www.cnblogs.com/Murcielago/p/4457775.html
Copyright © 2011-2022 走看看