zoukankan      html  css  js  c++  java
  • Hadoop_11_HDFS的流式 API 操作

      对于MapReduce等框架来说,需要有一套更底层的API来获取某个指定文件中的一部分数据,而不是一整个文件

    因此使用流的方式来操作 HDFS上的文件,可以实现读取指定偏移量范围的数据

    1.客户端测试类代码:

    package cn.bigdata.hdfs;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.junit.Before;
    
    public class HdfsStreamAcess {
        //获取客户端操作hdfs的实例对象
        private FileSystem fs  = null;
        Configuration conf = null;
        @Before
        public void inin() throws IOException, InterruptedException, URISyntaxException{
            conf = new Configuration();
            //拿到一个文件系统操作的客户端实例对象,最后一个参数为用户名
            fs = FileSystem.get(new URI("hdfs://shizhan2:9000"),conf,"root");
        }
    }

     2.流式上传文件:

        //流式上传文件
        @Test
        public void testUploadWithStream() throws IllegalArgumentException, IOException{
            //true:该文件夹存在就覆盖  IOUtils:工具类
            FSDataOutputStream outputstream = fs.create(new Path("/angelababy.love"), true);
            FileInputStream input = new FileInputStream("c:/xxx.txt");
            IOUtils.copy(input, outputstream);
        }

     3.流式下载文件:

        //流式下载文件
        @Test
        public void testDownloadWithStream() throws Exception{
            FSDataInputStream in = fs.open(new Path("/angelababy.love"));
            FileOutputStream out = new FileOutputStream("d:/access_stream.log");
            IOUtils.copy(in, out);
        }

     4.流式读取指定长度的文件:

    //文件的随机读写
        @Test
        public void testRandomAccess() throws Exception{
            FSDataInputStream in = fs.open(new Path("/regist-copy.log"));
            FileOutputStream out = new FileOutputStream("d:/random_stream.log");
            IOUtils.copyLarge(in, out, 1*1024*1024, 1*1024*1024); // 从1M位置开始读,读1M
        }

    hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度,用于上层分布式运算框架并发处理数据

    5.控制台打印HDFS文件内容:

    @Test
        public void testCat() throws Exception{
            FSDataInputStream in = fs.open(new Path("/angelababy.love"));
            IOUtils.copy(in,System.out);
        }

    6.递归列出指定目录下所有子文件夹中的文件:

    @Test
        public void testLs() throws Exception {
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            
            while(listFiles.hasNext()){
                LocatedFileStatus fileStatus = listFiles.next();
                System.out.println("blocksize: " +fileStatus.getBlockSize());
                System.out.println("owner: " +fileStatus.getOwner());
                System.out.println("Replication: " +fileStatus.getReplication());
                System.out.println("Permission: " +fileStatus.getPermission());
                System.out.println("Name: " +fileStatus.getPath().getName());
                System.out.println("------------------");
                BlockLocation[] blockLocations = fileStatus.getBlockLocations();
                for(BlockLocation b:blockLocations){
                    System.out.println("块起始偏移量: " +b.getOffset());
                    System.out.println("块长度:" + b.getLength());
                    //块所在的datanode节点
                    String[] datanodes = b.getHosts();
                    for(String dn:datanodes){
                    System.out.println("datanode:" + dn);
                    }
                }
            }
        }

    7.获取文件块信息

        @Test
        public void testGetFileBlock() throws Exception{
            FileStatus fileStatus = fs.getFileStatus(new Path("/pcre-8.35.tar.gz"));
            BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
        }
  • 相关阅读:
    Stream 和 byte[] 之间的转换
    C# Process类_进程_应用程序域与上下文之间的关系
    C# Process类_进程管理器Demo
    C# attribute_特性
    SqlDataAdapter类
    SqlDataReader类
    SqlCommand类
    SqlConnection类
    DataTable类
    C# 语法技巧_三目运算_switch_case
  • 原文地址:https://www.cnblogs.com/yaboya/p/9197296.html
Copyright © 2011-2022 走看看