一、需求
有一个列表,只有两列:id、pro,记录了id与pro的对应关系,但是在同一个id下,pro有可能是重复的。
现在需要写一个程序,统计一下每个id下有多少个不重复的pro。
为了写一个完整的示例,我使用了多job!
二、文件目录
|- OutCount //单Job的,本次试验没有使用到,这里写出来供参考 |- OutCount2 |- OutCountMapper |- OutCountMapper2 |- OutCountReduce |- OutCountReduce2
三、样本数据(部分)
2,10000088379 9,10000088379 6,10000088379 1,10000088379 8,10000088379 0,10000088379 1,10000088379 4,10000091621 3,10000091621 2,10000091621 0,10000091621 6,10000091621 2,10000091621 0,10000091621 0,10000091621 9,10000091621 2,10000091621
四、Java代码
1、OutCountMapper.java
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * created by wangjunfu on 2017-05-25. * 4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型 * map 和 reduce 的数据输入输出都是以 key-value对的形式封装的 * 默认情况下,Map框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量(选用LongWritable),value是这一行的内容(VALUEIN选用Text) * 在wordcount中,经过mapper处理数据后,得到的是<单词,1>这样的结果,所以KEYOUT选用Text,VAULEOUT选用IntWritable */ public class OutCountMapper extends Mapper<LongWritable, Text, Text, Text> { // MapReduce框架每读一行数据就调用一次map方法 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 数据格式:uid skuid String oneline = value.toString().replace(',', '_').trim(); // 去重思路:Map的key具有数据去重的功能,以整个数据作为key发送出去, value为null context.write(new Text(oneline), new Text("")); /* // 这里需要说明一下,我们现在的样本是标准的,一行一个样本。 // 有的情况下一行多个,那就需要进行分割。 // 对这一行的文本按特定分隔符切分 String[] words = oneline.split(" "); for (String word : words) { // 遍历这个单词数组,输出为key-value形式 key:单词 value : 1 context.write(new Text(word), new IntWritable(1)); } */ } }
2、OutCountReduce.java
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * created by wangjunfu on 2017-05-25. * 经过mapper处理后的数据会被reducer拉取过来,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致 * 经过reducer处理后的数据格式为<单词,频数>,所以KEYOUT为Text,VALUEOUT为IntWritable */ public class OutCountReduce extends Reducer<Text, Text, Text, Text> { // 当mapper框架将相同的key的数据处理完成后,reducer框架会将mapper框架输出的数据<key,value>变成<key,values{}>。 // 例如,在wordcount中会将mapper框架输出的所有<hello,1>变为<hello,{1,1,1...}>,即这里的<k2,v2s>,然后将<k2,v2s>作为reduce函数的输入 // 这个将在下面reduce2 中得到体现 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } }
3、OutCountMapper2.java
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * created by wangjunfu on 2017-05-27. * 将原始数据作为map输出的key设置为int类型。map会自动的根据key进行排序 */ public class OutCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 数据格式:uid_skuid String oneline = value.toString(); // 将这条数据中的uid 发出去, value为计算one context.write(new Text(oneline.split("_")[0]), one); } }
4、OutCountReduce2.java
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * created by wangjunfu on 2017-05-27. * 按统计数排序:将values作为次序key,将map排序好的key作为value输出 */ public class OutCountReduce2 extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 迭代器,访问容器中的元素,为容器而生 Iterator<IntWritable> itr = values.iterator(); while (itr.hasNext()) { sum += itr.next().get(); } /* // 这种遍历也可以 // 遍历v2的list,进行累加求和 for (IntWritable v2 : itr) { sum = v2.get(); } */ // 按统计数排序:将values作为次序key,将map排序好的key作为value输出 //context.write(new IntWritable(sum), key); //需要再起一个 map-reduce context.write(key, new IntWritable(sum)); } }
5、OutCount2.java
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。 * 涉及到多job 处理。 * created by wangjunfu on 2017-05-27. */ public class OutCount2 { public static void main(String[] args) throws Exception { JobConf conf = new JobConf(OutCount.class); //第一个job的配置 Job job1 = new Job(conf, "Join1"); job1.setJarByClass(OutCount.class); job1.setMapperClass(OutCountMapper.class); job1.setReducerClass(OutCountReduce.class); job1.setMapOutputKeyClass(Text.class); //map阶段的输出的key job1.setMapOutputValueClass(Text.class); //map阶段的输出的value job1.setOutputKeyClass(Text.class); //reduce阶段的输出的key job1.setOutputValueClass(Text.class); //reduce阶段的输出的value //job-1 加入控制容器 ControlledJob ctrljob1 = new ControlledJob(conf); ctrljob1.setJob(job1); //job-1 的输入输出文件路径 FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); //第二个job的配置 Job job2 = new Job(conf, "Join2"); job2.setJarByClass(OutCount.class); // 设置job所在的类在哪个jar包 job2.setMapperClass(OutCountMapper2.class); // 指定job所用的mappe类 job2.setReducerClass(OutCountReduce2.class); // 指定job所用的reducer类 // 指定mapper输出类型和reducer输出类型 // 由于在wordcount中mapper和reducer的输出类型一致, // 所以使用setOutputKeyClass和setOutputValueClass方法可以同时设定mapper和reducer的输出类型 // 如果mapper和reducer的输出类型不一致时,可以使用setMapOutputKeyClass和setMapOutputValueClass单独设置mapper的输出类型 job2.setMapOutputKeyClass(Text.class); //map阶段的输出的key job2.setMapOutputValueClass(IntWritable.class); //map阶段的输出的value job2.setOutputKeyClass(Text.class); //reduce阶段的输出的key job2.setOutputValueClass(IntWritable.class); //reduce阶段的输出的value //job-2 加入控制容器 ControlledJob ctrljob2 = new ControlledJob(conf); ctrljob2.setJob(job2); //设置多个作业直接的依赖关系 //job-2 的启动,依赖于job-1作业的完成 ctrljob2.addDependingJob(ctrljob1); //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好 FileInputFormat.addInputPath(job2, new Path(args[1])); //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得 //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了 FileOutputFormat.setOutputPath(job2, new Path(args[2])); //主的控制容器,控制上面的总的两个子作业 JobControl jobCtrl = new JobControl("myOutCount"); //添加到总的JobControl里,进行控制 jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); //在线程启动,记住一定要有这个 Thread t = new Thread(jobCtrl); t.start(); while (true) { if (jobCtrl.allFinished()) { //如果作业成功完成,就打印成功作业的信息 System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } }
6、OutCount.java
单Job的,本次试验没有使用到,这里写出来供参考
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; /** * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。 * 涉及到多job 处理。 * created by wangjunfu on 2017-05-25. */ public class OutCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //指定作业执行规范 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage:wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); //指定job名称,及运行对象 job.setJarByClass(OutCount.class); job.setMapperClass(OutCountMapper.class); //指定map函数 job.setCombinerClass(OutCountReduce.class); //是否需要conbiner整合 job.setReducerClass(OutCountReduce.class); //指定reduce函数 job.setOutputKeyClass(Text.class); //输出key格式 job.setOutputValueClass(IntWritable.class); //输出value格式 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //处理文件路径 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //结果输出路径 // 将job提交给集群运行 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
五、结果
11 0 11 1 7 2 10 3 10 4 9 5 10 6 7 7 13 8 9 9