zoukankan      html  css  js  c++  java
  • python操作hdfs

    python操作hdfs

    • 下载hdfs

      pip install hdfs
      
    • 代码示例

      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      """
      # Author Xu Junkai
      # coding=utf-8
      # @Time    : 2021/1/17 13:46
      # @Site    :
      # @File    : demo1.py
      # @Software: PyCharm
      """
      from hdfs.client import Client
      
      class HdfsWork(object):
          def __init__(self, urls, root, timeout, session=False):
              self.urls = urls
              self.root = root
              self.timeout = timeout
              self.session = session
              self.client = Client(url=self.urls, root = self.root, timeout = self.timeout, session=self.session)
          def ls(self, hdfs_path):
              """
              hdfs目录下文件和文件夹
              :param hdfs_path: hdfs目录
              :return:
              """
              return self.client.list(hdfs_path, status=False)
          def mkdir(self, hdfs_path):
              """
              创建目录
              :param hdfs_path:
              :return:
              """
              # permission 755
              self.client.makedirs(hdfs_path)
      
          def rm(self,hdfs_path):
              """
              删除hdfs文件
              :param hdfs_path:
              :return:
              """
              # recursive boolean 是否递归删除
              self.client.delete(hdfs_path)
          def upload_hdfs(self, local_path, hdfs_path):
              """
              上传文件到hdfs
              :param local_path: 本地路径
              :param hdfs_path: hdfs路径
              :return:
              """
              # cleanup boolean 上传过程中发生错误,删除所有上传的文件
              self.client.upload(hdfs_path,local_path,cleanup=True)
          def download(self,hdfs_path,local_path):
              """
              从hdfs下载文件
              :param hdfs_path: hdfs路径
              :param local_path: 本地路径
              :return:
              """
              # overwrite boolean 覆盖任何现有文件或目录
              state = self.client.download(hdfs_path,local_path,overwrite=True)
              print(state)
          def status(self, hdfs_path):
              """
              获取hdfs下文件或文件夹信息
              :param hdfs_path:hdfs路径
              :return:
              """
              # print(self.client.content(hdfs_path))
              # strict boolean 不存在返回None,存在返回文件信息
              return self.client.status(hdfs_path,strict=False)
          def is_file(self, hdfs_path):
              """
              判断是文件
              :param hdfs_path:hdfs路径
              :return:
              """
              file_status = self.status(hdfs_path)
              if not file_status:
                  return None
              else:
                  if file_status["type"] == "FILE":
                      return True
                  return False
          def is_directory(self, hdfs_path):
              """
              判断是文件夹
              :param hdfs_path:hdfs路径
              :return:
              """
              file_status = self.status(hdfs_path)
              if not file_status:
                  return None
              else:
                  if file_status["type"] == "DIRECTORY":
                      return True
                  return False
          def mv_or_rename(self, hdfs_src_path, hdfs_dst_path):
              """
              移动或修改文件
              :param hdfs_src_path: hdfs源文件路经
              :param hdfs_dst_path: hdfs要修改路径
              :return:
              """
              self.client.rename(hdfs_src_path, hdfs_dst_path)
          def overwrite_hdfs(self, hdfs_path,data):
              """
              覆盖数据写到hdfs文件
              :param hdfs_path: hdfs路径
              :param data: 数据
              :return:
              """
              self.client.write(hdfs_path, data, overwrite=True, append=False, encoding="utf-8")
          def append_hdfs(self, hdfs_path, data):
              """
              追加数据到hdfs文件
              :param hdfs_path: hdfs路径
              :param data: 数据
              :return:
              """
              self.client.write(hdfs_path, data, overwrite=False, append=True, encoding="utf-8")
      if __name__ == '__main__':
          client = HdfsWork("http://10.0.0.134:50070/;http://10.0.0.131:50070/;http://10.0.0.132:50070/", "/", 10000, False)
          # 查看目录下文件和文件夹
          # file_path = client.ls("/")
          # print(file_path)
          # 创建文件目录
          # client.mkdir("/hdfs_test/demo_1/")
          # 将 /hdfs_test 下 demo_1删除
          # client.rm("/hdfs_test/demo_1")
          # 上传文件
          # client.upload_hdfs("./test_report_01.pdf", "/hdfs_test/demo_1/")
          # 下载文件
          # client.download("/hdfs_test/demo_1/test_report_01.pdf", "./hdfs_download/")
          # 获取文件或文件夹信息信息
          # state = client.status("/hdfs_test/demo_1/test_report_01.pdf")
          # print(state)
          # 判断是文件
          # state = client.is_file("/hdfs_test/demo_1/test_report_01.pdf")
          # print(state)
          # 移动或修改文件
          # client.mv_or_rename("/test_report_01.pdf", "/hdfs_test/demo_1/test_report_01.pdf")
          # 覆盖数据写到hdfs文件
          # client.overwrite_hdfs("/a.txt", "this is my wrire text
      ")
          # 追加数据到hdfs文件
          # client.append_hdfs("/a.txt", "this is my wrire text
      ")
      
      
    • 当创建目录报错解决方式

    hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x
    
    解决办法是:在配置文件hdfs-site.xml中加入
    <property>
      <name>dfs.permissions</name>
      <value>false</value>
    </property>
    
  • 相关阅读:
    如何安装mysql
    07 登录接口开发
    06 跨域问题
    05 实体校验
    04 异常处理
    03 整合shiro+jwt 会话共享
    02 统一结果封装
    01 新建SpringBoot项目 整合Mybatis Plus(Spring Boot 前后端分离)
    结合Scikit-learn介绍几种常用的特征选择方法
    Set_ML
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14290061.html
Copyright © 2011-2022 走看看