Experiment Requirements
https://pan.baidu.com/s/1hTnGnxdvgyNaLwckbKUgJA
Program Source Code
package cleandata;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CleanResult {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("CleanResult");
        job.setJarByClass(CleanResult.class);
        job.setMapperClass(doMapper.class);
        job.setReducerClass(doReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Raw log file on HDFS and the output directory for the cleaned data.
        Path in = new Path("hdfs://192.168.12.128:9000/mapreduce/in/result.txt");
        Path out = new Path("hdfs://192.168.12.128:9000/mapreduce/out/clean");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static Text word = new Text();
        public static final IntWritable id = new IntWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is a comma-separated record.
            String[] data = value.toString().split(",");

            // Rewrite the timestamp field (e.g. "10/Nov/2016:00:01:02 +0800")
            // as "2016-11-10 HH:mm:ss".
            String[] time = data[1].split(":");
            data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];

            // Keep only the digit characters of the traffic field.
            String traffic = "";
            for (int i = 0; i < data[3].length(); i++) {
                if (data[3].charAt(i) >= '0' && data[3].charAt(i) <= '9') {
                    traffic += data[3].charAt(i);
                }
            }
            data[3] = traffic;

            // The first five fields, space-separated, become the key;
            // the sixth field (id) becomes the value.
            String newValue = data[0];
            for (int i = 1; i < 5; i++) {
                newValue += " " + data[i];
            }
            id.set(Integer.parseInt(data[5]));
            word.set(newValue);
            context.write(word, id);
        }
    }

    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println("key:" + key);
            // Keep the last id seen for each key and emit the record unchanged.
            for (IntWritable value : values) {
                result.set(value.get());
            }
            System.out.println("value:" + result);
            context.write(key, result);
        }
    }
}
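To make the cleaning rules easier to check, the following is a minimal standalone sketch that applies the same transformations as doMapper to a single record, without submitting a job. The CleanPreview class is a hypothetical helper, and the sample line with its assumed field layout (ip, time, day, traffic, type, id) is inferred from how the mapper indexes the fields; adjust it to the actual result.txt format.

public class CleanPreview {
    public static void main(String[] args) {
        // Hypothetical raw record; the real layout is defined by result.txt.
        String line = "10.10.168.134,10/Nov/2016:00:01:02 +0800,20,1kb,video,1111";

        String[] data = line.split(",");

        // Rebuild the timestamp as "2016-11-10 HH:mm:ss", exactly as doMapper does.
        String[] time = data[1].split(":");
        data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];

        // Keep only the digit characters of the traffic field.
        StringBuilder traffic = new StringBuilder();
        for (char c : data[3].toCharArray()) {
            if (c >= '0' && c <= '9') {
                traffic.append(c);
            }
        }
        data[3] = traffic.toString();

        // Join the first five fields with spaces; the sixth field is the value.
        StringBuilder key = new StringBuilder(data[0]);
        for (int i = 1; i < 5; i++) {
            key.append(" ").append(data[i]);
        }
        System.out.println(key + "\t" + Integer.parseInt(data[5]));
        // Prints: 10.10.168.134 2016-11-10 00:01:02 20 1 video	1111
    }
}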
Importing the cleaned data into Hive:
Command-line import:
load data inpath '/mapreduce/out/clean/part-r-00000' into table data;
Java code:
package cleandata;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LoadDataFileToHive {

    public static void main(String[] args) {
        String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        String CONNECTION_URL = "jdbc:hive2://192.168.12.128:10000/default";
        String username = "";
        String password = "";
        Connection con = null;
        try {
            // Register the Hive JDBC driver and connect to HiveServer2.
            Class.forName(JDBC_DRIVER);
            con = DriverManager.getConnection(CONNECTION_URL, username, password);
            Statement stmt = con.createStatement();
            // Move the cleaned MapReduce output from HDFS into the Hive table "data".
            String sql = "load data inpath '/mapreduce/out/clean/part-r-00000' into table data";
            stmt.execute(sql);
            System.out.println("Data imported into Hive successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection.
            if (con != null) {
                try {
                    con.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
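As a quick check after the load, the rows should be visible in the table over the same HiveServer2 connection. The following is a minimal sketch reusing the connection settings above; the CheckHiveImport class and the count query are illustrative additions, not part of the original program.

package cleandata;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckHiveImport {
    public static void main(String[] args) throws Exception {
        // Same driver and connection settings as LoadDataFileToHive.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                     "jdbc:hive2://192.168.12.128:10000/default", "", "");
             Statement stmt = con.createStatement();
             // Count the rows that the LOAD DATA statement made visible.
             ResultSet rs = stmt.executeQuery("select count(*) from data")) {
            if (rs.next()) {
                System.out.println("rows in table data: " + rs.getLong(1));
            }
        }
    }
}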
Screenshots of the Results
Data cleaning completed:
After the file has been loaded from HDFS into Hive: