zoukankan      html  css  js  c++  java
  • python操作hdfs模块上传文件到HDFS

    因为公司需要,需要写一个脚本将Windows server上的部分日志文件同步到HDFS上,每天定时启动脚本上传。
    大体思路是,首先对比Windows server和HDFS上的是否一样,不一样就证明产生了新的日志文件,然后上传。折腾了一天才弄好。。。
    系统: Mac(确切的说是黑苹果,电脑老掉牙了,用起来还是挺卡的,木办法,穷使我坚持住了,哈哈),如果是Windows,就是settings.py的路径区别。
    存在问题:目前,如果是新增目录,暂时无法同步,但是固定的那几个文件夹,新生成日志后是可以同步的,目前需求就这样,所以先用了,有空了修改。

    1. 用到hdfs模块

    pip install hdfs

    2. 首先,修改hosts文件,这步不做是连不上的。

    vi /etc/hosts
    # 将hdfs映射添加进去
    10.211.XXX.XX hdfs01.XXXX.XXX
    10.211.XXX.XX hdfs02.XXXX.XXX
    ......

    3. 代码可以在Windowsserver和linux server跑。
    transferfile.py

    # -*- coding: utf-8 -*-
    import settings
    import os
    import platform
    import logging as lg
    from hdfs.client import Client
    from hdfs.client import _logger
    
    
    # 记录日志(用了hdfs中源码的logger)
    logdir = settings.LOG_DIR
    _logger.setLevel(lg.DEBUG)
    myhandler = lg.FileHandler(logdir, "w")  # 覆盖模式输出日志
    myhandler.setLevel(lg.DEBUG)
    formatter = lg.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    myhandler.setFormatter(formatter)
    _logger.addHandler(myhandler)
    
    
    class Transfer(object):
    
        def __init__(self):
            self.host = settings.HOST
            self.remotepath = settings.REMOTE_PATH
            self.localpath = settings.LOCAL_PATH
            self.rootpath = settings.ROOT_PATH
            self.client = Client(self.host, root=self.rootpath)
    
        def upload_file_windows(self):
            """windowsserver"""
            try:
                base_dir = self.localpath.split('\').pop()  # 要上传的路径的最后一个文件夹
    
                for root, dirs, files in os.walk(self.localpath):
    
                    new_dir = base_dir + root.split(base_dir).pop().replace('\','/')  # 去除本地路径前缀
    
    
                    for file in files:
                        old_path = root + '\' + file  # 原始本地路径文件
                        lpath = new_dir + '/' + file  # 去除本地路径前缀后的文件
    
                        if not self.client.status(self.remotepath + '/' + lpath, strict=False):
    
                            # 第一个参数远程路径,第二个参数本地路径,第三个参数是否覆盖,第四个参数工作线程数
                            self.client.upload(self.remotepath + '/' + lpath, old_path, overwrite=False)
    
            except Exception as e:
                with open("err.log", "a") as f:
                    f.write(str(e))
    
        def upload_file_linux(self, sep):
            """linuxserver"""
            try:
                base_dir = self.localpath.split(sep).pop()  # 要上传的路径的最后一个文件夹
    
                for root, dirs, files in os.walk(self.localpath):
    
                    new_dir = base_dir + root.split(base_dir).pop()  # 去除本地路径前缀
    
                    for file in files:
                        old_path = root + sep + file  # 原始本地路径文件
                        lpath = new_dir + sep + file  # 去除本地路径前缀后的文件
    
                        if not self.client.status(self.remotepath + sep + lpath, strict=False):
    
                            # len(old_path.split("/"))
                            # 第一个参数远程路径,第二个参数本地路径,第三个参数是否覆盖,第四个参数工作线程数
                            self.client.upload(self.remotepath + sep + lpath, old_path, overwrite=False)
            except Exception as e:
                with open("err.log", "a") as f:
                    f.write(str(e))
    
    
    if __name__ == "__main__":
        transfer = Transfer()
        # windows server
        if platform.platform().startswith("Windows"):
            transfer.upload_file_windows()
        else:
            # linux server
            transfer.upload_file_linux('/')
    

    settings.py

    # -*- coding: utf-8 -*-
    
    HOST = "http://IP:PORT"  # 地址
    ROOT_PATH = "/dsmp/johnny"  # 客户端连接的目录
    REMOTE_PATH = "/dsmp/johnny"  # HDFS上的路径
    LOCAL_PATH = "/Users/smallcaff/Desktop/test"  # 本地路径
    LOG_DIR = "/Users/smallcaff/Desktop/test/mylog.log"  # 日志路径
  • 相关阅读:
    HandlerThread
    handler原理
    死锁简析
    Android序列化
    AsyncTask原理
    【java线程池】
    java创建线程的三种方式
    service相关
    【hashMap】详谈
    【activity任务栈】浅析
  • 原文地址:https://www.cnblogs.com/SmallCaff/p/10650699.html
Copyright © 2011-2022 走看看