  • MapReduce Execution Flow and Program Writing

    MapReduce
    A distributed computing model for computation over massive data sets. MapReduce abstracts the computation into two functions:

    Map: applies a specified operation to every element of a list of independent elements (the small chunks the input is split into); this is highly parallelizable.

    Reduce: merges the elements of a list into a combined result.

    input -> map -> reduce -> output
    Data flows between the stages as <key, value> pairs
    
    e.g.:
    raw data                ->    map input                     ->    map output (reduce input)           ->    shuffle                     ->    reduce output
    example example         ->    <0, example example>          ->    <example,1> <example,1>             ->    <example, list(1,1,1)>      ->    <example,3>
    hello world example     ->    <16, hello world example>     ->    <hello,1> <world,1> <example,1>     ->    <hello, list(1)> <...>      ->    <hello,1> <world,1>
    

    MapReduce low-level execution flow

    1. Input

    InputFormat

    Reads the data
    Converts it into <key, value> pairs


    FileInputFormat

    TextInputFormat: the default text input format; each line becomes one key-value pair, with the byte offset of the line as the key (a minimal configuration sketch follows below).
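
    The sketch below is not from the original post; it assumes the same Job object that is created in the run() driver shown at the end. TextInputFormat is already the default, so the call is optional:

    	// Hedged sketch: explicitly selecting the default input format.
    	// TextInputFormat turns each line into <LongWritable byte offset, Text line contents>.
    	job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);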
    

    2. Map

    A mapper class (ModuleMapper in the general description, WordcountMapper in the code below) extends the Mapper class

    It implements map() with the type parameters (KEYIN, VALUEIN, KEYOUT, VALUEOUT).
    	By default (with TextInputFormat):
    	KEYIN: LongWritable
    	VALUEIN: Text
    

    3. Shuffle

    The map output <key, value> pairs go through the following steps:

    a) They are first held in an in-memory buffer
    b) Combiner [optional]: map-side merge, e.g. <hadoop,1> <hadoop,1> => <hadoop,2> (see the sketch after this list)
    c) Spill: the buffer is spilled to disk as many small files; during each spill:
    	1. Partition (the number of partitions equals the number of reduce tasks)
    	2. Sort within each partition
    d) Merge: the spill files are merged into one large file (on the local disk of the machine running the Map Task)
    e) Sort
    f) Compression [optional]
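
    The sketch below is not from the original post; it reuses the Text/IntWritable imports and the WordcountReducer class from the full code at the end. The reducer can double as the combiner here because its input and output types both match the map output types, and a custom Partitioner decides which reduce task receives each key (this hypothetical one simply hashes the word, which is what the default HashPartitioner already does):

    	// Hypothetical partitioner: route each word to a reduce task by hash.
    	public static class WordPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, IntWritable> {
    		@Override
    		public int getPartition(Text key, IntWritable value, int numPartitions) {
    			return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    		}
    	}

    	// In the driver:
    	// job.setPartitionerClass(WordPartitioner.class);
    	// job.setCombinerClass(WordcountReducer.class);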
    

    4. Reduce

    Each Reduce Task copies the data it has to process from the machines where the Map Tasks ran, then performs:

    a) Merge
    b) Sort
    c) Group (all values belonging to the same key are put together)


    A reducer class (ModuleReducer in the general description, WordcountReducer in the code below) extends the Reducer class

    It implements reduce() with the type parameters (KEYIN, VALUEIN, KEYOUT, VALUEOUT).
    	The map output types are exactly the reduce input types; the shuffle in between only merges, groups and sorts, and does not change the data types.
    

    5. Output

    OutputFormat

    Writes the data


    FileOutputFormat

    TextOutputFormat: the default output format; each key-value pair is written as one line, with a tab separator between key and value, calling the toString() methods of the key and the value (a minimal configuration sketch follows below).
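
    The sketch below is not from the original post; it assumes the conf and job objects from the driver code at the end. TextOutputFormat is already the default, and the separator property name is the one used in Hadoop 2.x (verify it against your version):

    	// Hedged sketch: explicitly selecting the default output format and
    	// overriding the default tab separator with a comma.
    	conf.set("mapreduce.output.textoutputformat.separator", ",");
    	job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);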
    

    The full code is as follows:

    package com.cenzhongman.mapreduce;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    // Extends Configured, which provides getConf()
    // Implements the Tool interface, so the run() method below is required
    // ToolRunner.run() calls setConf(), which is how the Configuration object gets passed in
    public class WordCount extends Configured implements Tool {
    	// 1.Map class
    	public static class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    		private Text mapOutputKey = new Text();
    		private final static IntWritable mapOutputValue = new IntWritable(1);
    
    		@Override
    		public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			// line value
    			String lineValue = value.toString();
    
    			// split
    			// String[] strs = lineValue.split(" ");
    			StringTokenizer stringTokenizer = new StringTokenizer(lineValue);

    			// iterator
    			while (stringTokenizer.hasMoreTokens()) {
    				// get word value
    				String wordValue = stringTokenizer.nextToken();
    
    				// set value
    				mapOutputKey.set(wordValue);
    
    				// output
    				context.write(mapOutputKey, mapOutputValue);
    			}
    		}
    		
    		@Override
    		public void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			// nothing
    			// cleanup() runs once after all map() calls have finished, e.g. to close a JDBC connection
    			// The Reducer has the same setup/cleanup hooks, not repeated there
    		}
    
    		@Override
    		public void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			// nothing
    			// setup() runs once before any map() call, e.g. to open a JDBC connection
    		}
    
    	}
    
    	// 2.Reduce class
    	public static class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    		private IntWritable reduceOutputValue = new IntWritable();
    
    		@Override
    		public void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			// sum tmp
    			int sum = 0;
    			// iterator
    			for (IntWritable value : values) {
    				// total
    				sum += value.get();
    			}
    			// set value
    			reduceOutputValue.set(sum);
    
    			// output
    			context.write(key, reduceOutputValue);
    
    		}
    
    	}
    
    	// 3.driver
    	public int run(String[] args) throws Exception {
    		// 1.get configuration
    		Configuration conf = getConf();
    
    		// 2.create Job
    		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    		// run jar
    		job.setJarByClass(this.getClass());
    
    		// 3.set job
    		// input -> map -> reduce -> output
    		// 3.1 input from type
    		Path inPath = new Path(args[0]);
    		FileInputFormat.addInputPath(job, inPath);
    
    		// 3.2 map
    		job.setMapperClass(WordcountMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		//**************** shuffle configuration ***********************
    		// 1) Partition
    //		job.setPartitionerClass(cls);
    		// 2) Sort
    //		job.setSortComparatorClass(cls);
    		// Combiner [optional]: map-side merge
    //		job.setCombinerClass(cls);
    		// Group
    //		job.setGroupingComparatorClass(cls);
    		// Compression can be set in the configuration files, or on the conf object (see main())

    		//**************** shuffle configuration ***********************
    
    		// 3.3 reduce
    		job.setReducerClass(WordcountReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 3.4 output
    		Path outPath = new Path(args[1]);
    		FileOutputFormat.setOutputPath(job, outPath);
    
    		// set reduce number [optional, one tuning knob; default is 1, config key mapreduce.job.reduces]
    		// must be set before the job is submitted
    		job.setNumReduceTasks(2);

    		// 4 submit job
    		boolean isSuccess = job.waitForCompletion(true);
    
    		return isSuccess ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = new Configuration();
    		// set compress: enable map-output compression; the available codecs can be found in the official docs and source code ----- optional, one tuning knob
    		conf.set("mapreduce.map.output.compress", "true");
    		conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    
    		// int status = new WordCount().run(args);
    		int status = ToolRunner.run(conf, new WordCount(), args);
    		System.exit(status);
    	}
    }
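
    A quick usage note (not from the original post; the jar name and paths are placeholders): package the class into a jar and submit it with, for example, hadoop jar wordcount.jar com.cenzhongman.mapreduce.WordCount /input/dir /output/dir. The two arguments are the input path and the output path, and the output directory must not already exist, because FileOutputFormat refuses to overwrite it.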
  • Original article: https://www.cnblogs.com/cenzhongman/p/7128140.html