zoukankan      html  css  js  c++  java
  • [HDFS_4] HDFS 的 Java 应用开发


     0. 说明

       在 IDEA下 进行 HDFS 的 Java 应用开发

      通过编写代码实现对 HDFS 的增删改查操作


    1. 流程

      1.1 在项目下新建 Moudle

      略

      1.2 为 Moudle 添加 Maven 框架支持

      略

      1.3 添加 Maven 依赖

        <dependencies>
            <!-- Hadoop Client依赖 -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
    
            <!-- 单元测试依赖 -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    
        </dependencies>

      1.4 将 Hadoop/etc/ha 目录下的 [core-site.xml] [hdfs-site.xml] [log4j.properties] 存入 resources 中

      


    2. 代码编写

    package hadoop.hdfs;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Test;
    
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    
    /**
     * IDEA 下测试 HDFS 的增删改查
     */
    public class TestHDFS {
    
        // 1. 测试读取
        @Test
        public void testRead() throws IOException {
    
            // 初始化配置文件
            Configuration conf = new Configuration();
    
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            // 初始化路径
            Path p = new Path("/a.txt");
    
            // 通过文件系统获取输入流
            // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
            FSDataInputStream fis = fs.open(p);
    
            int len = 0;
            byte[] buf = new byte[1024];
    
            while ((len = fis.read(buf)) != -1) {
                System.out.println(new String(buf, 0, len));
            }
            fis.close();
        }
    
        // 2. 测试读取并通过 IOUtils 拷贝文件到本地
        @Test
        public void testRead2() throws Exception {
    
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            // 初始化路径
            Path p = new Path("/a.txt");
    
            // 通过文件系统获取输入流
            // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
            FSDataInputStream fis = fs.open(p);
    
            FileOutputStream fos = new FileOutputStream("D:/1.txt");
    
            // 通过 IOUtils 拷贝文件
            IOUtils.copyBytes(fis, fos, 1024);
    
            fis.close();
            fos.close();
            System.out.println("ok");
    
        }
    
        // 3. 测试写文件,将本地文件写入到 HDFS 中
        @Test
        public void testwrite() throws IOException {
    
            // 设置系统用户名
            System.setProperty("HADOOP_USER_NAME", "centos");
    
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            // 获得输入流
            FileInputStream fis = new FileInputStream("E:/p_data/test/customers.txt");
    
            // 初始化路径
            Path pout = new Path("/b.txt");
    
            // 通过文件系统获取输出流
            // FSDataOutputStream 是 outputStream 的装饰流,可以通过普通流方式操纵 fos
            FSDataOutputStream fos = fs.create(pout);
    
            // 通过 IOUtils 拷贝文件
            IOUtils.copyBytes(fis, fos, 1024);
    
            fis.close();
            fos.close();
            System.out.println("ok");
    
        }
    
        // 创建文件夹
        @Test
        public void testMkdir() throws IOException {
    
            // 设置系统用户名
            System.setProperty("HADOOP_USER_NAME", "centos");
    
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            boolean b = fs.mkdirs(new Path("/aaa"));
    
            System.out.println(b);
    
    
        }
    
        // 删除文件夹
        @Test
        public void testDelete() throws IOException {
    
            // 设置系统用户名
            System.setProperty("HADOOP_USER_NAME", "centos");
    
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            boolean b = fs.delete(new Path("/aaa"),true);
    
            System.out.println(b);
        }
    
        // 文件末尾追加文件
        @Test
        public void testAppend() throws IOException {
    
            // 设置系统用户名
            System.setProperty("HADOOP_USER_NAME", "centos");
    
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);
    
            // 通过文件系统获取输出流
            // FSDataOutputStream 是 outputStream 的装饰流,可以通过普通流方式操纵 fos
            FSDataOutputStream fos = fs.append(new Path("/a.txt"));
    
            // 通过文件系统获取输入流
            // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
            FileInputStream fis = new FileInputStream("E:/p_data/add.txt");
    
            // 通过 IOUtils 拷贝文件
            IOUtils.copyBytes(fis,fos,1024);
    
            fis.close();
            fos.close();
        }
    
        // 通过递归列出指定文件夹下的文件或文件夹信息
        public static void testList(String path) {
            try {
                // 初始化配置文件
                Configuration conf = new Configuration();
                // 初始化文件系统
                FileSystem fs = FileSystem.get(conf);
    
                FileStatus[] statuses = fs.listStatus(new Path(path));
    
                for (FileStatus status : statuses) {
                    if (status.isDirectory()) {
                        path = status.getPath().toString();
                        System.out.println(path);
                        testList(path);
                    } else {
                        System.out.println(status.getPath().toString());
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            testList("/");
        }
    
    }

  • 相关阅读:
    Linux 命令大全
    MySQL 存储 utf8mb4
    PHP房贷计算器代码,等额本息,等额本金
    laravel 原生 sql
    include_once 问题
    laravel count distinct
    微信小程序显示cms里的html文章
    PHP文件上传
    Ajax做无刷新分页
    PHP封装返回Ajax字符串和JSON数组
  • 原文地址:https://www.cnblogs.com/share23/p/9779418.html
Copyright © 2011-2022 走看看