zoukankan      html  css  js  c++  java
  • FTP文件上传到HDFS上

    在做测试数据时,往往会有ftp数据上传到hdfs的需求,一般需要手动操作,这样做太费事,于是有了下边代码实现的方式:

    ftp数据上传到hdfs函数:

    import java.io.InputStream;
    
    import org.apache.commons.net.ftp.FTP;
    import org.apache.commons.net.ftp.FTPClient;
    import org.apache.commons.net.ftp.FTPFile;
    import org.apache.commons.net.ftp.FTPReply;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    /**
     * Created by Administrator on 11/10/2017.
     */
    public class FtpUtil {
        /**
         * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
         *
         * @param ip
         * @param username
         * @param password
         * @param filePath
         * @param outputPath
         * @param conf
         * @return
         * @author qiyongkang
         * @since JDK 1.8
         */
        public static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath, String outputPath, Configuration conf) {
            FTPClient ftp = new FTPClient();
            InputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            boolean flag = true;
            try {
                ftp.connect(ip);
                ftp.login(username, password);
                ftp.setFileType(FTP.BINARY_FILE_TYPE);
                ftp.setControlEncoding("UTF-8");
                int reply = ftp.getReplyCode();
                if (!FTPReply.isPositiveCompletion(reply)) {
                    ftp.disconnect();
                }
                FTPFile[] files = ftp.listFiles(filePath);
                FileSystem hdfs = FileSystem.get(conf);
                for (FTPFile file : files) {
                    if (!(file.getName().equals(".") || file.getName().equals(".."))) {
                        inputStream = ftp.retrieveFileStream(filePath + file.getName());
                        outputStream = hdfs.create(new Path(outputPath + file.getName()));
                        IOUtils.copyBytes(inputStream, outputStream, conf, false);
                        if (inputStream != null) {
                            inputStream.close();
                            ftp.completePendingCommand();
                        }
                    }
                }
                ftp.disconnect();
            } catch (Exception e) {
                flag = false;
                e.printStackTrace();
            }
            return flag;
        }
    }

    main调用函数:

    import org.apache.hadoop.conf.Configuration
    
    /**
      * Created by Administrator on 11/10/2017.
      */
    object FtpDownToHdfsMain {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        FtpUtil.loadFromFtpToHdfs("192.168.1.23", "test", "abc123", "/www/input/", "/user/jr/dt/fblib/", conf)
      }
    }

    使用yarn jar提交:

    yarn jar myapp.jar
  • 相关阅读:
    try catch finally return
    github结合TortoiseGit使用sshkey,无需输入账号和密码
    github上fork别人的代码之后,如何保持和原作者同步的更新
    第9章 浅度和深度复制
    9.7结构类型
    excel在msdn上的说明文档
    9.6接口和抽象类
    [LeetCode]N-Queens II
    鸟哥Linux私房菜知识汇总8至9章
    Memcahce(MC)系列(三)Memcached它PHP转让
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/7650409.html
Copyright © 2011-2022 走看看