zoukankan      html  css  js  c++  java
  • python操作hadoop HDFS api使用

    doc:http://pyhdfs.readthedocs.io/en/latest/

    pip install hdfs  

    https://hdfscli.readthedocs.io/en/latest/quickstart.html

    此外还有一个库pyhdfs

    https://github.com/jingw/pyhdfs/blob/master/README.rst

    一般也可以直接hadoop HDFS 执行hdfscli command操作

    hdfs库文档入门

    命令行界面

    默认情况下,HdfsCLI带有单个入口点hdfscli,该入口点提供了方便的界面来执行常见操作。它的所有命令都接受一个自 --alias变量(如上所述),该自变量定义了针对哪个集群进行操作。

    下载和上传文件

    HdfsCLI支持从HDFS透明地下载和上传文件和文件夹(我们也可以使用该--threads 选项指定并行度)。

    $ # Write a single file to HDFS.
    $ hdfscli upload --alias=dev weights.json models/
    $ # Read all files inside a folder from HDFS and store them locally.
    $ hdfscli download export/results/ "results-$(date +%F)"
    

    如果读取(或写入)单个文件,则还可以通过将其内容-用作路径参数,将其内容流式传输到标准输出(从标准输入返回)。

    $ # Read a file from HDFS and append its contents to a local log file.
    $ hdfscli download logs/1987-03-23.txt - >>logs
    

    默认情况下,如果尝试写入现有路径(在本地或在HDFS上),HdfsCLI将引发错误。我们可以使用该--force选项强制覆盖路径 

    互动壳

    interactive命令(在未指定任何命令时也使用)将创建一个HDFS客户端,并将其公开在python shell中(如果可用,请使用IPython)。这使得在HDFS上执行文件系统操作并与其数据进行交互变得很方便。有关可用方法的概述,请参见下面的Python绑定

    $ hdfscli --alias=dev
    
    Welcome to the interactive HDFS python shell.
    The HDFS client is available as `CLIENT`.
    
    In [1]: CLIENT.list('data/')
    Out[1]: ['1.json', '2.json']
    
    In [2]: CLIENT.status('data/2.json')
    Out[2]: {
      'accessTime': 1439743128690,
      'blockSize': 134217728,
      'childrenNum': 0,
      'fileId': 16389,
      'group': 'supergroup',
      'length': 2,
      'modificationTime': 1439743129392,
      'owner': 'drwho',
      'pathSuffix': '',
      'permission': '755',
      'replication': 1,
      'storagePolicy': 0,
      'type': 'FILE'
    }
    
    In [3]: CLIENT.delete('data/2.json')
    Out[3]: True
    

    利用python的全部功能,我们可以轻松地执行更复杂的操作,例如重命名与某些模式匹配的文件夹,删除一段时间未访问的文件,查找某个用户拥有的所有路径等。

    更多

    cf. 有关命令和选项的完整列表。hdfscli --help

    Python绑定

    实例化客户端

    获取hdfs.client.Client实例的最简单方法是使用上述的Interactive Shell,在该Shell中客户端将自动可用。要以编程方式实例化客户端,有两种选择:

    第一种是导入客户端类并直接调用其构造函数。这是最直接,最灵活的方法,但是不允许我们重复使用已配置的别名:

    from hdfs import InsecureClient
    client = InsecureClient('http://host:port', user='ann')
    

    第二种方法利用hdfs.config.Config该类加载现有的配置文件(默认与CLI相同)并从现有别名创建客户端:

    from hdfs import Config
    client = Config().get_client('dev')
    

    读写文件

    read()方法提供了类似文件的界面,用于从HDFS读取文件。它必须在一个with块中使用(确保始终正确关闭连接):

    # Loading a file in memory.
    with client.read('features') as reader:
      features = reader.read()
    
    # Directly deserializing a JSON object.
    with client.read('model.json', encoding='utf-8') as reader:
      from json import load
      model = load(reader)
    

    如果chunk_size传递参数,则该方法将返回一个生成器,有时使流文件内容更简单。

    # Stream a file.
    with client.read('features', chunk_size=8096) as reader:
      for chunk in reader:
        pass
    

    同样,如果delimiter传递了一个参数,则该方法将返回定界块的生成器。

    with client.read('samples.csv', encoding='utf-8', delimiter='
    ') as reader:
      for line in reader:
        pass
    

    使用以下write() 方法将文件写入HDFS:该方法返回类似文件的可写对象:

    # Writing part of a file.
    with open('samples') as reader, client.write('samples') as writer:
      for line in reader:
        if line.startswith('-'):
          writer.write(line)
    
    # Writing a serialized JSON object.
    with client.write('model.json', encoding='utf-8') as writer:
      from json import dump
      dump(model, writer)
    

    为了方便起见,还可以将可迭代的data参数直接传递给该方法。

    # This is equivalent to the JSON example above.
    from json import dumps
    client.write('model.json', dumps(model))
    

    探索文件系统

    所有Client子类都公开了各种与HDFS交互的方法。大多数都是在WebHDFS操作之后直接建模的,其中一些在下面的代码段中显示:

    # Retrieving a file or folder content summary.
    content = client.content('dat')
    
    # Listing all files inside a directory.
    fnames = client.list('dat')
    
    # Retrieving a file or folder status.
    status = client.status('dat/features')
    
    # Renaming ("moving") a file.
    client.rename('dat/features', 'features')
    
    # Deleting a file or folder.
    client.delete('dat', recursive=True)
    

    基于这些方法的其他方法可提供更多高级功能:

    # Download a file or folder locally.
    client.download('dat', 'dat', n_threads=5)
    
    # Get all files under a given folder (arbitrary depth).
    import posixpath as psp
    fpaths = [
      psp.join(dpath, fname)
      for dpath, _, fnames in client.walk('predictions')
      for fname in fnames
    ]
    

    有关可用方法的完整列表,请参见API参考

    检查路径是否存在

    上述大多数方法都会HdfsError 在缺少的路径上引发if调用。推荐的检查路径是否存在的方法是使用带有参数content()或 status()方法strict=False(在这种情况下,它们将None在缺少的路径上返回)。

    更多

    请参阅高级用法部分以了解更多信息。

    =========================

    2:Client——创建集群连接

    > from hdfs import *  

    > client = Client("http://s100:50070")  

    其他参数说明:

          classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

                           url:ip:端口

                           root:制定的hdfs根目录

                           proxy:制定登陆的用户身份

                           timeout:设置的超时时间

    session:连接标识

     client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)  

    >>> client.list("/")  

    [u'home',u'input', u'output', u'tmp']

    3:dir——查看支持的方法

    >dir(client)

    4:status——获取路径的具体信息

    其他参数:status(hdfs_path, strict=True)

                  hdfs_path:就是hdfs路径

                  strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

    5:list——获取指定路径的子目录信息

    >client.list("/")

    [u'home',u'input', u'output', u'tmp']

    其他参数:list(hdfs_path, status=False)

                 status:为True时,也返回子目录的状态信息,默认为Flase

    6:makedirs——创建目录

    >client.makedirs("/123")

    其他参数:makedirs(hdfs_path, permission=None)

                   permission:设置权限

    >client.makedirs("/test",permission=777)

    7: rename—重命名

    >client.rename("/123","/test")

    8:delete—删除

    >client.delete("/test")

    其他参数:delete(hdfs_path, recursive=False)

                  recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False

    9:upload——上传数据

    >client.upload("/test","F:[PPT]Google Protocol Buffers.pdf");

    其他参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None, 

                                  chunk_size=65536,progress=None, cleanup=True, **kwargs)

                  overwrite:是否是覆盖性上传文件

                  n_threads:启动的线程数目

                  temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换

                  chunk_size:文件上传的大小区间

                  progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数

                  cleanup:如果在上传任何文件时发生错误,则删除该文件

    10:download——下载

    >client.download("/test/NOTICE.txt","/home")

    11:read——读取文件

    withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
        print reader.read()

    其他参数:read(*args, **kwds)

                 hdfs_path:hdfs路径

                 offset:设置开始的字节位置

                 length:读取的长度(字节为单位)

                 buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。

                 encoding:制定编码

                 chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象

                 delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。

                 progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。

  • 相关阅读:
    Python实现TCP服务端的并发
    python程序中的线程操作
    jmeter命令行执行脚本_动态参数设置
    App客户端性能测试点总结
    App功能测试点总结
    jmeter中生成UUID作为唯一标识符
    Python Unittest进行接口测试的简单示例
    jmeter接口测试中的用例数据分离
    博客园看板娘的简单添加
    (转)后端服务性能压测实践
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12969733.html
Copyright © 2011-2022 走看看