zoukankan      html  css  js  c++  java
  • 使用HDFS客户端java api读取hadoop集群上的信息

    本文介绍使用hdfs java api的配置方法。

    1、先解决依赖,pom

    <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.2</version>
                <scope>provided</scope>
            </dependency>

    2、配置文件,存放hdfs集群配置信息,基本都是来源于core-site.xml和hdfs-site.xml,可以根据hdfs集群client端配置文件里的信息进行填写

    #============== hadoop ===================
    hdfs.fs.defaultFS=hdfs://mycluster-tj
    hdfs.ha.zookeeper.quorum=XXXX-apache00.XX01,XXXX-apache01.XX01,XXXX-apache02.XX01
    hdfs.dfs.nameservices=mycluster-tj
    hdfs.dfs.ha.namenodes.mycluster-tj=nn1,nn2
    hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1=XXXX-apachenn01.XX01:8020
    hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2=XXXX-apachenn02.XX01:8020

    3、java client api

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    
    /**
     * Thin HDFS client whose HA connection settings are injected through setters and applied in
     * {@link #init()}; {@link #stop()} releases the underlying {@link FileSystem}.
     */
    public class HadoopClient {
        /** Nameservice ID used when it cannot be derived from the host part of {@code defaultFS}. */
        private static final String DEFAULT_NAMESERVICE = "mycluster-tj";
        /** Standard HDFS client-side proxy provider that fails over between the configured NameNodes. */
        private static final String FAILOVER_PROXY_PROVIDER =
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";

        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
        private FileSystem fs;
        private String defaultFS;
        private String zKQuorum;
        private String nameServices;
        private String nameNodes;
        private String rpcAddressNN1;
        private String rpcAddressNN2;
    
        public void setDefaultFS(String defaultFS) {
            this.defaultFS = defaultFS;
        }
        public String getDefaultFS() {
            return defaultFS;
        }
        public void setZKQuorum(String zKQuorum) {
            this.zKQuorum = zKQuorum;
        }
        public String getzKQuorum() {
            return zKQuorum;
        }
        public void setNameServices(String nameServices) {
            this.nameServices = nameServices;
        }
        public String getNameServices() {
            return nameServices;
        }
        public void setNameNodes(String nameNodes) {
            this.nameNodes = nameNodes;
        }
        public String getNameNodes() {
            return nameNodes;
        }
        public void setRpcAddressNN1(String rpcAddressNN1) {
            this.rpcAddressNN1 = rpcAddressNN1;
        }
        public String getRpcAddressNN1() {
            return rpcAddressNN1;
        }
        public void setRpcAddressNN2(String rpcAddressNN2) {
            this.rpcAddressNN2 = rpcAddressNN2;
        }
        public String getRpcAddressNN2() {
            return rpcAddressNN2;
        }
    
        /**
         * Builds an HA-aware Hadoop {@link Configuration} from the injected properties and opens a
         * {@link FileSystem} for {@code defaultFS}.
         *
         * <p>The nameservice ID is taken from the host part of {@code defaultFS} (for example
         * {@code hdfs://mycluster-tj} gives {@code mycluster-tj}), so the per-nameservice keys always
         * match the URI being opened. The first two IDs listed in {@code nameNodes} are paired, in
         * order, with {@code rpcAddressNN1} and {@code rpcAddressNN2}; {@code nn1}/{@code nn2} are
         * used when an ID is missing.
         *
         * <p>Any failure is logged and leaves {@code fs} unset. The query methods then log an error
         * and return their fallback values.
         */
        public void init() {
            try {
                URI uri = new URI(defaultFS);
                String nsId = uri.getHost() != null ? uri.getHost() : DEFAULT_NAMESERVICE;
                String nn1 = idAt(nameNodes, 0, "nn1");
                String nn2 = idAt(nameNodes, 1, "nn2");

                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", defaultFS);
                conf.set("ha.zookeeper.quorum", zKQuorum);
                // The Hadoop key is plural; the former "dfs.nameservice" was not a real key and was ignored.
                conf.set("dfs.nameservices", isBlank(nameServices) ? nsId : nameServices);
                conf.set("dfs.ha.namenodes." + nsId, nameNodes);
                conf.set("dfs.namenode.rpc-address." + nsId + "." + nn1, rpcAddressNN1);
                conf.set("dfs.namenode.rpc-address." + nsId + "." + nn2, rpcAddressNN2);
                // Without a proxy provider the client resolves the logical nameservice as a hostname.
                conf.set("dfs.client.failover.proxy.provider." + nsId, FAILOVER_PROXY_PROVIDER);
                // newInstance rather than get: this bean closes its FileSystem in stop(), so it must
                // not hand out (and later close) the JVM-wide cached instance other code may share.
                fs = FileSystem.newInstance(uri, conf);
            } catch (Exception ex) {
                logger.error("init error: {}", defaultFS, ex);
            }
        }
    
        /**
         * Closes the FileSystem opened by {@link #init()}. Does nothing if init did not succeed.
         */
        public void stop() {
            if (fs == null) {
                return;
            }
            try {
                fs.close();
            } catch (IOException e) {
                logger.warn("stop error: failed to close HDFS FileSystem", e);
            }
        }
    
        /**
         * Checks whether {@code path} exists on HDFS.
         *
         * @param path HDFS path to check
         * @return {@code true} if the path exists; {@code false} if it does not or the check failed
         */
        public boolean exists(String path) {
            try {
                return fs.exists(new Path(path));
            } catch (Exception ex) {
                logger.error("exists error: {}", path, ex);
                return false;
            }
        }
    
        /**
         * Returns the last-modification time of {@code path}, formatted as {@code yyyyMMddHHmmss}
         * in the JVM default time zone.
         *
         * @param path HDFS path to inspect
         * @return the formatted time, or {@code null} if the status could not be read
         * @throws IOException declared for backward compatibility; errors are logged, not thrown
         */
        public String getModificationTime(String path) throws IOException {
            try {
                FileStatus fileStatus = fs.getFileStatus(new Path(path));
                // A fresh SimpleDateFormat per call: the class is not thread-safe and must not be shared.
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                return simpleDateFormat.format(new Date(fileStatus.getModificationTime()));
            } catch (Exception ex) {
                logger.error("getModificationTime error: {}", path, ex);
                return null;
            }
        }

        /** Returns the trimmed {@code index}-th comma-separated entry of {@code csv}, or {@code fallback}. */
        private static String idAt(String csv, int index, String fallback) {
            if (csv == null) {
                return fallback;
            }
            String[] parts = csv.split(",");
            if (index >= parts.length || parts[index].trim().isEmpty()) {
                return fallback;
            }
            return parts[index].trim();
        }

        /** Returns true when {@code s} is null or contains only whitespace. */
        private static boolean isBlank(String s) {
            return s == null || s.trim().isEmpty();
        }
    }

    4、configuration

    import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that exposes a property-driven {@link HadoopClient} bean.
     * Every connection setting is read from the {@code hdfs.*} properties.
     */
    @Configuration
    public class HadoopConfiguration {
        @Value("${hdfs.fs.defaultFS}")
        private String defaultFS;
        @Value("${hdfs.ha.zookeeper.quorum}")
        private String zKQuorum;
        @Value("${hdfs.dfs.nameservices}")
        private String nameServices;
        @Value("${hdfs.dfs.ha.namenodes.mycluster-tj}")
        private String nameNodes;
        @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1}")
        private String rpcAddressNN1;
        @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2}")
        private String rpcAddressNN2;
    
        /**
         * Creates the client and hands it the injected settings. Spring invokes {@code init()}
         * once the bean is built and {@code stop()} when the context shuts down.
         */
        @Bean(initMethod = "init", destroyMethod = "stop")
        public HadoopClient hadoopClient() {
            final HadoopClient client = new HadoopClient();
            // NameNode RPC endpoints
            client.setRpcAddressNN1(rpcAddressNN1);
            client.setRpcAddressNN2(rpcAddressNN2);
            // Nameservice / HA topology
            client.setNameNodes(nameNodes);
            client.setNameServices(nameServices);
            client.setZKQuorum(zKQuorum);
            // Default filesystem URI
            client.setDefaultFS(defaultFS);
            return client;
        }
    }

    今天被一个问题坑得要死,回来补上这篇文章。

    如果你要访问的集群采用了viewfs方式管理数据,按照本文上面的方法连接集群是有问题的:只有由URI和nameservices解析出的那个namenode可以访问,其他namenode都访问不了!!!

    如果你想解决这个问题,在api部分要去掉URI和nameservices配置,直接加载集群客户端的hdfs-site.xml、core-site.xml以及viewfs挂载表配置文件(下例中为mount-table.xml)。这些文件需要放在应用的classpath下。

    应该是这样的。

    package com.xiaoju.dqa.jazz.hadoop.client;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    
    /**
     * HDFS client that takes all cluster settings (including a viewfs mount table) from the
     * client configuration files on the classpath. {@link #stop()} releases the FileSystem.
     */
    public class HadoopClient {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
        private FileSystem fs;
    
        /**
         * Opens a FileSystem from core-site.xml, hdfs-site.xml and mount-table.xml, each loaded as a
         * classpath resource.
         *
         * <p>The default filesystem, and presumably the viewfs mount points in mount-table.xml,
         * come entirely from those files. Any failure is logged and leaves {@code fs} unset.
         */
        public void init() {
            try {
                Configuration conf = new Configuration();
                conf.addResource("core-site.xml");
                conf.addResource("hdfs-site.xml");
                conf.addResource("mount-table.xml");
                // newInstance rather than get: this bean closes its FileSystem in stop(), so it must
                // not hand out (and later close) the JVM-wide cached instance other code may share.
                fs = FileSystem.newInstance(conf);
            } catch (Exception e) {
                logger.error("[HDFS]初始化FileSystem失败", e);
            }
        }
    
        /**
         * Closes the FileSystem opened by {@link #init()}. Does nothing if init did not succeed.
         */
        public void stop() {
            if (fs == null) {
                return;
            }
            try {
                fs.close();
            } catch (IOException e) {
                logger.warn("[HDFS]关闭FileSystem失败", e);
            }
        }
    
        /**
         * Checks whether {@code path} exists on HDFS.
         *
         * @param path HDFS path to check
         * @return the result of {@code FileSystem#exists}, or {@code true} if the check itself failed
         */
        public boolean exists(String path) {
            // NOTE(review): a failed check is reported as "exists" (fail-open). Kept unchanged because
            // callers may depend on it; confirm this is intended.
            boolean isExists = true;
            try {
                Path hdfsPath = new Path(path);
                isExists = fs.exists(hdfsPath);
            } catch (Exception e) {
                logger.error("[HDFS]判断文件是否存在失败", e);
            }
            return isExists;
        }
    
        /**
         * Returns the last-modification time of {@code path}, formatted as {@code yyyyMMddHHmmss}
         * in the JVM default time zone.
         *
         * @param path HDFS path to inspect
         * @return the formatted time, or {@code null} if the status could not be read
         * @throws IOException declared for backward compatibility; errors are logged, not thrown
         */
        public String getModificationTime(String path) throws IOException {
            String modifyTime = null;
            try {
                Path hdfsPath = new Path(path);
                FileStatus fileStatus = fs.getFileStatus(hdfsPath);
                long modifyTimestamp = fileStatus.getModificationTime();
                // A fresh SimpleDateFormat per call: the class is not thread-safe and must not be shared.
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                Date date = new Date(modifyTimestamp);
                modifyTime = simpleDateFormat.format(date);
            } catch(Exception e) {
                logger.error("[HDFS]获取最近修改时间失败", e);
            }
            return modifyTime;
        }
    
        /**
         * Returns the total length in bytes of the content under {@code path}, as reported by
         * {@code FileSystem#getContentSummary}.
         *
         * @param path HDFS file or directory
         * @return the size in bytes, or {@code -1} if it could not be determined
         * @throws IOException declared for backward compatibility; errors are logged, not thrown
         */
        public long getPathSize(String path) throws IOException {
            long size = -1L;
            try {
                Path hdfsPath = new Path(path);
                size = fs.getContentSummary(hdfsPath).getLength();
            } catch (Exception e) {
                logger.error("[HDFS]获取路径大小失败", e);
            }
            return size;
        }
    
    }

    配置类中也不再需要注入任何参数了:

    package com.xiaoju.dqa.jazz.hadoop.configuration;
    
    import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration for the classpath-configured {@link HadoopClient}. No properties are
     * injected; the client reads its cluster settings itself during {@code init()}.
     */
    @Configuration
    public class HadoopConfig {
    
        /** Exposes the client bean; Spring calls {@code init()} after creation and {@code stop()} on shutdown. */
        @Bean(initMethod = "init", destroyMethod = "stop")
        public HadoopClient hadoopClient() {
            return new HadoopClient();
        }
    }
  • 相关阅读:
    java基础的判断循环
    idea反编译
    JavaWeb
    JavaWeb
    JavaWeb
    JavaWeb
    pycharm使用virtualenv环境
    django常用操作
    Linux下Tomcat的安装以及项目部署
    Linux下JDK的安装
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7221527.html
Copyright © 2011-2022 走看看