石家庄铁道大学2019年秋季
2017 级课堂测试试卷—数据清洗
课程名称: 大型数据库应用技术 任课教师: 王建民 考试时间: 100 分钟
Result文件数据说明:
Ip:106.39.41.166,(城市)Date:10/Nov/2016:00:01:02 +0800,(日期)Day:10,(天数)Traffic: 54 ,(流量)Type: video,(类型:视频video或文章article)
Id: 8701(视频或者文章的id)
测试要求:
1、 数据清洗:按照以下两个阶段进行数据清洗,并将清洗后的数据导入Hive数据库中。
两阶段数据清洗:
(1)第一阶段:把需要的信息从原始日志中提取出来
ip: 199.30.25.88, time: 10/Nov/2016:00:01:03 +0800, traffic: 62, 文章: article/11325, 视频: video/3235
(2)第二阶段:根据提取出来的信息做精细化操作
ip--->城市 city(IP), date--> time: 2016-11-10 00:01:03, day: 10, traffic: 62, type: article/video, id: 11325
(3)hive数据库表结构:
create table data( ip string, time string , day string, traffic bigint,type string, id string )
2、数据处理:
·统计地区最受欢迎的视频/文章的Top10访问次数 (video/article)
·按照地市统计最受欢迎的Top10课程 (ip)
·按照流量统计最受欢迎的Top10课程 (traffic)
3、数据可视化:将统计结果导入MySQL数据库中,通过图形化展示的方式展现出来。
******************************************************************************
说明:
运行环境:MyEclipse(linux外面)
我的理解可能是有错误的,这些代码都是由之前那11个改编的,但确实是能实现简单的清洗。感谢韩代表指导我配置环境*.*
数据清洗:
package mapreduce1;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Stage 1 job: reads comma-separated records, keys each record by its first
 * field and counts how many records share that key.
 *
 * <p>Each output line has the form {@code key<TAB>lastValue count}, where
 * {@code lastValue} is the trimmed fourth field of the last record seen for
 * the key and {@code count} is the number of records for that key.
 */
public class Result_1 {

    /**
     * Running total of all records seen by reducers executing in this JVM.
     * Kept for backward compatibility; it is no longer written to the job
     * output (the per-key count is written instead).
     */
    static int Sum = 0;

    /** Emits (field 0, trimmed field 3) for every well-formed input line. */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // Reused per mapper instance to avoid allocating a Text per record.
        private final Text name = new Text();
        private final Text num = new Text();

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one comma-separated input line
         * @param context sink for the (field 0, field 3) pair
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 4) {
                // Malformed or blank line: skip it instead of failing the task.
                return;
            }
            name.set(fields[0]);
            num.set(fields[3].trim());
            context.write(name, num);
        }
    }

    /** Counts the records of each key and emits {@code "lastValue count"}. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private final Text result = new Text();

        /**
         * @param key     the grouping key emitted by the mapper
         * @param values  all values emitted for {@code key}
         * @param context sink for the (key, "lastValue count") pair
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Copy the value as a String: the framework may reuse the Text
            // instance handed out by the iterator.
            String lastValue = "";
            int count = 0;
            for (Text val : values) {
                lastValue = val.toString();
                count++;
            }
            Sum += count;
            // The count is per key; the original wrote the cross-key running total.
            result.set(lastValue + " " + count);
            context.write(key, result);
        }
    }

    /**
     * Configures and runs the job, deleting any previous output directory.
     *
     * @return 0 if the job completed successfully, 1 otherwise
     */
    public static int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.1.100:9000");

        Job job = Job.getInstance(conf, "Result_1");
        job.setJarByClass(Result_1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path in = new Path("hdfs://192.168.1.100:9000/mymapreduce1/in/result.txt");
        Path out = new Path("hdfs://192.168.1.100:9000/mymapreduce1/out_result");
        FileInputFormat.addInputPath(job, in);
        // The output directory must not exist when the job starts.
        FileSystem.get(conf).delete(out, true);
        FileOutputFormat.setOutputPath(job, out);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Runs the job and exits with its status so a failure is visible to the caller. */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        System.exit(run());
    }
}
结果:
简单排序
package mapreduce1;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Stage 2 job: sorts the stage-1 output by count in descending order.
 *
 * <p>Each input line is expected to hold three whitespace-separated fields
 * {@code name value count}. Output lines have the form
 * {@code name value<TAB>count}, highest count first.
 */
public class Result_2 {

    /**
     * First, second and count fields of the first ten records seen by the
     * reducer. They are filled in by the reduce task, so {@link #main} only
     * sees them when the reducer runs in the same JVM as the driver.
     */
    public static List<String> Names = new ArrayList<String>();
    public static List<String> Values = new ArrayList<String>();
    public static List<String> Texts = new ArrayList<String>();

    /** Sort comparator that orders the IntWritable map-output keys descending. */
    public static class Sort extends WritableComparator {
        public Sort() {
            // Must match the key type emitted by the mapper.
            super(IntWritable.class, true);
        }

        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public int compare(WritableComparable a, WritableComparable b) {
            // Swapping the operands reverses the natural (ascending) order.
            return b.compareTo(a);
        }
    }

    /** Emits (count, "name value") so the shuffle sorts records by count. */
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private final Text name = new Text();
        private final IntWritable num = new IntWritable();

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of stage-1 output
         * @param context sink for the (count, "name value") pair
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            // TextOutputFormat separates key and value with a tab while the
            // stage-1 value itself contains a space, so split on any whitespace.
            String[] fields = line.split("\\s+");
            if (fields.length < 3) {
                return;
            }
            int count;
            try {
                count = Integer.parseInt(fields[2]);
            } catch (NumberFormatException e) {
                // Not a stage-1 record; skip it rather than fail the task.
                return;
            }
            num.set(count);
            name.set(fields[0] + " " + fields[1]);
            context.write(num, name);
        }
    }

    /** Writes every record as (name value, count) and remembers the first ten. */
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private static final int TOP_N = 10;

        // Number of records captured so far by this reducer instance.
        int i = 0;

        /**
         * @param key     the count shared by all {@code values}
         * @param values  the "name value" strings having that count
         * @param context sink for the (name value, count) pairs
         */
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                if (i < TOP_N) {
                    String[] parts = val.toString().split(" ", 2);
                    if (parts.length == 2) {
                        i++;
                        Names.add(parts[0]);
                        Texts.add(parts[1]);
                        Values.add(key.toString());
                    }
                }
                context.write(val, key);
            }
        }
    }

    /**
     * Configures and runs the job, deleting any previous output directory.
     *
     * @return 0 if the job completed successfully, 1 otherwise
     */
    public static int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.1.100:9000");

        Job job = Job.getInstance(conf, "Result_2");
        job.setJarByClass(Result_2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setSortComparatorClass(Sort.class);
        // Mapper and reducer emit different types, so declare both pairs.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // A single reducer is required for one globally sorted output.
        job.setNumReduceTasks(1);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path in = new Path("hdfs://192.168.1.100:9000/mymapreduce1/out_result/part-r-00000");
        Path out = new Path("hdfs://192.168.1.100:9000/mymapreduce1/out_result1");
        FileInputFormat.addInputPath(job, in);
        // The output directory must not exist when the job starts.
        FileSystem.get(conf).delete(out, true);
        FileOutputFormat.setOutputPath(job, out);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Runs the job, prints the captured names, then exits with the job status. */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        int status = run();
        for (String n : Names) {
            System.out.println(n);
        }
        System.exit(status);
    }
}
结果: