  • Accessing HDFS in HA mode

    Previously, when working with HDFS, we always pointed the client at a fixed NameNode address, which meant first checking whether that NameNode was active or standby. That is fairly tedious. If the cluster is set up with HA, all of this becomes much more convenient.

    Straight to the code; see the comments:

    package com.ideal.template.openbigdata.util;
    
    import java.io.IOException;
    import java.net.URI;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    
    import java.util.LinkedList;
    import java.util.List;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;
    
    public class HadoopUse
    {
        private static Log log = LogFactory.getLog(HadoopUse.class);
        
    	/**
    	 * Build the HDFS client configuration for the HA nameservice.
    	 * @return
    	 */
    	private static Configuration getConf()
    	{
    		Configuration conf = new Configuration();
    		
    		// client-side HA settings, mirroring hdfs-site.xml and core-site.xml
    		conf.set("fs.defaultFS", "hdfs://dragoncluster");
    		conf.set("dfs.nameservices", "dragoncluster");
    		conf.set("dfs.ha.namenodes.dragoncluster", "nn1,nn2");
    		conf.set("dfs.namenode.rpc-address.dragoncluster.nn1", "n01.dragon.com:8020");
    		conf.set("dfs.namenode.rpc-address.dragoncluster.nn2", "n02.dragon.com:8020");
    		conf.set("dfs.client.failover.proxy.provider.dragoncluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    		
    		// set the implementation classes explicitly, because they can otherwise be overridden (e.g. when building a fat jar)
    		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    		return conf;
    	}
    	
    	/**
    	 * Configure Kerberos authentication and log in from the keytab.
    	 * @param conf
    	 * @throws Exception
    	 */
    	private static void kerberosLogin(Configuration conf) throws Exception
    	{
    		conf.set("hadoop.security.authentication", "kerberos");
    		UserGroupInformation.setConfiguration(conf);
    		UserGroupInformation.loginUserFromKeytab("openbigdata@DRAGON.COM", "/etc/security/keytabs/openbigdata.keytab");
    	}
    	
    	public static long getSize(String uri, String user)
    	{
    		Path path = new Path(URI.create(uri));
    
    		Configuration conf = new Configuration();
    		try
    		{
    			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);
    			return fs.getContentSummary(path).getLength() / 1024 / 1024; // size in MB
    		}
    		catch (Exception ex)
    		{
    			log.error("HadoopUse.getSize" + ex.getMessage(), ex);
    			return 0;
    		}
    	}
    
    	/**
    	 * Create a file on HDFS and write content into it.
    	 * 
    	 * @param uri
    	 * @param user
    	 * @param fullName
    	 * @param content
    	 * @return
    	 */
    	public static boolean createHdfsFile(String uri, String user, String fullName, String content)
    	{
    		if (fullName == null || fullName.length() == 0)
    		{// invalid target path
    			return false;
    		}
    		if (content == null || content.length() == 0)
    		{// nothing to write
    			return false;
    		}
    
    		try
    		{
    			Configuration conf = new Configuration();
    
    			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);
    			FSDataOutputStream os = null;
    
    			if (fs.exists(new Path(fullName)))
    			{// if the path already exists, remove it first
    				fs.delete(new Path(fullName), true);
    			}
    			os = fs.create(new Path(fullName));
    			os.write(content.getBytes());
    			os.close();
    			fs.close();
    			return true;
    		}
    		catch (Exception ex)
    		{
    			log.error("HadoopUse.createHdfsFile" + ex.getMessage(), ex);
    			return false;
    		}
    	}
    	
    	/**
    	 * Delete a file on HDFS.
    	 * @param uri
    	 * @param user
    	 * @param fullName
    	 * @return
    	 */
    	public static boolean deleteHdfsFile(String uri, String user, String fullName)
    	{
    		if (fullName == null || fullName.length() == 0)
    		{// invalid file name
    			log.error("HadoopUse.deleteHdfsFile: invalid file name");
    			return false;
    		}
    
    		try
    		{
    			Configuration conf = new Configuration();
    
    			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);

    			if (fs.exists(new Path(fullName)))
    			{// if the path exists, delete it
    				fs.delete(new Path(fullName), true);
    			}
    			return true;
    		}
    		catch (Exception ex)
    		{
    			log.error("HadoopUse.createHdfsFile" + ex.getMessage(), ex);
    		}
    		return false;
    	}
    	
    	/**
    	 * Write the rows of a ResultSet to a file on HDFS.
    	 * @param fullName
    	 * @param resultSet
    	 * @param terminated field delimiter written between columns
    	 * @param flag cooperative stop flag, polled every 1000 rows
    	 * @throws Exception 
    	 */
        public void createHdfsFile(String fullName, ResultSet resultSet, String terminated, FlagUtil flag)
            throws Exception
        {
            if (resultSet == null)
            { // nothing to do if the query returned no cursor
                return;
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            
            FileSystem fs = null;
            FSDataOutputStream out = null;
            Configuration conf = getConf();
            kerberosLogin(conf);
            
            fs = FileSystem.get(conf);
            if (fs.exists(new Path(fullName)))
            {// if the path already exists, remove it first
                fs.delete(new Path(fullName), true);
            }
            
            // obtain the file handle
            out = fs.create(new Path(fullName));
            
            // write out the rows
            ResultSetMetaData rsmd = resultSet.getMetaData();
            int colCnt = rsmd.getColumnCount();
            int count = 0;
            while (resultSet.next())
            {
                count++;
                if (count >= 1000)
                {// every 1000 records, check whether the task should be stopped
                    if (flag.getTeminalStatus())
                    {
                        break;
                    }
                    count = 0;
                }
            	
                for (int i = 1; i <= colCnt; i++)
                {
                    if (resultSet.getObject(i) == null)
                    {// null column: write an empty string
                        out.write("".getBytes("utf-8"));
                    }
                    else
                    {
                        String item = null;
                        if ("DATE".equals(rsmd.getColumnTypeName(i).toUpperCase()))
                        {// format DATE columns via the timestamp formatter
                            Timestamp date = resultSet.getTimestamp(i);
                            item = sdf.format(date);
                        }
                        else
                        {
                            item = String.valueOf(resultSet.getObject(i));
                        }
                        if (item != null)
                        {
                            out.write(item.getBytes("utf-8"));
                        }
                        else
                        {
                            out.write("".getBytes("utf-8"));
                        }
                    }
                    if (i < colCnt)
                    {// insert the delimiter after each column except the last
                        out.write(terminated.getBytes("utf-8"));
                    }
                }
                // move to the next row
                out.write("\n".getBytes("utf-8"));
            }
            log.info("fullName:" + fullName + " written successfully");
            
            if (out != null)
            {
                out.flush();
                out.close();
            }
            if (fs != null)
            {
                fs.close();
            }
        }
        
    	/**
    	 * List the entries under a directory.
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static List<String> listDir(String path) throws Exception
    	{
    		Configuration conf = getConf();
    		kerberosLogin(conf);
    		FileSystem fs = FileSystem.get(conf);
    		
    		Path hdfs = new Path(path);
    		List<String> pathList = null;
    		FileStatus files[] = fs.listStatus(hdfs);
    		if (files != null && files.length > 0)
    		{
    			pathList = new LinkedList<String>();
    			for (FileStatus file : files)
    			{
    				pathList.add(file.getPath().toString());
    			}
    		}
    		return pathList;
    	}
    
    	public static void main(String[] args) throws Exception
    	{
    		List<String> pathList = listDir(args[0]);
    		if (pathList != null)
    		{
    			for (String path : pathList)
    			{
    				System.out.println(path);
    			}
    		}
    	}
    }
    

    Note that this combines HA with Kerberos authentication: once the nameservice and the failover proxy provider are configured, the client locates the active NameNode by itself, so the code never has to care which NameNode is currently active.
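
    By the way, the properties hard-coded in getConf() simply mirror what the cluster's own core-site.xml and hdfs-site.xml contain. As an alternative, here is a minimal sketch that loads those files directly instead of hard-coding the values; the /etc/hadoop/conf paths and the principal/keytab below are assumptions to adjust for your cluster:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;
    
    public class HaConnectSketch
    {
    	/**
    	 * Build an HA-aware FileSystem from the cluster's config files.
    	 * A sketch: the file paths, principal and keytab are assumptions.
    	 */
    	public static FileSystem connect() throws Exception
    	{
    		Configuration conf = new Configuration();
    		// load the client configs instead of hard-coding the nameservice
    		conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    		conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
    		
    		// usually already present in core-site.xml on a secured cluster
    		conf.set("hadoop.security.authentication", "kerberos");
    		
    		// Kerberos login, same principal/keytab as kerberosLogin() above
    		UserGroupInformation.setConfiguration(conf);
    		UserGroupInformation.loginUserFromKeytab("openbigdata@DRAGON.COM",
    				"/etc/security/keytabs/openbigdata.keytab");
    		
    		// fs.defaultFS from core-site.xml already points at the HA
    		// nameservice, so the active NameNode is resolved automatically
    		return FileSystem.get(conf);
    	}
    }

    With this variant, a NameNode failover or even a change of NameNode addresses needs no code change at all; only the config files have to stay current.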
