整体流程:资源文件工具类(ResourceUtil)把 SQL 脚本读取为 String 字符串,再交给 SQL 执行工具类(ExecSQLUtil)执行。
1.资源文件工具类(ResourceUtil)
把sql脚本转换为String字符串
/**
 * Classpath resource helpers: read a resource as one string or as a list of
 * non-blank lines. Resources are resolved through the current thread's
 * context class loader.
 */
public class ResourceUtil {

    /**
     * Reads an entire classpath resource and decodes it with the given charset.
     *
     * @param resource resource name, resolved via the thread context class loader
     * @param charset  name of the charset used to decode the bytes
     * @return the full, unmodified content of the resource
     * @throws Exception {@link java.io.FileNotFoundException} if the resource does
     *                   not exist; an I/O or unsupported-charset error otherwise
     */
    public static String readResourceAsString(String resource, String charset) throws Exception {
        return new String(readResourceBytes(resource), charset);
    }

    /**
     * Reads an entire classpath resource using the platform default charset and
     * removes SQL line comments from it.
     *
     * <p>Everything from {@code --} to the end of the line is dropped; the line
     * terminator itself is kept. The stripping is purely textual, so a {@code --}
     * that appears inside a quoted SQL string is removed as well.
     *
     * @param resource resource name, resolved via the thread context class loader
     * @return the resource content with {@code --} comments removed
     * @throws Exception {@link java.io.FileNotFoundException} if the resource does
     *                   not exist; an I/O error otherwise
     */
    public static String readResourceAsString(String resource) throws Exception {
        String sql = new String(readResourceBytes(resource), Charset.defaultCharset());
        // Strip "-- ..." up to (not including) the end of the line, so no part
        // of the comment text survives into the SQL.
        return sql.replaceAll("--[^\\r\\n]*", "");
    }

    /**
     * Reads a classpath resource line by line (platform default charset) and
     * returns every line that is not blank after trimming. Lines are returned
     * untrimmed.
     *
     * @param resource resource name, resolved via the thread context class loader
     * @return the non-blank lines in file order; empty list if there are none
     * @throws Exception {@link java.io.FileNotFoundException} if the resource does
     *                   not exist; an I/O error otherwise
     */
    public static List<String> readResourceAsList(String resource) throws Exception {
        List<String> list = new ArrayList<String>();
        try (InputStream input = openResource(resource);
             BufferedReader br = new BufferedReader(new InputStreamReader(input))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.trim().equals("")) {
                    list.add(line);
                }
            }
        }
        return list;
    }

    /** Opens the resource, failing with a descriptive error instead of returning null. */
    private static InputStream openResource(String resource) throws java.io.FileNotFoundException {
        InputStream input =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        if (input == null) {
            throw new java.io.FileNotFoundException("Classpath resource not found: " + resource);
        }
        return input;
    }

    /** Reads all bytes of the resource and closes the underlying stream. */
    private static byte[] readResourceBytes(String resource) throws java.io.IOException {
        try (InputStream input = openResource(resource)) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int len;
            while ((len = input.read(buf)) != -1) {
                baos.write(buf, 0, len);
            }
            return baos.toByteArray();
        }
    }
}
2.sql执行工具类(ExecSQLUtil)
按分号(;)把 SQL 字符串拆分成多条语句,并逐条执行。
package com.oldboy.umeng.spark.stat; import com.oldboy.umeng.common.util.ResourceUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * 执行脚本工具类 */ public class ExecSQLUtil { /** * 执行sql脚本 */ public static void execSQLScript(SparkSession sess, String sqlScript) throws Exception { //资源工具类 把sql脚本转化为String String sqls = ResourceUtil.readResourceAsString(sqlScript); String arr[] = sqls.split(";"); for (String sql : arr) { if (!sql.trim().equals("")) { sess.sql(sql).show(1000, false); } } } /** * 执行sqlsStr */ public static void execSQLString(SparkSession sess, String sqlString) throws Exception { String arr[] = sqlString.split(";"); for (String sql : arr) { if (!sql.trim().equals("")) { sess.sql(sql).show(1000, false); } } } /** * 执行sqlsStr */ public static Dataset<Row> execSQLString2(SparkSession sess, String sqlString) throws Exception { String arr[] = sqlString.split(";"); for (int i = 0 ; i< arr.length ; i ++) { if (!arr[i].trim().equals("")) { if(i != arr.length - 1){ sess.sql(arr[i]).show(); ; } else{ return sess.sql(arr[i]) ; } } } return null ; } /** * 注册函数 */ public static void execRegisterFuncs(SparkSession sess) throws Exception { execSQLScript(sess, "funcs.sql"); } }
3.示例:清洗转储数据(DataCleanJava)
/** * 清洗数据 */ public class DataCleanJava { public static void main(String[] args) throws Exception { String log_sql_script_name = "data_clean_startup.sql" ; if(args != null && args.length > 0){ log_sql_script_name = args[0] ; } SparkConf conf = new SparkConf(); conf.setAppName("dataClean") ; conf.setMaster("local[4]") ; SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport( ).getOrCreate(); //先注册函数 ExecSQLUtil.execRegisterFuncs(sess); //执行sql ExecSQLUtil.execSQLScript(sess , "data_clean_error.sql"); } }
SQL脚本
funcs.sql注册函数脚本
USE big12_umeng ;

DROP FUNCTION IF EXISTS forkstartuplogs ;
DROP FUNCTION IF EXISTS forkeventlogs ;
DROP FUNCTION IF EXISTS forkerrorlogs ;
DROP FUNCTION IF EXISTS forkpagelogs ;
DROP FUNCTION IF EXISTS forkusagelogs ;
DROP FUNCTION IF EXISTS formatbyday ;
DROP FUNCTION IF EXISTS formatbyweek ;
DROP FUNCTION IF EXISTS formatbymonth ;

CREATE TEMPORARY FUNCTION forkstartuplogs AS 'com.oldboy.umeng.hive.udtf.ForkStartuplogsUDTF' ;
CREATE TEMPORARY FUNCTION forkeventlogs AS 'com.oldboy.umeng.hive.udtf.ForkEventlogsUDTF' ;
CREATE TEMPORARY FUNCTION forkerrorlogs AS 'com.oldboy.umeng.hive.udtf.ForkErrorlogsUDTF' ;
CREATE TEMPORARY FUNCTION forkpagelogs AS 'com.oldboy.umeng.hive.udtf.ForkPagelogsUDTF' ;
CREATE TEMPORARY FUNCTION forkusagelogs AS 'com.oldboy.umeng.hive.udtf.ForkUsagelogsUDTF' ;
CREATE TEMPORARY FUNCTION formatbyday AS 'com.oldboy.umeng.hive.udf.FormatByDayUDF' ;
CREATE TEMPORARY FUNCTION formatbyweek AS 'com.oldboy.umeng.hive.udf.FormatByWeekUDF' ;
CREATE TEMPORARY FUNCTION formatbymonth AS 'com.oldboy.umeng.hive.udf.FormatByMonthUDF' ;