zoukankan      html  css  js  c++  java
  • hadoop编程小技巧(1)---map端聚合

    测试Hadoop版本号:2.4

    Map端聚合的应用场景:我们只关心全部数据中的部分数据,并且这部分数据能够放入内存中。

    使用的优点:可以大大减少网络数据的传输量,提高效率。

    一般编程思路:在Mapper的map函数中读入全部数据,并把数据添加到一个List(队列)中;然后在cleanup函数中对List进行处理,输出我们关心的少量数据。

    实例:

    在map函数中使用空格分隔每行数据,然后把每一个单词添加到一个列表中(相同单词累加次数);在cleanup函数中借助优先队列(堆)输出出现次数最多的若干个单词及其次数。

    package fz.inmap.aggregation;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.PriorityQueue;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    public class InMapArrgegationDriver extends Configured implements Tool{
    	public static Logger log = LoggerFactory.getLogger(InMapArrgegationDriver.class);
    	/**
    	 * @throws Exception 
    	 * 
    	 */
    	public static void main(String[] args) throws Exception {
    		ToolRunner.run(new Configuration(), new InMapArrgegationDriver(),args);
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		if(arg0.length!=3){
    			System.err.println("Usage:
    fz.inmap.aggregation.InMapArrgegationDriver <in> <out> <maxNum>");
    			return -1;
    		}
    		Configuration conf = getConf();
    		
    //		System.out.println(conf.get("fs.defaultFS"));
    		Path in = new Path(arg0[0]);
    		Path out= new Path(arg0[1]);
    		out.getFileSystem(conf).delete(out, true);
    		conf.set("maxResult", arg0[2]);
    		Job job = Job.getInstance(conf,"in map arrgegation job");
    		job.setJarByClass(getClass());
    		
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		
    		job.setMapperClass(InMapMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    //		job.setOutputKeyClass(LongWritable.class);
    //		job.setOutputValueClass(VectorWritable.class);
    		job.setNumReduceTasks(0);
    //		System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));
    //		System.out.println(conf.get("mapreduce.job.reduces"));
    		FileInputFormat.setInputPaths(job, in);
    		FileOutputFormat.setOutputPath(job, out);
    		
    		return job.waitForCompletion(true)?0:-1;
    	}
    	
    	protected static class InMapMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    		private ArrayList<Word> words = new ArrayList<Word>();
    		private PriorityQueue<Word> queue;
    		private int maxResult;
    		
    		protected void setup(Context cxt){
    			maxResult = cxt.getConfiguration().getInt("maxResult", 10);
    		}
    		
    		protected void map(LongWritable key, Text value,Context cxt){
    			String  [] line = value.toString().split(" "); // use blank to split
    			for(String word:line){
    				Word curr = new Word(word,1);
    				if(words.contains(curr)){
    					// increase the exists word's frequency
    					for(Word w:words){
    						if(w.equals(curr)){
    							w.frequency++;
    							break;
    						}
    					}
    				}else{
    					words.add(curr);
    				}
    			}
    		}
    		protected void cleanup(Context cxt) throws InterruptedException,IOException{
    			Text outputKey = new Text();
    			IntWritable outputValue = new IntWritable();
    			
    			queue = new PriorityQueue<Word>(words.size());
    			queue.addAll(words);
    			for(int i=0;i< maxResult;i++){
    				Word tail = queue.poll();
    				if(tail!=null){
    					outputKey.set(tail.value);
    					outputValue.set(tail.frequency);
    					log.info("key is {},value is {}", outputKey,outputValue);
    					cxt.write(outputKey, outputValue);
    					
    				}
    			}
    		}
    	}
    
    }
    

    使用到的Word类

    package fz.inmap.aggregation;
    
    public class Word implements Comparable<Word>{
    
    	public String value;
    	public int frequency;
    	
    	public Word(String value,int frequency){
    		this.value=value;
    		this.frequency=frequency;
    	}
    	@Override
    	public int compareTo(Word o) {
    		return o.frequency-this.frequency;
    	}
    	@Override
    	public boolean equals(Object obj){
    		if(obj instanceof Word){
    			return value.equalsIgnoreCase(((Word)obj).value);
    		}else{
    			return false;
    		}
    	}
    }
    

    查看输出结果:可以查看日志(因为程序中打印了日志,所以在日志中也能看到结果);


    或者查看输出结果:



    总结:使用map端聚合虽然可以大大减少网络传输的数据量、提高效率,但是在应用时还是需要考虑实际的应用环境。比如,如果要计算全部数据中词频最高的前10个单词,却仍然直接使用上面的代码,就会有问题。

    每一个mapper只会处理并输出它自己那部分数据中词频最高的10个单词,并没有考虑到全部数据。这样在reducer端整合的时候就可能会忽略部分数据,造成最终结果的错误。例如,某个单词在每个mapper中都只排第11位,但它的总词频可能位列全局前10,却不会被任何mapper输出。



    分享,成长,快乐

    转载请注明blog地址:http://blog.csdn.net/fansy1990


查看全文
  • 相关阅读:
    【引用】webkit内核浏览器支持的特殊CSS样式和CSS3.0
    catalina.home catalina.base 定义 位子 位置
    gvim 启动 全屏
    Log4j 配置文件(log4j.properties)的所在路径问题(转)
    ie6 div height bug css注意点(转)
    【引用】jdbc.properties 包含多种数据库驱动链接版本
    【引用】ActionContext和ServletActionContext介绍
    【引用】在Eclipse中将java Project转换成Dynamic Web Project
    flex查询xml字段绑定DataGrid小结
    Jquery 每天记一点200972
  • 原文地址:https://www.cnblogs.com/ldxsuanfa/p/10926883.html
  • Copyright © 2011-2022 走看看