zoukankan      html  css  js  c++  java
  • MapReduce实现TopK的示例

      由于开始学习MapReduce编程已经有一段时间了,作为一个从编程中寻找自信和乐趣以及热爱编程的孩子来讲,手开始变得很“痒”了,很想小试一下身手。于是自己编写了TopK的代码。TopK的意思就是从原文件中找出词频排名前K的所有单词。首先分析该问题,从中我们可以得到启发:要想知道词频排名前K的所有单词,那么是不是要对所有的单词进行词频的统计啊?于是我们就联想到了一个比较经典的例子:WordCount的例子。是的,没错。就是它,统计原文件中每个单词的个数就靠它。

      但是,我们词频统计出来了,接下来需要做的就是如何找出词频排名前K的所有单词。如何找出词频排名前K呢?我们知道,WordCount得到的结果就是所有单词的词频情况,并且是已排好序的。所以,我们接下来需要做的是:

    1、将所有相同词频的单词汇总,这一步就是map之后的shuffle过程可以得到相应的结果,只需要在map阶段将词频作为key,单词多作为value即可。

    2、找出排名前k的词频的所有单词,并且按照词频的顺序排序,在这一步当中,很多人通过采用TreeMap的数据结构来实现,但是这里要注意点,TreeMap对于相同的键值是会进行覆盖的。因此无法操作相同键值的数据。也有些人对key进行封装,但同样还是避免不了有相同键值的结果。因此,我在这里采用的方法是将所有词频相同的单词用ArrayList存放起来,最后在将ArrayList的内容写入待hdfs中即可。

      综上所述,要实现TopK的结果,需要用到两个MR作业,一个是WordCount作业,一个是TopK的作业。

    代码如下:

    1、WordCount的部分:

    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class MyTopK {
    	
    	public static class  Mymap extends Mapper<LongWritable, Text, Text, IntWritable>{
    		
    		private final IntWritable one =new IntWritable(1);
    		private Text word =new Text();
    		
    		public void map(LongWritable ikey,Text ivalue,Context context) throws IOException, InterruptedException{
    			StringTokenizer str=new StringTokenizer(ivalue.toString());
    			while(str.hasMoreTokens()){
    				word.set(str.nextToken());
    				context.write(word, one);
    			}
    		}
    	}
    	
    	
    	public static class Myreduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    		private IntWritable result=new IntWritable();
    		
    		
    		public void reduce(Text ikey,Iterable<IntWritable> ivalue,
    				Context context) throws IOException, InterruptedException{
    			int sum=0;
    			for(IntWritable val:ivalue){
    				sum+=val.get();
    			}
    			result.set(sum);
    			context.write(ikey, result);
    		}
    	}
    	
    	//设置静态的函数,方便直接在main中通过类名 来调用
    	public static boolean run(String in ,String out) throws IOException, ClassNotFoundException, InterruptedException{
    		
    		Configuration conf =new Configuration();
    		
    		Job job=new Job(conf,"Wordcount");
    		job.setJarByClass(MyTopK.class);
    		job.setMapperClass(Mymap.class);
    		job.setReducerClass(Myreduce.class);
    		
    		//设置map输出类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		
    		//设置reduce的输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		//设置输入输出路径
    		FileInputFormat.addInputPath(job, new Path(in));
    		FileOutputFormat.setOutputPath(job, new Path(out));
    		
    		return job.waitForCompletion(true);
    	}
    }
    

      2、TopK的实现过程

    import java.io.IOException;
    import java.util.Map.Entry;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Set;
    import java.util.StringTokenizer;
    import java.util.TreeMap;
    import java.util.regex.Pattern;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    
    public class MyTopK1 {
    	
    	public static class MyMap extends Mapper<LongWritable, Text, IntWritable, Text>{
    		
    		IntWritable outkey=new IntWritable();
    		Text outvalue=new Text();
    		
    		public void map(LongWritable ikey,Text ivalue,Context context) throws IOException, InterruptedException{
    			StringTokenizer str=new StringTokenizer(ivalue.toString());
    			while(str.hasMoreTokens()){
    				//这个表示是输入数据的每行数据,每一行包含了单词和单词的次数,这个内容否在ivalue中,下面需要将ivalue中的单词和单词的次数进行分离。
    				String element=str.nextToken();
    				if(Pattern.matches("\d+", element)){//这里利用正则表达式来匹配单词的个数
    					outkey.set(Integer.parseInt(element));//将单词的个数作为键值
    				}else {
    					outvalue.set(element);//将单词作为键值值
    				}
    			}
    			context.write(outkey, outvalue);//在写的过程中会对单词的次数进行排序
    		}
    		
    	}
    	
    	public  static TreeMap<Integer, ArrayList<String> > hm =new TreeMap<Integer, ArrayList<String> >(new Comparator<Integer>() {
    		public int compare(Integer v1,Integer v2){
    			return v2.compareTo(v1);
    		}
    	});//用来选择出topK
    	private static MultipleOutputs<Text, IntWritable> mos=null;//用来进行多文件输出
    	
    	private static String path=null;
    	
    	//通过shuffle过程之后,相同次数的单词就在一起了,并将这个数据作为reduce的输入数据
    	public static class Myreduce extends Reducer<IntWritable, Text, Text, IntWritable>{
    		
    		public void reduce(IntWritable ikey,Iterable<Text> ivalue,Context context) throws IOException, 
    		InterruptedException{
    			ArrayList<String> tmp=new ArrayList<String>(10);
    			for(Text val:ivalue){
    				context.write(val,ikey);//输出全排序的内容
    //				tmp.add(val.toString());  //这里会造成占用较多的内存,这里可以优化
    				
    				//优化的方法就是限定列表的长度,由于是topk,所以每一个最多也就是10个即可
    				if(tmp.size()<=10){
    					tmp.add(val.toString());  
    				}
    			}
    			hm.put(ikey.get(), tmp);
    		}
    		
    		private static int topKNUM=10; //表示求最高的多少个数
    		protected void cleanup(Context context) throws IOException,
    				InterruptedException {
    			//String path = context.getConfiguration().get("topKout");
    			mos = new MultipleOutputs<Text, IntWritable>(context);
    			Set<Entry<Integer, ArrayList<String> > > set =   hm.entrySet();
    			
    			for (Entry<Integer, ArrayList<String>> entry : set) {
    				ArrayList<String> al = entry.getValue();
    				if (topKNUM-al.size() > 0) {
    					for (String word : al) {
    						//if (topKNUM-- > 0) {
    							mos.write("topKMOS", new Text(word), // 这里参数“topKMOS”表示一个属性名称
    									new IntWritable(entry.getKey()), path);
    						//}
    					}
    				}
    			}
    			mos.close();
    		}
    
    	}
    	@SuppressWarnings("deprecation")
    	public static  void run(String in,String out,String topkout) throws IOException, ClassNotFoundException, InterruptedException{
    		
    		Configuration conf=new Configuration();
    		
    		//创建作业,并制定map和reduce类
    
    		Job job=new Job(conf);
    		job.setJarByClass(MyTopK1.class);
    		job.setMapperClass(MyMap.class);
    		job.setReducerClass(Myreduce.class);
    		
    		//TopK的输出路径
    		path=topkout;
    		//conf.set("topKout",topkout);
    		
    		//设置map输出的类型
    		job.setMapOutputKeyClass(IntWritable.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		//设置reduce的输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		//设置MultipleOutputs输出格式,//这里的第二个参数“topKMOS”要跟write方法中的参数相同
    		MultipleOutputs.addNamedOutput(job, "topKMOS",TextOutputFormat.class, Text.class, IntWritable.class);
    		
    		//设置输入输出格式
    		FileInputFormat.addInputPath(job, new Path(in));
    		FileOutputFormat.setOutputPath(job, new Path(out));
    		
    		//提交作业
    		job.waitForCompletion(true);
    	}
    }
    

      3、程序的入口

    import java.io.IOException;
    
    import org.apache.log4j.PropertyConfigurator;
    
    
    public class TopKmain {
    
    	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
    		// TODO Auto-generated method stub、、
    		//这里要记住一点,手动加载一些log4j文件,一来可以去掉警告,二来可以在出现错误时,通过查看日志,了解错误详细内容
    		String rootPath = System.getProperty("user.dir" );
    		PropertyConfigurator.configure(rootPath+"\log4j.properties");
    		
    		//要统计字数,排序的文字
            String in = "hdfs://192.168.1.21:9000/input";
            
            //统计字数后的结果
            String wordCoutoutput = "hdfs://192.168.1.21:9000/out";
            
            //对统计完后的结果再排序后的内容
            String sort = "hdfs://192.168.1.21:9000/sort";
            
            //指定前K条输出的文件名称
            String topK = "hdfs://192.168.1.21:9000/topK";
            
            //如果统计字数的job完成后就开始排序
            if(MyTopK.run(in, wordCoutoutput)){
            	MyTopK1.run(wordCoutoutput, sort,topK);
            } 
    	}
    
    }
    

      

  • 相关阅读:
    复利计算(修改后)
    复利计算测试(C语言)
    实验一、命令解释程序的编写
    《构建之法》之第一二三章读后感
    复利计算1.0 2.0 3.0
    复利计算总结
    Scrum 项目7.0——第一个Sprint的总结和读后感
    Scrum 项目7.0——第一个Sprint的演示和回顾
    Scrum 项目4.0&&5.0
    操作系统——进程调度模拟程序
  • 原文地址:https://www.cnblogs.com/ljy2013/p/4483101.html
Copyright © 2011-2022 走看看