zoukankan      html  css  js  c++  java
  • python操作hadoop HDFS api使用

    doc:http://pyhdfs.readthedocs.io/en/latest/

    pip install hdfs  

    https://hdfscli.readthedocs.io/en/latest/quickstart.html

    此外还有一个库pyhdfs

    https://github.com/jingw/pyhdfs/blob/master/README.rst

    一般也可以直接hadoop HDFS 执行hdfscli command操作

    hdfs库文档入门

    命令行界面

    默认情况下,HdfsCLI带有单个入口点hdfscli,该入口点提供了方便的界面来执行常见操作。它的所有命令都接受一个自 --alias变量(如上所述),该自变量定义了针对哪个集群进行操作。

    下载和上传文件

    HdfsCLI支持从HDFS透明地下载和上传文件和文件夹(我们也可以使用该--threads 选项指定并行度)。

    $ # Write a single file to HDFS.
    $ hdfscli upload --alias=dev weights.json models/
    $ # Read all files inside a folder from HDFS and store them locally.
    $ hdfscli download export/results/ "results-$(date +%F)"
    

    如果读取(或写入)单个文件,则还可以通过将其内容-用作路径参数,将其内容流式传输到标准输出(从标准输入返回)。

    $ # Read a file from HDFS and append its contents to a local log file.
    $ hdfscli download logs/1987-03-23.txt - >>logs
    

    默认情况下,如果尝试写入现有路径(在本地或在HDFS上),HdfsCLI将引发错误。我们可以使用该--force选项强制覆盖路径 

    互动壳

    interactive命令(在未指定任何命令时也使用)将创建一个HDFS客户端,并将其公开在python shell中(如果可用,请使用IPython)。这使得在HDFS上执行文件系统操作并与其数据进行交互变得很方便。有关可用方法的概述,请参见下面的Python绑定

    $ hdfscli --alias=dev
    
    Welcome to the interactive HDFS python shell.
    The HDFS client is available as `CLIENT`.
    
    In [1]: CLIENT.list('data/')
    Out[1]: ['1.json', '2.json']
    
    In [2]: CLIENT.status('data/2.json')
    Out[2]: {
      'accessTime': 1439743128690,
      'blockSize': 134217728,
      'childrenNum': 0,
      'fileId': 16389,
      'group': 'supergroup',
      'length': 2,
      'modificationTime': 1439743129392,
      'owner': 'drwho',
      'pathSuffix': '',
      'permission': '755',
      'replication': 1,
      'storagePolicy': 0,
      'type': 'FILE'
    }
    
    In [3]: CLIENT.delete('data/2.json')
    Out[3]: True
    

    利用python的全部功能,我们可以轻松地执行更复杂的操作,例如重命名与某些模式匹配的文件夹,删除一段时间未访问的文件,查找某个用户拥有的所有路径等。

    更多

    cf. 有关命令和选项的完整列表。hdfscli --help

    Python绑定

    实例化客户端

    获取hdfs.client.Client实例的最简单方法是使用上述的Interactive Shell,在该Shell中客户端将自动可用。要以编程方式实例化客户端,有两种选择:

    第一种是导入客户端类并直接调用其构造函数。这是最直接,最灵活的方法,但是不允许我们重复使用已配置的别名:

    from hdfs import InsecureClient
    client = InsecureClient('http://host:port', user='ann')
    

    第二种方法利用hdfs.config.Config该类加载现有的配置文件(默认与CLI相同)并从现有别名创建客户端:

    from hdfs import Config
    client = Config().get_client('dev')
    

    读写文件

    read()方法提供了类似文件的界面,用于从HDFS读取文件。它必须在一个with块中使用(确保始终正确关闭连接):

    # Loading a file in memory.
    with client.read('features') as reader:
      features = reader.read()
    
    # Directly deserializing a JSON object.
    with client.read('model.json', encoding='utf-8') as reader:
      from json import load
      model = load(reader)
    

    如果chunk_size传递参数,则该方法将返回一个生成器,有时使流文件内容更简单。

    # Stream a file.
    with client.read('features', chunk_size=8096) as reader:
      for chunk in reader:
        pass
    

    同样,如果delimiter传递了一个参数,则该方法将返回定界块的生成器。

    with client.read('samples.csv', encoding='utf-8', delimiter='
    ') as reader:
      for line in reader:
        pass
    

    使用以下write() 方法将文件写入HDFS:该方法返回类似文件的可写对象:

    # Writing part of a file.
    with open('samples') as reader, client.write('samples') as writer:
      for line in reader:
        if line.startswith('-'):
          writer.write(line)
    
    # Writing a serialized JSON object.
    with client.write('model.json', encoding='utf-8') as writer:
      from json import dump
      dump(model, writer)
    

    为了方便起见,还可以将可迭代的data参数直接传递给该方法。

    # This is equivalent to the JSON example above.
    from json import dumps
    client.write('model.json', dumps(model))
    

    探索文件系统

    所有Client子类都公开了各种与HDFS交互的方法。大多数都是在WebHDFS操作之后直接建模的,其中一些在下面的代码段中显示:

    # Retrieving a file or folder content summary.
    content = client.content('dat')
    
    # Listing all files inside a directory.
    fnames = client.list('dat')
    
    # Retrieving a file or folder status.
    status = client.status('dat/features')
    
    # Renaming ("moving") a file.
    client.rename('dat/features', 'features')
    
    # Deleting a file or folder.
    client.delete('dat', recursive=True)
    

    基于这些方法的其他方法可提供更多高级功能:

    # Download a file or folder locally.
    client.download('dat', 'dat', n_threads=5)
    
    # Get all files under a given folder (arbitrary depth).
    import posixpath as psp
    fpaths = [
      psp.join(dpath, fname)
      for dpath, _, fnames in client.walk('predictions')
      for fname in fnames
    ]
    

    有关可用方法的完整列表,请参见API参考

    检查路径是否存在

    上述大多数方法都会HdfsError 在缺少的路径上引发if调用。推荐的检查路径是否存在的方法是使用带有参数content()或 status()方法strict=False(在这种情况下,它们将None在缺少的路径上返回)。

    更多

    请参阅高级用法部分以了解更多信息。

    =========================

    2:Client——创建集群连接

    > from hdfs import *  

    > client = Client("http://s100:50070")  

    其他参数说明:

          classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

                           url:ip:端口

                           root:制定的hdfs根目录

                           proxy:制定登陆的用户身份

                           timeout:设置的超时时间

    session:连接标识

     client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)  

    >>> client.list("/")  

    [u'home',u'input', u'output', u'tmp']

    3:dir——查看支持的方法

    >dir(client)

    4:status——获取路径的具体信息

    其他参数:status(hdfs_path, strict=True)

                  hdfs_path:就是hdfs路径

                  strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

    5:list——获取指定路径的子目录信息

    >client.list("/")

    [u'home',u'input', u'output', u'tmp']

    其他参数:list(hdfs_path, status=False)

                 status:为True时,也返回子目录的状态信息,默认为Flase

    6:makedirs——创建目录

    >client.makedirs("/123")

    其他参数:makedirs(hdfs_path, permission=None)

                   permission:设置权限

    >client.makedirs("/test",permission=777)

    7: rename—重命名

    >client.rename("/123","/test")

    8:delete—删除

    >client.delete("/test")

    其他参数:delete(hdfs_path, recursive=False)

                  recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False

    9:upload——上传数据

    >client.upload("/test","F:[PPT]Google Protocol Buffers.pdf");

    其他参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None, 

                                  chunk_size=65536,progress=None, cleanup=True, **kwargs)

                  overwrite:是否是覆盖性上传文件

                  n_threads:启动的线程数目

                  temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换

                  chunk_size:文件上传的大小区间

                  progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数

                  cleanup:如果在上传任何文件时发生错误,则删除该文件

    10:download——下载

    >client.download("/test/NOTICE.txt","/home")

    11:read——读取文件

    withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
        print reader.read()

    其他参数:read(*args, **kwds)

                 hdfs_path:hdfs路径

                 offset:设置开始的字节位置

                 length:读取的长度(字节为单位)

                 buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。

                 encoding:制定编码

                 chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象

                 delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。

                 progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。

  • 相关阅读:
    【C++】资源管理
    【Shell脚本】逐行处理文本文件
    【算法题】rand5()产生rand7()
    【Shell脚本】字符串处理
    Apple iOS产品硬件参数. 不及格的程序员
    与iPhone的差距! 不及格的程序员
    iPhone游戏 Mr.Karoshi"过劳死"通关. 不及格的程序员
    XCode V4 发布了, 苹果的却是个变态. 不及格的程序员
    何时readonly 字段不是 readonly 的?结果出呼你想象!!! 不及格的程序员
    object file format unrecognized, invalid, or unsuitable Command 不及格的程序员
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12969733.html
Copyright © 2011-2022 走看看