zoukankan      html  css  js  c++  java
  • java操作HDFS

    package study.bigdata.hdfs;
    
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Test;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Created by lq on 2017/7/5.
     */
    public class HDFSUtils {
        //mmini1 is cluster
        //public static String hdfsURI="hdfs://mini1:9000";
        //public static String localURI="D:/test/hadoop-2.6.5/hadoopData";
        //public static String uri="D:/test/hadoop-2.6.5";
        //public static String URI="D:/test/hadoop-2.6.5/hadoopData";
        public static String URI="hdfs://mini1:9000";
    
        /**
         * 获取配置
         * @return
         */
        public static Configuration getConf(){
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS",URI);
        //conf.set("hadoop.home.dir","/home/hadoop/apps/hadoop-2.6.5/tmp");
        //conf.set("hadoop.home.dir","D:/test/hadoop-2.6.5");
        System.setProperty("HADOOP_USER_NAME","hadoop");
        return conf;
        }
    
        /**
         * 创建文件夹
         * @param dir 文件夹全路径
         * @return
         * @throws IOException
         */
        public static boolean mkdir(String dir) throws IOException {
            if(StringUtils.isBlank(dir)){
                return false;
            }
            FileSystem fs = FileSystem.get(getConf());
            if(!fs.exists(new Path(dir))){
               return  fs.mkdirs(new Path(dir));
            }
            fs.close();
            return false;
        }
    
        /**
         * 删除文件夹
         * @param dir 文件夹全路径
         * @return
         * @throws IOException
         */
        public static boolean deleteDir(String dir) throws IOException {
            if (StringUtils.isBlank(dir)) return false;
    
            FileSystem fs = FileSystem.get(getConf());
            boolean res = fs.delete(new Path(dir),true);
            fs.close();
            return res;
        }
    
        /**
         * 查询指定路径下的文件夹或者文件
         * @param dir 指定全路径
         * @return
         * @throws IOException
         */
        public static List<String> listAll(String dir) throws IOException {
            List<String> names = new ArrayList<String>();
            if(StringUtils.isBlank(dir)){
                return names;
            }
            FileSystem fs = FileSystem.get(getConf());
            FileStatus[] statuses = fs.listStatus(new Path(dir));
            for(int i=0;i<statuses.length;i++){
                if(statuses[i].isFile()){
                    //regular file
                    names.add(statuses[i].getPath().toString());
                } else if(statuses[i].isDirectory()){
                    //dir
                    names.add(statuses[i].getPath().toString());
                }else if(statuses[i].isSymlink()){
                    //is a sysm link in linux
                    names.add(statuses[i].getPath().toString());
                }
            }
            System.out.println(names);
            fs.close();
            return names;
        }
    
        /**
         * 从本地复制到hdfs上
         * @param localFile 本地文件路径
         * @param hdfsFile hdfs文件路径
         * @return
         * @throws IOException
         */
        public static boolean copyFromLocalFile(String localFile,String hdfsFile) throws IOException {
            if(StringUtils.isBlank(localFile) || StringUtils.isBlank(hdfsFile)){
                return false;
            }
            FileSystem hdfs = FileSystem.get(getConf());
            Path src = new Path(localFile);
            Path dst = new Path(hdfsFile);
            hdfs.copyFromLocalFile(src,dst);
            hdfs.close();
            return true;
        }
    
        /**
         * 在hdfs上创建一个文件
         * @param newFile 新文件
         * @param content 内容
         * @return
         * @throws IOException
         */
        public static boolean create(String newFile,String content) throws IOException {
            //if(StringUtils.isBlank(newFile) || StringUtils.isBlank(content)){
            if(StringUtils.isBlank(newFile) || content==null){
                return false;
            }
            FileSystem hdfs = FileSystem.get(getConf());
            FSDataOutputStream os = hdfs.create(new Path(newFile));
            os.write(content.getBytes("UTF-8"));
            os.close();
            hdfs.close();
            return true;
        }
    
        /**
         * 删除hdfs 上的文件
         * @param hdfsFile
         * @param recursive true 递归删除 false,只删除当前的
         * @return
         * @throws IOException
         */
        public static boolean delete(String hdfsFile,boolean recursive) throws IOException {
            if(StringUtils.isBlank(hdfsFile)) {
                return false;
            }
            FileSystem hdfs = FileSystem.get(getConf());
            //recursive
            boolean res = hdfs.delete(new Path(hdfsFile),recursive);
            hdfs.close();
            return res;
        }
    
        /**
         * 读取hdfs文件
         * @param hdfsFile
         * @return
         * @throws Exception
         */
        public static byte[] readHDFS(String hdfsFile) throws Exception {
            if(StringUtils.isBlank(hdfsFile)){
                return null;
            }
            FileSystem fs = FileSystem.get(getConf());
            // check if the file exists
            Path path = new Path(hdfsFile);
            if (fs.exists(path)) {
                FSDataInputStream is = fs.open(path);
                // get the file info to create the buffer
                FileStatus stat = fs.getFileStatus(path);
                // create the buffer
                byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
                is.readFully(0, buffer);
                is.close();
                fs.close();
                return buffer;
            } else {
                throw new Exception("the file is not found .");
            }
        }
    
        /**
         * 向指定文件追加
         * @param hdfsFile
         * @param content
         * @return
         * @throws Exception
         */
        public static  boolean append(String hdfsFile,String content) throws Exception {
            if(StringUtils.isBlank(hdfsFile)) return false;
            if(content==null) return false;
           // solve the problem when appending at single datanode hadoop env
           // conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            //conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            FileSystem fs = FileSystem.get(getConf());
            // check if the file exists
            Path path = new Path(hdfsFile);
            if (fs.exists(path)) {
                try {
                    InputStream in = new ByteArrayInputStream(content.getBytes());
                    OutputStream out = fs.append(new Path(hdfsFile));
                    IOUtils.copyBytes(in, out, 4096, true);
                    out.close();
                    in.close();
                    fs.close();
                    return true;
                } catch (Exception ex) {
                    fs.close();
                    throw ex;
                }
            } else {
                return    HDFSUtils.create(hdfsFile, content);
            }
        }
    
        @Test
        public void testappend() throws Exception {
            append(URI+"/study/input/qq.dat","hello kitty hello kitty");
        }
    
        @Test
        public void testreadHDFS() throws Exception {
            System.out.println(new String(readHDFS(URI+"/study/input/qq.dat")));
        }
        @Test
        public void testdelete() throws IOException {
            delete(URI+"/study/inputx",true);
        }
        @Test
        public void testcreate() throws IOException {
            create(URI+"/input/word2.txt","");
        }
    
    
        @Test
        public void testcopyFromLocalFile() throws IOException {
            copyFromLocalFile("D:\test\hadoop-2.6.5\hadoopData\word.txt",URI+"/inputx");
        }
    
        @Test
        public void testlistAll() throws IOException {
            listAll(URI+"/");
        }
        @Test
        public void testdeldir() throws IOException {
            deleteDir(URI+"/input/test2");
        }
    
        @Test
        public void testmkdir() throws IOException {
            mkdir(URI+"/input");
        }
    
    }

    http://blog.csdn.net/daxiang12092205/article/details/52717470

  • 相关阅读:
    TCP/IP协议详解
    linux高性能服务器编程--初见
    聚合类
    类class 2
    继承
    构造函数再探
    静态成员与友元
    MySQL图形工具SQLyog破解版
    MySQL注释符号
    数据库中多对多关系的中间表的命名规则
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7121594.html
Copyright © 2011-2022 走看看