zoukankan      html  css  js  c++  java
  • Java代码操作HDFS

    package com.hy.hdfs;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    
    public class HDFSCommand {
    
        public static final Logger log = LoggerFactory.getLogger(HDFSCommand.class);
    
    
        public static void main(String[] args) throws Exception {
            String hdfsURI = "hdfs://10.1.23.240:9000";
            String srcPath = "D:" + File.separator + "readme.txt";
            String descPath = "/xhy";
            String data = "haohaohaohaohao
    善字
    善生
    善行
    守善
    愿善";
            Configuration conf = new Configuration();
            copyFromLocalFile(hdfsURI, srcPath, descPath, conf);
            uploadFile(hdfsURI, data, descPath, conf);
            RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = listFile(hdfsURI, descPath, conf, true);
            while (locatedFileStatusRemoteIterator.hasNext()) {
                LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
                System.out.println("listFile:" + next.toString());
            }
            FileStatus[] fileStatuses = listFileAndFolder(hdfsURI, descPath, conf);
            for (FileStatus f : fileStatuses) {
                System.out.println("listFileAndFolder:" + f.toString());
            }
    
    
        }
    
        /**
         * 本地指定路径文件上传到hdfs
         *
         * @param hdfsURI
         * @param srcPath
         * @param descPath
         * @param conf
         */
        public static void copyFromLocalFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws URISyntaxException, IOException {
            log.info(">> copyFromLocalFile, srcPath is {}, descPath is {}", srcPath, descPath);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            fs.copyFromLocalFile(new Path(srcPath), new Path(descPath));
            log.info("<< copyFromLocalFile success");
            fs.close();
            /*
             * 底层是通过
             * fs.open(new Path(srcPath), 4096);
             * fs.create(new Path(descPath));
             * IOUtils.copyBytes(in, out, conf, true);
             */
        }
    
    
        /**
         * 将数据写入到hdfs
         *
         * @param hdfsURI
         * @param data
         * @param descPath
         * @param conf
         */
        public static void uploadFile(String hdfsURI, String data, String descPath, Configuration conf) throws Exception {
            log.info(">> uploadFile, descPath is {}, data is {}", descPath, data);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            /*FSDataOutputStream fsOutputStream = fs.create(new Path(descPath), new Progressable() {
                @Override
                public void progress() {
                    log.info("<< 写入hdfs成功,文件路径为:{}", descPath);
                }
            });*/
            FSDataOutputStream fsOutputStream = fs.create(new Path(descPath),
                    () -> log.info("<< 写入hdfs成功,文件路径为:{}", descPath));
            fsOutputStream.write(data.getBytes(), 0, data.getBytes().length);
            /*
             * 以下几种方式会出现中文乱码
             * fsOutputStream.writeBytes(data);
             * fsOutputStream.writeUTF(data);
             * fsOutputStream.writeChars(data);
             */
            fsOutputStream.close();
            fs.close();
        }
    
    
        /**
         * 查找hdfs指定路径下的文件
         *
         * @param hdfsURI
         * @param path
         * @param conf
         * @param recursive 是否递归查找
         * @throws Exception
         */
        public static RemoteIterator<LocatedFileStatus> listFile(String hdfsURI, String path, Configuration conf, boolean recursive) throws Exception {
            log.info(">> listFile, path is {}, recursive is {}", path, recursive);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            RemoteIterator<LocatedFileStatus> result = fs.listFiles(new Path(path), recursive);
            log.info("<< listFile, result is {}", result);
            return result;
        }
    
        /**
         * 查找hdfs指定路径下的文件和文件夹
         *
         * @param hdfsURI
         * @param path
         * @param conf
         */
        public static FileStatus[] listFileAndFolder(String hdfsURI, String path, Configuration conf) throws Exception {
            log.info(">> listFileAndFolder, path is {}", path);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            FileStatus[] result = fs.listStatus(new Path(path));
            log.info("<< listFileAndFolder, result is {}", result.toString());
            return result;
            // 方法二
        }
    
        /**
         * 创建文件夹
         *
         * @param hdfsURI
         * @param path
         * @param conf
         * @throws Exception
         */
        public static void mkDir(String hdfsURI, String path, Configuration conf) throws Exception {
            log.info(">> mkDir, path is {}", path);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            boolean result = fs.mkdirs(new Path(path));
            if (result) {
                log.info("<< mkDir {} success", path);
            } else {
                log.error("<< mkDir {} error", path);
            }
        }
    
        /**
         * 删除指定路径
         *
         * @param hdfsURI
         * @param path
         * @param conf
         * @throws IOException
         */
        public static void delete(String hdfsURI, String path, Configuration conf) throws IOException {
            log.info(">> delete, path is {}", path);
            conf.set("fs.defaultFS", hdfsURI);
            FileSystem fs = FileSystem.get(conf);
            if (!fs.exists(new Path(path))) {
                log.info("<< delete {} error, path no exists", path);
                return;
            }
            boolean result = fs.delete(new Path(path), true);
            if (result) {
                log.info("<< delete {} success", path);
            } else {
                log.error("<< delete {} error", path);
            }
        }
    
        /**
         * 从hdfs上面下载
         *
         * @param hdfsURI
         * @param srcPath
         * @param descPath
         * @param conf
         * @throws Exception
         */
        public static void downloadFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws Exception {
            log.info(">> downloadFile, srcPath is {}, descPath is {}", srcPath, descPath);
            FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
            FSDataInputStream in = fs.open(new Path(srcPath));
            OutputStream out = new FileOutputStream(new File(descPath));
            IOUtils.copyBytes(in, out, conf);
        }
    
    
        public static void catFile(String hdfsURI, String path, Configuration conf) throws Exception {
            log.info(">> catFile, path is {}", path);
            FileSystem fs = FileSystem.get(URI.create(hdfsURI), conf);
            FSDataInputStream in = fs.open(new Path(path));
            try {
                IOUtils.copyBytes(in, System.out, 4096, false);
            } finally {
                IOUtils.closeStream(in);
                fs.close();
            }
        }
    
    }
  • 相关阅读:
    python之模块和包
    python之常用模块一
    关于jQuery库的引用
    Python数据挖掘-相关性-相关分析
    Python数据挖掘-使用sklearn包
    Python数据挖掘-关键字提取
    Python数据挖掘-词云美化
    Python数据挖掘-词云
    Python数据挖掘-词频统计-实现
    Python数据挖掘-中文分词
  • 原文地址:https://www.cnblogs.com/xhy-shine/p/10593768.html
Copyright © 2011-2022 走看看