zoukankan      html  css  js  c++  java
  • Java实现HadoopHA集群的hdfs控制

    一、HadoopHA的搭建:https://www.cnblogs.com/null-/p/10000309.html

     

    二、pom文件依赖:

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.2</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/junit/junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
        </dependencies>

    三、控制代码:

    package com.hdfs.demo;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * @author 王传礼
     */
    public class HdfsDemo {
    
        /**
         * 根据配置获取HDFS文件操作系统
         *
         * @return FileSystem
         */
        public static FileSystem getHadoopFileSystem() {
            FileSystem fs = null;
            Configuration conf = null;
            //方法:本地没有hadoop系统,但可以远程访问。根据给定的URI和用户名,访问hdfs的配置参数
            conf = new Configuration();
            //Hadoop的用户名
            String hdfsUserNmae = "root";
            URI hdfsUri = null;
            try {
                hdfsUri = new URI("hdfs://192.168.182.135:8020");
                // HDFS的访问路径
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            try {
                //根据远程的NN节点,获取配置信息,创建HDFS对象
                fs = FileSystem.get(hdfsUri, conf, hdfsUserNmae);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return fs;
        }
    
        /**
         * 这里的创建文件夹同shell中的mkdir -p 语序前面的文件夹不存在
         * 跟java中的IO操作一样,也只能对path对象做操作;但是这里的Path对象是hdfs中的
         *
         * @param fs,filepath
         * @return
         */
        public static boolean myCreatePath(FileSystem fs,String filepath) {
            boolean b = false;
            Path path = new Path(filepath);
            try {
                // even the path exist,it can also create the path.
                b = fs.mkdirs(path);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return b;
        }
    
        /**
         * 删除文件,实际上删除的是给定path路径的最后一个
         * 跟java中一样,也需要path对象,不过是hadoop.fs包中的。
         * 实际上delete(Path p)已经过时了,更多使用delete(Path p,boolean recursive)
         * 后面的布尔值实际上是对文件的删除,相当于rm -r
         *
         * @param fs
         * @return
         */
        public static boolean myDropHdfsPath(FileSystem fs, String filepath) {
            boolean b = false;
            // drop the last path
            Path path = new Path(filepath);
            try {
                b = fs.delete(path, true);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return b;
        }
    
    
        /**
         * 重命名文件夹
         *
         * @param hdfs
         * @return
         */
        public static boolean myRename(FileSystem hdfs, String oldname, String newname) {
            boolean b = false;
            Path oldPath = new Path(oldname);
            Path newPath = new Path(newname);
    
            try {
                b = hdfs.rename(oldPath, newPath);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    hdfs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return b;
        }
    
        /**
         * 遍历文件夹
         * public FileStatus[] listStatus(Path p)
         * 通常使用HDFS文件系统的listStatus(path)来获取改定路径的子路径。然后逐个判断
         * 值得注意的是:
         * 1.并不是总有文件夹中有文件,有些文件夹是空的,如果仅仅做是否为文件的判断会有问题,必须加文件的长度是否为0的判断
         * 2.使用getPath()方法获取的是FileStatus对象是带URL路径的。使用FileStatus.getPath().toUri().getPath()获取的路径才是不带url的路径
         *
         * @param hdfs
         * @param listPath 传入的HDFS开始遍历的路径
         * @return
         */
        public static Set<String> recursiveHdfsPath(FileSystem hdfs, Path listPath) {
    
                    /*FileStatus[] files = null;
                    try {
                        files = hdfs.listStatus(listPath);
                        Path[] paths = FileUtil.stat2Paths(files);
                        for(int i=0;i<files.length;i++){
                            if(files[i].isFile()){
                                // set.add(paths[i].toString());
                                set.add(paths[i].getName());
                            }else {
                                recursiveHdfsPath(hdfs,paths[i]);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        logger.error(e);
                    }*/
    
            FileStatus[] files = null;
            Set<String> set = null;
            try {
                files = hdfs.listStatus(listPath);
                // 实际上并不是每个文件夹都会有文件的。
                if (files.length == 0) {
                    // 如果不使用toUri(),获取的路径带URL。
                    set.add(listPath.toUri().getPath());
                } else {
                    // 判断是否为文件
                    for (FileStatus f : files) {
                        if (files.length == 0 || f.isFile()) {
                            set.add(f.getPath().toUri().getPath());
                        } else {
                            // 是文件夹,且非空,就继续遍历
                            recursiveHdfsPath(hdfs, f.getPath());
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return set;
        }
    
        /**
         * 文件简单的判断
         * 是否存在
         * 是否是文件夹
         * 是否是文件
         *
         * @param fs
         */
        public static void myCheck(FileSystem fs, String filepath) {
            boolean isExists = false;
            boolean isDirectorys = false;
            boolean isFiles = false;
    
            Path path = new Path(filepath);
    
            try {
                isExists = fs.exists(path);
                isDirectorys = fs.isDirectory(path);
                isFiles = fs.isFile(path);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            if (!isExists) {
                System.out.println("The path is not exist.");
            } else {
                System.out.println("The path is exist.");
                if (isDirectorys) {
                    System.out.println("This is a Directory");
                } else if (isFiles) {
                    System.out.println("This is Files");
                }
            }
        }
    
        /**
         * 获取配置的所有信息
         * 首先,我们要知道配置文件是哪一个
         * 然后我们将获取的配置文件用迭代器接收
         * 实际上配置中是KV对,我们可以通过java中的Entry来接收
         */
        public static void showAllConf() {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://node1:8020");
            Iterator<Map.Entry<String, String>> it = conf.iterator();
            while (it.hasNext()) {
                Map.Entry<String, String> entry = it.next();
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }
        }
    
        /**
         * 文件下载
         * 注意下载的路径的最后一个地址是下载的文件名
         * copyToLocalFile(Path local,Path hdfs)
         * 下载命令中的参数是没有任何布尔值的,如果添加了布尔是,意味着这是moveToLocalFile()
         *文件下载有权限要求 要有写的权限
         * @param fs
         */
        public static void getFileFromHDFS(FileSystem fs, String dfsFile, String locPath) {
            Path HDFSPath = new Path(dfsFile);
            Path localPath = new Path(locPath);
            try {
                fs.copyToLocalFile(HDFSPath, localPath);
                System.out.println("File download.");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 文件的上传
         * 注意事项同文件的上传
         * 注意如果上传的路径不存在会自动创建
         * 如果存在同名的文件,会覆盖
         *
         * @param fs
         */
        public static void myPutFile2HDFS(FileSystem fs, String localFile, String dfsPath) {
    
            boolean pathExists = false;
            // 如果上传的路径不存在会创建
            // 如果该路径文件已存在,就会覆盖
            Path localPath = new Path(localFile);
            Path hdfsPath = new Path(dfsPath);
    
            try {
                fs.copyFromLocalFile(localPath, hdfsPath);
                System.out.println("File upload.");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        /**
         * hdfs之间文件的复制
         * 使用FSDataInputStream来打开文件open(Path p)
         * 使用FSDataOutputStream开创建写到的路径create(Path p)
         * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)来进行具体的读写
         * 说明:
         * 1.java中使用缓冲区来加速读取文件,这里也使用了缓冲区,但是只要指定缓冲区大小即可,不必单独设置一个新的数组来接受
         * 2.最后一个布尔值表示是否使用完后关闭读写流。通常是false,如果不手动关会报错的
         *
         * @param hdfs
         */
        public static void copyFileBetweenHDFS(FileSystem hdfs, String in, String out) {
            Path inPath = new Path(in);
            Path outPath = new Path(out);
    
            // byte[] ioBuffer = new byte[1024*1024*64];
            // int len = 0;
    
            FSDataInputStream hdfsIn = null;
            FSDataOutputStream hdfsOut = null;
    
            try {
                hdfsIn = hdfs.open(inPath);
                hdfsOut = hdfs.create(outPath);
    
                IOUtils.copyBytes(hdfsIn, hdfsOut, 1024 * 1024 * 64, false);
    
            } catch (IOException e) {
                e.printStackTrace();
    
           } finally {
                try {
                    hdfsOut.close();
                    hdfsIn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }
    
        }
    }

    四、测试代码

    package com.hdfs.demo;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Test;
    
    import java.util.Scanner;
    import java.util.Set;
    
    import static org.junit.Assert.*;
    
    public class HdfsTest {
    
        Scanner sc = new Scanner(System.in);
        FileSystem fs = HdfsDemo.getHadoopFileSystem();
    
        @Test
        public void myCreatePath() {
            //目录创建测试
            String path = "/usr/test/input";
            System.out.println(HdfsDemo.myCreatePath(fs,path));
        }
    
        @Test
        public void myDropHdfsPath() {
            // 目录删除
            String path = "/usr/test/output";
            System.out.println(HdfsDemo.myDropHdfsPath(fs,path));
        }
    
        @Test
        public void myRename() {
            //文件重命名
            String oldName = "/usr/test/input";
            String newName = "/usr/test/renameInput";
            System.out.println(HdfsDemo.myRename(fs,oldName,newName));
        }
    
        @Test
        public void recursiveHdfsPath() {
            //遍历文件夹
            Path path = new Path("/usr/test/");
            Set<String> set = HdfsDemo.recursiveHdfsPath(fs, path);
            for (String str :
                    set) {
                System.out.println(str);
            }
        }
    
        @Test
        public void myCheck() {
            //文件简单的判断 是否存在 是否是文件夹 是否是文件
            String path = "/usr/test/input/file.txt";
            HdfsDemo.myCheck(fs,path);
        }
    
        @Test
        public void showAllConf() {
            //获取配置的所有信息
            HdfsDemo.showAllConf();
        }
    
        @Test
        public void getFileFromHDFS() {
            //文件下载 文件下载有权限要求 要有写的权限 0644error
            String dfsFile = "/usr/test/input/file.txt";
            String locPath = "e://temp/data/";
            HdfsDemo.getFileFromHDFS(fs,dfsFile,locPath);
        }
    
        @Test
        public void myPutFile2HDFS() {
            //文件的上传
            String localFile = "e://temp/file.txt";
            String dfsPath = "/usr/test/input";
            HdfsDemo.myPutFile2HDFS(fs,localFile,dfsPath);
        }
    
        @Test
        public void copyFileBetweenHDFS() {
            //hdfs之间文件的复制
            String in = "/usr/test/output";
            String out = "/usr/test/input";
            HdfsDemo.copyFileBetweenHDFS(fs,in,out);
        }
    }
  • 相关阅读:
    CSS的应用
    关于新手html的认识 以及对table的基本用法
    javascript的使用方法
    CSS的使用方式和选择器的用法
    html基础知识点
    前端课堂第四课
    前端课堂第三课
    前端实训第二课
    前端实训随笔
    JS02
  • 原文地址:https://www.cnblogs.com/null-/p/10073003.html
Copyright © 2011-2022 走看看