After another round of fairly systematic study, I finally managed to implement this with my own code:
Counting the Top 10 visit counts for the most popular videos/articles (video/article)
The implementation runs MapReduce twice:
The first pass processes the raw data, keeping each video/article ID as the key and its total visit count as the value;
The second pass sorts that output by visit count to surface the most popular videos or articles.
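To make the two passes concrete, here is a sketch of the data flow. The raw record layout below is my assumption, not taken from the original data set; all the code actually requires is that the sixth comma-separated field (arr[5]) holds the video/article ID:

    user001,2021-06-01,10:23:05,GET,200,video_1024    <- one raw access record (assumed layout)
    video_1024  37                                    <- after test01: ID <tab> total visits
    37  video_1024                                    <- after test02: visits <tab> ID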
First MapReduce job (test01):
package test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class test01 {

    // Mapper: emit (ID, 1) for every record. The ID is the sixth
    // comma-separated field of the input line.
    public static class MyMap extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text id = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            id.set(arr[5]); // the video/article ID
            context.write(id, one);
        }
    }

    // Reducer: sum the 1s to get the total visit count per ID.
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("test01");
        job.setJarByClass(test01.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path in = new Path("hdfs://localhost:9000/usr/hadoop/result.txt");
        Path out = new Path("hdfs://localhost:9000/usr/hadoop/result");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
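Assuming the classes are packaged into a jar named test.jar (the jar name is my assumption), the first job can be submitted with the standard hadoop jar command. Keep in mind that MapReduce refuses to run if the output directory already exists, so clear it before re-running:

    hdfs dfs -rm -r /usr/hadoop/result                # clear a previous run, if any
    hadoop jar test.jar test.test01                   # submit the first job
    hdfs dfs -cat /usr/hadoop/result/part-r-00000     # inspect: ID <tab> total visits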
Second MapReduce job (test02):
package test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class test02 {

    // Mapper: swap (ID, count) into (count, ID) so the shuffle phase
    // sorts the records by visit count for us.
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private Text id = new Text();
        private IntWritable num = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // TextOutputFormat separates key and value with a tab.
            String[] arr = line.split("\t");
            num.set(Integer.parseInt(arr[1]));
            id.set(arr[0]);
            context.write(num, id);
        }
    }

    // Reducer: emit the (count, ID) pairs in sorted key order.
    public static class red extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "test02");
        job.setJarByClass(test02.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(red.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Note: IntWritable keys are sorted in ascending order by default,
        // so the most popular IDs land at the end of the output file; see
        // the descending-comparator sketch below for a direct Top 10.
        Path in = new Path("hdfs://localhost:9000/usr/hadoop/result/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/usr/hadoop/result/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
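One thing to watch: Hadoop sorts IntWritable keys in ascending order by default, so the second job as written lists the most popular IDs last. Below is a minimal sketch of a descending comparator (my addition, not part of the original post; the class name is mine) that can be plugged into test02 with job.setSortComparatorClass(DescendingIntWritableComparator.class) so the Top 10 appear first:

package test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sorts IntWritable keys from largest to smallest (sketch; the original
// jobs rely on the default ascending order).
public class DescendingIntWritableComparator extends WritableComparator {

    public DescendingIntWritableComparator() {
        super(IntWritable.class, true); // true: instantiate keys for compare()
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // invert the natural ascending order
    }
}

With the counts arriving in descending order, the reducer in test02 can additionally keep a simple running counter and stop writing after the first ten records, yielding the Top 10 directly.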