  • MapReduce Programming Summary

      (1) Getting key-value pairs to the map side is fairly simple: each input split is handed to one MapTask, and the splits are determined by the InputFormat (usually a FileInputFormat subclass); a split typically matches the HDFS block size (64 MB in older Hadoop versions).

          Each MapTask then invokes the map function N times. How many times exactly?

          That is decided by the class passed to job.setInputFormatClass(?); the default is TextInputFormat.class, which parses the input one line at a time, so each line corresponds to one call of the map function.
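
        To make (1) concrete, here is a minimal mapper sketch (the class name and output types are illustrative, not from the original post): with TextInputFormat, the key passed to map() is the byte offset of the line, the value is the line's text, and map() fires once per line.

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // Illustrative: with TextInputFormat, map() runs once for every input line.
    public class PerLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            // offset is the byte position of this line in the file; line is its text
            ctx.write(line, new LongWritable(1L));
        }
    }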

      (2) Key-value handling on the reduce side is more involved: the second parameter is an Iterable<?> representing <key, list{value1, value2...}>, and it corresponds to one call of the reduce function;

          in other words, a single reduce call processes one key together with all of its values.
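
        A minimal reducer sketch (illustrative names, assuming Text keys and LongWritable values) showing the Iterable parameter just described:

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // Illustrative: one reduce() call receives one key and all of its values.
    public class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context ctx)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {   // iterates over list{value1, value2...}
                sum += v.get();
            }
            ctx.write(key, new LongWritable(sum));
        }
    }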

      (3) So where does this <key, list{value1, value2...}> input come from?

        The MapReduce framework ships with sorting for its predefined key types (Text, LongWritable, and so on);

        it merges identical keys coming from different MapTasks into <key, list{value1, value2...}>, which becomes the input to the reduce function.
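
        As a sketch of what "predefined key sorting" means (WritableComparator.get is a real Hadoop API; the class below is illustrative): built-in key types register a comparator that the framework uses to sort serialized map output before merging equal keys.

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;
    
    // Illustrative: look up the comparator registered for a predefined key type.
    public class ComparatorLookup {
        public static void main(String[] args) {
            WritableComparator cmp = WritableComparator.get(Text.class);
            // negative: "a" sorts before "b", the order the framework uses
            System.out.println(cmp.compare(new Text("a"), new Text("b")));
        }
    }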

      (4) Given that the number of MapTasks is determined by the splits, what determines the number of ReduceTasks?

         For every record the map function emits, getPartition (by default the one in HashPartitioner) is called to obtain a partition number; the record is written to the disk spill files tagged with that partition number, so the reduce side can fetch its own partition.

         The decisive parameter of getPartition, numReduceTasks, is set by job.setNumReduceTasks and defaults to 1;

         so if this parameter is never set, getPartition will in most cases return 0 (anything modulo 1 is 0), which corresponds to a single ReduceTask.
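
        For reference, the default partitioner boils down to the following (this mirrors Hadoop's org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; the & Integer.MAX_VALUE masks the sign bit so the result is never negative):

    import org.apache.hadoop.mapreduce.Partitioner;
    
    // Mirrors Hadoop's HashPartitioner: hash the key, clear the sign bit, take
    // the remainder modulo the reducer count. With numReduceTasks == 1 this is always 0.
    public class DefaultHashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }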

      (5) With partitioning covered, on to grouping. Partitioning is decided on the map side, relative to each map function's output; grouping, by contrast, happens on the reduce side, across the output of multiple MapTasks. Groups live inside partitions.

        So what does grouping affect?

      (6) When the map output key is a custom type such as NewK2 with its own compareTo, that compareTo defines the sort order of the map output. Once a grouping comparator is also set,

           the grouping class MyGroupingComparator's compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) decides, over the already-sorted keys, which adjacent keys belong to the same group,

         which yields the <key, list{value1, value2...}> passed to reduce.

      A complete example follows:

      

    package examples; 
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    public class GroupApp {
    	static final String INPUT_PATH = "hdfs://192.168.2.100:9000/hello";
    	static final String OUTPUT_PATH = "hdfs://192.168.2.100:9000/out";
    	
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
    		final Path outPath = new Path(OUTPUT_PATH);
    		if(fileSystem.exists(outPath)) {
    			fileSystem.delete(outPath, true);
    		}
    		
    		final Job job = new Job(conf, GroupApp.class.getSimpleName());
    		job.setJarByClass(GroupApp.class);
    		FileInputFormat.setInputPaths(job, INPUT_PATH);
    		
    		job.setInputFormatClass(TextInputFormat.class);
    		
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(NewK2.class);
    		job.setMapOutputValueClass(LongWritable.class);
    		
    		job.setPartitionerClass(MyPartitoner.class);
    		job.setNumReduceTasks(3);
    		
    		job.setGroupingComparatorClass(MyGroupingComparator.class);
    		
    		job.setReducerClass(MyReducer.class);
    		
    		job.setOutputKeyClass(LongWritable.class);
    		job.setOutputValueClass(LongWritable.class);
    		
    		FileOutputFormat.setOutputPath(job, outPath);
    		
    		job.waitForCompletion(true);
    	}
    	
    	// Route keys by their first field: first == 1 goes to partition 0, first == 2
    	// to partition 1, everything else to partition 2 (the modulo guards against
    	// running with fewer reducers).
    	static class MyPartitoner extends HashPartitioner<NewK2, LongWritable> {
    		  @Override
    		  public int getPartition(NewK2 key, LongWritable value, int numReduceTasks) {
    			  System.out.println("the getPartition() is called...");
    			  if(key.first == 1) {
    				  return 0 % numReduceTasks;
    			  }
    			  else if(key.first == 2) {
    				  return 1 % numReduceTasks;
    			  }
    			  else {
    				  return 2 % numReduceTasks;
    			  }
    		  }
    	}
    	
    	static class NewK2 implements WritableComparable<NewK2> {
    		long first = 0;
    		long second = 0;
    		
    		public NewK2(){}
    		
    		public NewK2(long first, long second) {
    			this.first = first;
    			this.second = second;
    		}
    
    		public void write(DataOutput out) throws IOException {
    			out.writeLong(first);
    			out.writeLong(second);
    		}
    
    		public void readFields(DataInput in) throws IOException {
    			first = in.readLong();
    			second = in.readLong();
    		}
    
    		// Sort order of the map output: by first, then by second, both ascending.
    		// Long.compare avoids the overflow risk of casting a long difference to int.
    		public int compareTo(NewK2 o) {
    			System.out.println("the compareTo() is called...");
    			
    			final int cmp = Long.compare(this.first, o.first);
    			if(cmp != 0) {
    				return cmp;
    			}
    			return Long.compare(this.second, o.second);
    		}
    	}
    	
    	// Groups reduce-side records by the first field only: two serialized NewK2
    	// keys are treated as equal here when their leading 8 bytes (the long first)
    	// match, so all values sharing first land in a single reduce() call.
    	static class MyGroupingComparator implements RawComparator<NewK2> {
    		public int compare(NewK2 o1, NewK2 o2) {
    			return Long.compare(o1.first, o2.first);
    		}
    
    		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    			System.out.println("the compare() is called...");
    			// compare only the first 8 bytes: the serialized first field
    			return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    		}
    	}
    	
    	static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
    		protected void map(LongWritable k1, Text v1, Context ctx) throws IOException, InterruptedException {
    			// each input line holds two tab-separated numbers: first and second
    			final String[] splited = v1.toString().split("\t");
    			
    			System.out.println("the map() is called...");
    			
    			NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
    			LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
    			ctx.write(k2, v2);
    		}
    	}
    	static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
    		protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException {
    			System.out.println("the reduce() is called...");
    			long v3 = 0;
    			// values arrive sorted by second (via compareTo), so after the loop
    			// v3 holds the largest second in the group
    			for(LongWritable v2 : v2s) {
    				v3 = v2.get();
    				System.out.println("<"+k2.first+","+k2.second+">, "+v3);
    			}
    			System.out.println("--------------------------------------------");
    			System.out.println("the real reduce output...");
    			System.out.println("<"+k2.first+","+v3+">");
    			ctx.write(new LongWritable(k2.first), new LongWritable(v3));
    			System.out.println("--------------------------------------------");
    		}
    	}
    }
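
      A note on running this (the contents of the /hello input file are not shown in the post, so the sample lines here are an assumption): given tab-separated pairs such as "1 1", "2 1", "2 2", "3 1", "3 2", "3 3", the job sorts records by (first, second), groups them by first alone, and each reduce call therefore iterates its values in ascending order of second, so the emitted v3 is the largest second for that first. With three reduce tasks and MyPartitoner, keys whose first field is 1, 2, or anything else land in partitions 0, 1, and 2 respectively.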
    

      

  • Original post: https://www.cnblogs.com/riordon/p/4044456.html