zoukankan      html  css  js  c++  java
  • Hive工具类

    Hive 2.x 的工具类，对常用方法进行了封装，并在获取连接时加入了 Kerberos 认证。

    package com.ideal.template.openbigdata.util;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    
    public class HiveOper
    {
    	// Fully-qualified class name of the Hive JDBC driver; getConnection() loads it via Class.forName.
    	private static String driverClass = "org.apache.hive.jdbc.HiveDriver";
    
    	/*
    	 * First argument handed to UserGroupInformation.loginUserFromKeytab in getConnection().
    	 * main() supplies a Kerberos principal ("openbigdata@DRAGON.COM") for this value.
    	 */
    	private String key;
    
    	/*
    	 * Second argument handed to UserGroupInformation.loginUserFromKeytab in getConnection().
    	 * main() supplies the path of the user's keytab file for this value.
    	 */
    	private String tab;
    
    	/**
    	 * JDBC connection URL of the Hive warehouse.
    	 */
    	private String url;
    
    	/**
    	 * Hive user name passed to DriverManager.getConnection.
    	 */
    	private String user;
    
    	/**
    	 * Password of the Hive user, passed to DriverManager.getConnection.
    	 */
    	private String pwd;
    
    	/**
    	 * Cached Hive connection: null until getConnection() opens one, reset to null by close().
    	 */
    	private Connection conn = null;
    
    	/**
    	 * Stores the login and connection settings; no connection is opened here.
    	 *
    	 * @param key  first argument later given to UserGroupInformation.loginUserFromKeytab
    	 * @param tab  second argument later given to UserGroupInformation.loginUserFromKeytab
    	 * @param url  JDBC URL later given to DriverManager.getConnection
    	 * @param user Hive user name later given to DriverManager.getConnection
    	 * @param pwd  password later given to DriverManager.getConnection
    	 */
    	public HiveOper(String key, String tab, String url, String user, String pwd)
    	{
    		this.key = key;
    		this.tab = tab;
    		this.url = url;
    		this.user = user;
    		this.pwd = pwd;
    	}
    
    	/**
    	 * Returns the cached Hive connection, opening it on first use.
    	 * <p>
    	 * Opening a connection sets {@code hadoop.security.authentication} to
    	 * {@code Kerberos}, logs in from the keytab with {@code key}/{@code tab},
    	 * loads the Hive JDBC driver and connects to {@code url}.
    	 *
    	 * @return the open connection held by this instance
    	 * @throws HiveDBException if the login, the driver lookup or the connect fails;
    	 *         the original exception is attached as the cause
    	 */
    	private Connection getConnection()
    	{
    		if (conn != null)
    		{
    			return conn;
    		}
    		try
    		{
    			// Kerberos authentication has to be configured before the JDBC connect.
    			Configuration conf = new Configuration();
    			conf.set("hadoop.security.authentication", "Kerberos");
    			UserGroupInformation.setConfiguration(conf);
    			UserGroupInformation.loginUserFromKeytab(key, tab);

    			Class.forName(driverClass);
    			conn = DriverManager.getConnection(url, user, pwd);
    		}
    		catch (Exception e)
    		{
    			// Every failure was handled identically, so one handler is enough.
    			// Use the (message, cause) constructor so the stack trace of the
    			// underlying failure is not lost.
    			throw new HiveDBException(e.getMessage(), e);
    		}
    		return conn;
    	}
    
    	/**
    	 * Closes the cached connection if it is still open and forgets it,
    	 * so the next getConnection() call opens a fresh one.
    	 * A failure while closing is printed and otherwise ignored.
    	 */
    	public void close()
    	{
    		try
    		{
    			if (conn == null || conn.isClosed())
    			{
    				return;
    			}
    			conn.close();
    		}
    		catch (SQLException closeFailure)
    		{
    			closeFailure.printStackTrace();
    		}
    		finally
    		{
    			// Runs on every path, including the early return above.
    			conn = null;
    		}
    	}
    
    	/**
    	 * Closes a Statement; a null argument is ignored and a failure
    	 * while closing is printed.
    	 * 
    	 * @param stmt statement to close, may be null
    	 */
    	public void close(Statement stmt)
    	{
    		if (stmt == null)
    		{
    			return;
    		}
    		try
    		{
    			stmt.close();
    		}
    		catch (SQLException closeFailure)
    		{
    			closeFailure.printStackTrace();
    		}
    	}
    
    	/**
    	 * Closes a PreparedStatement; a null argument is ignored and a failure
    	 * while closing is printed.
    	 * 
    	 * @param pst prepared statement to close, may be null
    	 */
    	public void close(PreparedStatement pst)
    	{
    		if (pst == null)
    		{
    			return;
    		}
    		try
    		{
    			pst.close();
    		}
    		catch (SQLException closeFailure)
    		{
    			closeFailure.printStackTrace();
    		}
    	}
    
    	/**
    	 * Closes a ResultSet; a null argument is ignored and a failure
    	 * while closing is printed.
    	 * 
    	 * @param rs result set to close, may be null
    	 */
    	public void close(ResultSet rs)
    	{
    		if (rs == null)
    		{
    			return;
    		}
    		try
    		{
    			rs.close();
    		}
    		catch (SQLException closeFailure)
    		{
    			closeFailure.printStackTrace();
    		}
    	}
    
    	/**
    	 * Lists every table of the given database.
    	 * 
    	 * @param dbName database to switch to with USE before listing; when null
    	 *               or blank, no USE is issued
    	 * @return table names in the order Hive returned them
    	 * @throws HiveDBException if a statement fails
    	 */
    	public List<String> listTables(String dbName)
    	{
    		List<String> names = new LinkedList<String>();
    		Statement statement = null;
    		ResultSet rows = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			boolean hasDatabase = dbName != null && dbName.trim().length() > 0;
    			if (hasDatabase)
    			{
    				statement.execute("USE " + dbName);
    			}
    			rows = statement.executeQuery("SHOW TABLES");
    			for (boolean more = rows.next(); more; more = rows.next())
    			{
    				names.add(rows.getString(1));
    			}
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    		finally
    		{
    			// The connection is closed after every call.
    			close(rows);
    			close(statement);
    			close();
    		}
    		return names;
    	}
    	
    	/**
    	 * Lists all databases by running SHOW DATABASES.
    	 *
    	 * @return database names in the order Hive returned them
    	 * @throws HiveDBException if the statement fails
    	 */
    	public List<String> showdatabases()
    	{
    		List<String> databases = new LinkedList<String>();
    		Statement statement = null;
    		ResultSet rows = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			rows = statement.executeQuery("SHOW DATABASES");
    			for (boolean more = rows.next(); more; more = rows.next())
    			{
    				databases.add(rows.getString(1));
    			}
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    		finally
    		{
    			close(rows);
    			close(statement);
    			close();
    		}
    		return databases;
    	}
    
    	/**
    	 * Runs a statement that is not a query, such as creating a table or
    	 * loading data, and then closes the connection.
    	 * 
    	 * @param sql statement text to execute
    	 * @return always true when the statement completed without an exception
    	 * @throws HiveDBException if the statement fails
    	 */
    	public boolean executeNonQuery(String sql)
    	{
    		Statement statement = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			statement.execute(sql);
    			return true;
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    		finally
    		{
    			close(statement);
    			close();
    		}
    	}
    
    	/**
    	 * Runs a query through a Statement and hands the open ResultSet to the caller.
    	 * <p>
    	 * The statement is deliberately left open on success: closing a JDBC
    	 * Statement also closes the ResultSet it produced, which would make the
    	 * returned object unusable. The caller owns the result set, should close
    	 * it when done and then call {@link #close()} to release the connection.
    	 * 
    	 * @param sql query text to execute
    	 * @return the open result set of the query
    	 * @throws HiveDBException if the query fails; the SQLException is the cause
    	 */
    	public ResultSet queryForResultSet(String sql)
    	{
    		Statement stmt = null;
    		ResultSet res = null;
    		try
    		{
    			stmt = getConnection().createStatement();
    			res = stmt.executeQuery(sql);
    		}
    		catch (SQLException e)
    		{
    			throw new HiveDBException(e.getMessage(), e);
    		}
    		finally
    		{
    			// Only clean up when no result set is being returned.
    			if (res == null)
    			{
    				close(stmt);
    			}
    		}
    		return res;
    	}
    
    	/**
    	 * Runs a query through a Statement and copies the whole result into
    	 * memory; intended for small results.
    	 * 
    	 * @param sql query text to execute
    	 * @return one map per row, keyed by column name in column order
    	 * @throws HiveDBException if the query fails
    	 */
    	public List<Map<String, Object>> queryForList(String sql)
    	{
    		Statement statement = null;
    		ResultSet rows = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			rows = statement.executeQuery(sql);
    			ResultSetMetaData meta = rows.getMetaData();
    			int columnCount = meta.getColumnCount();

    			List<Map<String, Object>> records = new LinkedList<Map<String, Object>>();
    			while (rows.next())
    			{
    				Map<String, Object> record = new LinkedHashMap<String, Object>(columnCount);
    				for (int column = 1; column <= columnCount; column++)
    				{
    					record.put(meta.getColumnName(column), rows.getObject(column));
    				}
    				records.add(record);
    			}
    			return records;
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    		finally
    		{
    			close(rows);
    			close(statement);
    			close();
    		}
    	}
    
    	/**
    	 * Runs a parameterised query through a PreparedStatement and hands the
    	 * open ResultSet to the caller.
    	 * <p>
    	 * The statement is deliberately left open on success: closing a JDBC
    	 * statement also closes the ResultSet it produced, which would make the
    	 * returned object unusable. The caller owns the result set, should close
    	 * it when done and then call {@link #close()} to release the connection.
    	 * 
    	 * @param sql    query text with {@code ?} placeholders
    	 * @param values placeholder values, bound as strings in order; may be null or empty
    	 * @return the open result set of the query
    	 * @throws HiveDBException if preparing, binding or executing fails
    	 */
    	public ResultSet queryForResultSet(String sql, String[] values)
    	{
    		PreparedStatement pst = null;
    		ResultSet res = null;
    		try
    		{
    			pst = getConnection().prepareStatement(sql);
    			setValue(pst, values);
    			res = pst.executeQuery();
    		}
    		catch (SQLException e)
    		{
    			throw new HiveDBException(e.getMessage(), e);
    		}
    		finally
    		{
    			// Only clean up when no result set is being returned.
    			if (res == null)
    			{
    				close(pst);
    			}
    		}
    		return res;
    	}
    
    	/**
    	 * Runs a parameterised query through a PreparedStatement and copies the
    	 * whole result into memory; intended for small results.
    	 * 
    	 * @param sql    query text with {@code ?} placeholders
    	 * @param values placeholder values, bound as strings in order
    	 * @return one map per row, keyed by column name in column order
    	 * @throws HiveDBException if preparing, binding or executing fails
    	 */
    	public List<Map<String, Object>> queryForList(String sql, String[] values)
    	{
    		PreparedStatement prepared = null;
    		ResultSet rows = null;
    		try
    		{
    			prepared = getConnection().prepareStatement(sql);
    			setValue(prepared, values);
    			rows = prepared.executeQuery();
    			ResultSetMetaData meta = rows.getMetaData();
    			int columnCount = meta.getColumnCount();

    			List<Map<String, Object>> records = new LinkedList<Map<String, Object>>();
    			while (rows.next())
    			{
    				Map<String, Object> record = new LinkedHashMap<String, Object>(columnCount);
    				for (int column = 1; column <= columnCount; column++)
    				{
    					record.put(meta.getColumnName(column), rows.getObject(column));
    				}
    				records.add(record);
    			}
    			return records;
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    		finally
    		{
    			close(rows);
    			close(prepared);
    			close();
    		}
    	}
    	
    	
    	/**
    	 * Executes a data-file import statement and then closes the connection.
    	 * <p>
    	 * The return value reports success, matching executeNonQuery. The value of
    	 * {@code PreparedStatement.execute()} is not used for that, because it only
    	 * says whether the statement produced a ResultSet and is therefore false
    	 * for a successful statement that returns no rows.
    	 * 
    	 * @param sql import statement to execute
    	 * @return true when the statement completed without an exception
    	 * @throws HiveDBException if the statement fails; the SQLException is the cause
    	 */
    	public boolean impBySql(String sql)
    	{
    		PreparedStatement pst = null;
    		try
    		{
    			pst = getConnection().prepareStatement(sql);
    			pst.execute();
    			return true;
    		}
    		catch (SQLException e)
    		{
    			throw new HiveDBException(e.getMessage(), e);
    		}
    		finally
    		{
    			close(pst);
    			close();
    		}
    	}
    
    	/**
    	 * Binds the given values to the statement's placeholders as strings,
    	 * first value to placeholder 1 and so on. Does nothing for a null or
    	 * empty array.
    	 *
    	 * @param pst    statement whose placeholders are filled
    	 * @param values values to bind, in placeholder order
    	 * @throws HiveDBException if a value cannot be bound
    	 */
    	private void setValue(PreparedStatement pst, String[] values)
    	{
    		if (values == null || values.length == 0)
    		{
    			return;
    		}
    		try
    		{
    			int position = 1; // JDBC placeholders are 1-based
    			for (String value : values)
    			{
    				pst.setString(position, value);
    				position++;
    			}
    		}
    		catch (SQLException failure)
    		{
    			throw new HiveDBException(failure);
    		}
    	}
    	
    	/**
    	 * Looks up the storage location of a table from the
    	 * "Detailed Table Information" row of {@code desc extended}.
    	 * The location is the text between "location:" and the next comma.
    	 *
    	 * @param tblName table to describe
    	 * @return the location text, or an empty string when none was found
    	 * @throws Exception wrapping the SQLException when the query fails
    	 */
        public String getHiveTblPath(String tblName) throws Exception
        {
            final String marker = "location:";
            String location = "";
            Statement statement = null;
            ResultSet rows = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc extended " + tblName);
                while (rows.next())
                {
                    boolean detailRow = rows.getString(1).trim().equals("Detailed Table Information");
                    if (!detailRow)
                    {
                        continue;
                    }

                    String detail = rows.getString(2).trim();
                    int markerAt = detail.indexOf(marker);
                    if (markerAt == -1)
                    {
                        continue;
                    }

                    // First comma at or after the marker ends the location value.
                    int commaAt = detail.indexOf(",", markerAt);
                    if (commaAt == -1)
                    {
                        continue;
                    }

                    location = detail.substring(markerAt + marker.length(), commaAt);
                }
            }
            catch (SQLException failure)
            {
                throw new Exception(failure);
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            return location;
        }
        /**
         * Looks up the HDFS path and the field delimiter of a table from the
         * "Detailed Table Information" row of {@code desc extended}.
         * The path is the text between "dragoncluster" and the next comma;
         * the delimiter comes from getField.
         *
         * @param tblName table to describe
         * @return a map with the keys "field" (delimiter) and "hdfsPath";
         *         both values are empty strings when nothing was found
         * @throws Exception wrapping the SQLException when the query fails
         */
        public Map<String,String> getNewHiveTblPath(String tblName) throws Exception
        {
            final String marker = "dragoncluster";
            String hdfsPath = "";
            String delimiter = "";
            Map<String,String> info = new HashMap<String,String>();
            Statement statement = null;
            ResultSet rows = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc extended " + tblName);
                while (rows.next())
                {
                    boolean detailRow = rows.getString(1).trim().equals("Detailed Table Information");
                    if (!detailRow)
                    {
                        continue;
                    }

                    String detail = rows.getString(2).trim();
                    // The delimiter is read even when no path can be extracted below.
                    delimiter = getField(detail);

                    int markerAt = detail.indexOf(marker);
                    if (markerAt == -1)
                    {
                        continue;
                    }

                    int commaAt = detail.indexOf(",", markerAt);
                    if (commaAt == -1)
                    {
                        continue;
                    }

                    hdfsPath = detail.substring(markerAt + marker.length(), commaAt);
                }
            }
            catch (SQLException failure)
            {
                throw new Exception(failure);
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            info.put("field", delimiter);
            info.put("hdfsPath", hdfsPath);
            return info;
        }
        
        /**
         * Extracts the field delimiter from a table-description string.
         * The delimiter is the single character that follows "field.delim=".
         *
         * @param content description text to search; may be null
         * @return the delimiter character as a string, or the literal text
         *         {@code \001} (backslash, 0, 0, 1) when content is null, has no
         *         "field.delim=" entry, or ends right after it
         * @throws Exception never thrown by this implementation; kept so existing
         *         callers compile unchanged
         */
        public String getField(String content) throws Exception
        {
            final String key = "field.delim=";
            final String defaultDelimiter = "\\001";
            if (content == null)
            {
                return defaultDelimiter;
            }
            int keyAt = content.indexOf(key);
            if (keyAt == -1)
            {
                return defaultDelimiter;
            }
            int valueAt = keyAt + key.length();
            if (valueAt >= content.length())
            {
                // Nothing follows the key; taking a substring here would throw.
                return defaultDelimiter;
            }
            return content.substring(valueAt, valueAt + 1);
        }
        
        
        /**
         * Extracts the table comment from the "Detailed Table Information" row
         * of {@code desc extended}.
         * <p>
         * The value is taken after the last occurrence of "comment" (skipping
         * the 8 characters of "comment=") up to the nearest following "}" or
         * ",". A value starting with "null" is reported as null.
         *
         * @param tblName table to describe
         * @return the comment, or null when none was found or the query failed
         *         (a SQLException is printed, not rethrown)
         */
        public String getTblComment(String tblName)
        {
            final String prefix = "comment=";
            String result = null;
            Statement stmt = null;
            ResultSet res = null;
            try
            {
                stmt = getConnection().createStatement();
                res = stmt.executeQuery("desc extended " + tblName);
                while (res.next())
                {
                    if (!res.getString(1).trim().equals("Detailed Table Information"))
                    {
                        continue;
                    }

                    String content = res.getString(2).trim();
                    int start = content.lastIndexOf("comment");
                    if (start == -1)
                    {
                        continue;
                    }

                    String sub = content.substring(start);
                    int endBracket = sub.indexOf("}");
                    int endDot = sub.indexOf(",");

                    // Pick the nearest terminator that actually exists. A plain
                    // minimum would select -1 whenever only one of them is present.
                    int end;
                    if (endBracket == -1)
                    {
                        end = endDot;
                    }
                    else if (endDot == -1)
                    {
                        end = endBracket;
                    }
                    else
                    {
                        end = Math.min(endBracket, endDot);
                    }

                    // Covers "no terminator" (-1) and a terminator that falls inside
                    // the prefix, where substring would throw.
                    if (end < prefix.length())
                    {
                        continue;
                    }

                    result = sub.substring(prefix.length(), end);
                    if (result.startsWith("null"))
                    {
                        result = null;
                    }
                }
            }
            catch (SQLException e)
            {
                e.printStackTrace();
            }
            finally
            {
                close(res);
                close(stmt);
                close();
            }
            return result;
        }
        
        /**
         * Determines the file type of a Hive table from the
         * "Detailed Table Information" row of {@code desc extended}.
         * A description containing "TEXTINPUTFORMAT" (case-insensitive) yields
         * "TEXTFILE"; every other description yields "SEQUENCEFILE".
         *
         * @param tblName table to describe
         * @return "TEXTFILE" or "SEQUENCEFILE", or null when no detail row was
         *         seen or the query failed (a SQLException is printed)
         * date: 2017-01-04
         * author: Tonny Chien
         */
        public String getTblFileType(String tblName)
        {
            String fileType = null;
            Statement statement = null;
            ResultSet rows = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc extended " + tblName);
                while (rows.next())
                {
                    if (!rows.getString(1).trim().equals("Detailed Table Information"))
                    {
                        continue;
                    }
                    String detail = rows.getString(2).trim().toUpperCase();
                    fileType = detail.contains("TEXTINPUTFORMAT") ? "TEXTFILE" : "SEQUENCEFILE";
                }
            }
            catch (SQLException failure)
            {
                failure.printStackTrace();
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            return fileType;
        }
        
        /**
         * Reports whether a table is an external table, i.e. whether the
         * "Detailed Table Information" row of {@code desc extended} contains
         * "EXTERNAL_TABLE" (case-insensitive).
         *
         * @param tblName table to describe
         * @return true when the marker was found; false otherwise, including
         *         when the query failed (a SQLException is printed)
         * date: 2017-01-04
         * author: Tonny Chien
         */
        public boolean isExternalTbl(String tblName)
        {
            boolean external = false;
            Statement statement = null;
            ResultSet rows = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc extended " + tblName);
                while (rows.next())
                {
                    if (!rows.getString(1).trim().equals("Detailed Table Information"))
                    {
                        continue;
                    }
                    String detail = rows.getString(2).trim().toUpperCase();
                    if (detail.contains("EXTERNAL_TABLE"))
                    {
                        external = true;
                    }
                }
            }
            catch (SQLException failure)
            {
                failure.printStackTrace();
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            return external;
        }
        
        /**
         * Reads the columns of a table from {@code desc formatted}.
         * Rows are consumed until the "# Detailed Table Information" or
         * "# Partition Information" header; the "# col_name" header and blank
         * rows are skipped.
         *
         * @param tblName table to describe
         * @return one {name, type, comment} array per column, all upper-cased,
         *         with a missing or "NONE" comment reported as ""; null when the
         *         query could not be started, partial when it failed midway
         *         (a SQLException is printed)
         */
        public List<String[]> getColumAndType(String tblName)
        {
            Statement statement = null;
            ResultSet rows = null;
            List<String[]> columns = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc formatted " + tblName);
                columns = new LinkedList<String[]>();

                while (rows.next())
                {
                    String rawName = rows.getString(1);
                    String name = rawName.trim();
                    if (name.equals("# col_name"))
                    {
                        continue;
                    }
                    // Section headers are matched against the untrimmed value.
                    boolean sectionHeader = rawName.equals("# Detailed Table Information")
                            || rawName.equals("# Partition Information");
                    if (sectionHeader)
                    {
                        break;
                    }
                    if (name.equals(""))
                    {
                        continue;
                    }

                    String column = name.toUpperCase();
                    String type = rows.getString(2).trim().toUpperCase();
                    String rawComment = rows.getString(3);
                    String comment = "";
                    if (rawComment != null && rawComment.trim().length() > 0)
                    {
                        String upper = rawComment.trim().toUpperCase();
                        comment = "NONE".equals(upper) ? "" : upper;
                    }
                    columns.add(new String[]{column, type, comment});
                }
            }
            catch (SQLException failure)
            {
                failure.printStackTrace();
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            return columns;
        }
        
        /**
         * Reads the partition columns of a table from {@code desc}.
         * Every row after the "# Partition Information" header is treated as a
         * partition column, except the "# col_name" header and blank rows.
         *
         * @param tblName table to describe
         * @return one {name, type, comment} array per partition column, all
         *         upper-cased, with a missing or "NONE" comment reported as "";
         *         null when the query could not be started, partial when it
         *         failed midway (a SQLException is printed)
         */
        public List<String[]> getHiveTblPartitions(String tblName)
        {
            Statement statement = null;
            ResultSet rows = null;
            List<String[]> partitions = null;
            try
            {
                statement = getConnection().createStatement();
                rows = statement.executeQuery("desc " + tblName);
                partitions = new LinkedList<String[]>();

                while (rows.next())
                {
                    boolean partitionHeader = rows.getString(1).equals("# Partition Information");
                    if (!partitionHeader)
                    {
                        continue;
                    }

                    // Everything after the header belongs to the partition section.
                    while (rows.next())
                    {
                        String name = rows.getString(1).trim();
                        if (name.equals("# col_name") || name.equals(""))
                        {
                            continue;
                        }

                        String column = name.toUpperCase();
                        String type = rows.getString(2).trim().toUpperCase();
                        String rawComment = rows.getString(3);
                        String comment = "";
                        if (rawComment != null && rawComment.trim().length() > 0)
                        {
                            String upper = rawComment.trim().toUpperCase();
                            comment = "NONE".equals(upper) ? "" : upper;
                        }
                        partitions.add(new String[] { column, type, comment });
                    }
                }
            }
            catch (SQLException failure)
            {
                failure.printStackTrace();
            }
            finally
            {
                close(rows);
                close(statement);
                close();
            }
            return partitions;
        }
    	
    	/**
    	 * Reads the column names of a table from {@code desc formatted}.
    	 * Rows are consumed until the "# Detailed Table Information" or
    	 * "# Partition Information" header; the "# col_name" header and blank
    	 * rows are skipped.
    	 *
    	 * @param tblName table to describe
    	 * @return upper-cased column names; null when the query could not be
    	 *         started, partial when it failed midway (a SQLException is printed)
    	 */
    	public List<String> getHiveTblColumns(String tblName)
    	{
    		Statement stmt = null;
    		ResultSet res = null;
    		List<String> list = null;
    		try
    		{
    			stmt = getConnection().createStatement();
    			res = stmt.executeQuery("desc formatted " + tblName);
    			list = new LinkedList<String>();
    			while (res.next())
    			{
    				// Read the cell once instead of once per comparison.
    				String rawName = res.getString(1);
    				String name = rawName.trim();
    				if (name.equals("# col_name"))
    				{
    					continue;
    				}

    				// Section headers are matched against the untrimmed value.
    				if (rawName.equals("# Detailed Table Information") || rawName.equals("# Partition Information"))
    				{
    					break;
    				}

    				if (name.equals(""))
    				{
    					continue;
    				}
    				// The leftover debug print of each column name to stdout was removed.
    				list.add(name.toUpperCase());
    			}
    		}
    		catch (SQLException e)
    		{
    			e.printStackTrace();
    		}
    		finally
    		{
    			close(res);
    			close(stmt);
    			close();
    		}
    		return list;
    	}
    	
    	/**
    	 * Checks whether a table exists in a database by scanning the output of
    	 * SHOW TABLES IN for a case-insensitive name match.
    	 *
    	 * @param database  database to list
    	 * @param tableName table name to look for
    	 * @return true when the table was found; false otherwise, including when
    	 *         any exception occurred (it is silently discarded)
    	 * date: 2016-10-25
    	 * author: Tonny Chien
    	 */
    	public boolean existTable(String database, String tableName)
    	{
    		Statement statement = null;
    		ResultSet rows = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			rows = statement.executeQuery("SHOW TABLES IN " + database);
    			while (rows.next())
    			{
    				String candidate = rows.getString(1).trim().toUpperCase();
    				if (candidate.equals(tableName.toUpperCase()))
    				{
    					return true;
    				}
    			}
    			return false;
    		}
    		catch (Exception ignored)
    		{
    			// Any failure is reported as "table does not exist".
    			return false;
    		}
    		finally
    		{
    			close(rows);
    			close(statement);
    			close();
    		}
    	}
    	
    	/**
    	 * Builds and runs a GRANT or REVOKE statement after switching to the
    	 * admin role with "set role admin".
    	 * 
    	 * @param cmdType  grant or revoke
    	 * @param privs    privileges to grant or revoke
    	 * @param tgtType  database or table
    	 * @param tgt      database name or table name
    	 * @param recvType group or user
    	 * @param recv     name of the group or user
    	 * @return true when both statements ran; false when any exception
    	 *         occurred (it is printed)
    	 * @author Tonny Chien
    	 * @date 2017-5-21 20:13
    	 */
    	public boolean auth(AUTH cmdType, String privs, AUTH tgtType, String tgt, AUTH recvType, String recv)
    	{
    		// Command keyword and the preposition that goes with it.
    		String verb = "";
    		String preposition = "";
    		switch (cmdType)
    		{
    		case grant:
    			verb = "GRANT ";
    			preposition = " TO ";
    			break;
    		case revoke:
    			verb = "REVOKE ";
    			preposition = " FROM ";
    			break;
    		default:
    			break;
    		}

    		// Kind of object the privilege applies to.
    		String targetKeyword = "";
    		switch (tgtType)
    		{
    		case database:
    			targetKeyword = "DATABASE ";
    			break;
    		case table:
    			targetKeyword = "TABLE ";
    			break;
    		default:
    			break;
    		}

    		// Kind of principal receiving or losing the privilege.
    		String receiverKeyword = "";
    		switch (recvType)
    		{
    		case user:
    			receiverKeyword = " USER ";
    			break;
    		case group:
    			receiverKeyword = " GROUP ";
    			break;
    		default:
    			break;
    		}

    		String hql = verb + privs + " ON " + targetKeyword + tgt + preposition + receiverKeyword + recv;

    		Statement statement = null;
    		try
    		{
    			statement = getConnection().createStatement();
    			statement.execute("set role admin");
    			statement.execute(hql);
    			return true;
    		}
    		catch (Exception failure)
    		{
    			failure.printStackTrace();
    			return false;
    		}
    		finally
    		{
    			close(statement);
    			close();
    		}
    	}
    	
    	/**
    	 * Manual smoke run: prints every database, then every table of "default".
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args)
    	{
    		// The realm stays the same; the user name part varies per user.
    		String principal = "openbigdata@DRAGON.COM";
    		// The keytab file depends on the user.
    		String keytabPath = "/etc/security/keytabs/openbigdata.keytab";
    		String jdbcUrl = "jdbc:hive2://d004.dragon.com:2181,d002.dragon.com:2181,d005.dragon.com:2181/;serviceDiscoveryMode=zookeeper;zookeeperNamespace=hiveserver2";

    		HiveOper oper = new HiveOper(principal, keytabPath, jdbcUrl, null, null);

    		List<String> databases = oper.showdatabases();
    		for (String database : databases)
    		{
    			System.out.println(database);
    		}

    		List<String> tables = oper.listTables("default");
    		for (String table : tables)
    		{
    			System.out.println(table);
    		}
    	}
    }
    
    
    
    /**
     * Unchecked exception used by HiveOper to report failures while talking to Hive.
     */
    class HiveDBException extends RuntimeException
    {
    	private static final long serialVersionUID = 2637639405785985892L;

    	/**
    	 * Wraps another exception, reusing its message and keeping it as the cause.
    	 * Overload resolution selects this constructor for any argument whose
    	 * static type is an Exception, so it must not discard the cause.
    	 *
    	 * @param e exception to wrap; may be null
    	 */
    	public HiveDBException(Exception e)
    	{
    		super(e == null ? null : e.getMessage(), e);
    	}

    	/**
    	 * @param message detail message
    	 * @param cause   underlying failure
    	 */
    	public HiveDBException(String message, Throwable cause)
    	{
    		super(message, cause);
    	}

    	/**
    	 * @param message detail message
    	 */
    	public HiveDBException(String message)
    	{
    		super(message);
    	}

    	/**
    	 * @param cause underlying failure
    	 */
    	public HiveDBException(Throwable cause)
    	{
    		super(cause);
    	}
    }
    
  • 相关阅读:
    wget(转)
    852. Peak Index in a Mountain Array
    617. Merge Two Binary Trees
    814. Binary Tree Pruning
    657. Judge Route Circle
    861. Score After Flipping Matrix
    832. Flipping an Image
    461. Hamming Distance
    654. Maximum Binary Tree
    804. Unique Morse Code Words
  • 原文地址:https://www.cnblogs.com/supertonny/p/7148567.html
Copyright © 2011-2022 走看看