zoukankan      html  css  js  c++  java
  • MapReduce并行编程模型和框架

    传统的串行处理方式

    有四组文本数据:

    “the weather is good”,
    “today is good”,
    “good weather is good”,
    “today has good weather”

    对这些文本数据进行词频统计:

    import java.util.Hashtable;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    
    /**
     * 传统的串行计算方式词频统计
     *
     * @version 2017年1月12日  下午4:05:33
     */
    
    public class WordCount {
    
        public static void main(String[] args) {
    
            String[] text = new String[]{
                    "the weather is good","today is good",
                    "good weather is good","today has good weather"
            };
    
            //同步、线程安全
             Hashtable ht = new Hashtable();
             //HashMap ht = new HashMap();
            for(int i=0;i<=3;i++){
                 //字符串根据分隔符解析
                StringTokenizer st  = new StringTokenizer(text[i]);
    
                while (st.hasMoreTokens()) {
    
                    String world = st.nextToken();
    
                    if(!ht.containsKey(world)){
                        ht.put(world, new Integer(1));
                    }else{
                        int wc  = ((Integer)ht.get(world)).intValue()+1;
                        ht.put(world, new Integer(wc));     
                    }
                }//end of while 
            }//end of for
    
            //输出统计结果
            for(Iterator itr = ht.keySet().iterator();itr.hasNext();){
                String world = (String) itr.next();
                System.out.println(world+": " +(Integer)ht.get(world)+ "; ");
    
            }
    
    
        }
    }

    一个MR分布式程序

    求出每个年份的最高气温:

    MaxTemperatureMapper.Java:
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>
    {
          @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
    
               //解析字段
               String line =value.toString();                 
               try{
    
                   String year  = line.substring(0,4);
                   int airTemperature =Integer.parseInt(line.substring(5));
    
                   context.write(new Text(year),new IntWritable(airTemperature));
    
               }catch(Exception e){
                   System.out.println("error in line:" + line);
               }
        }
    
    }  
    
    MaxTemperatureReducer.java:
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    /**
     *  reducer 比较每年度温度最高值
     * */
    public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    
       @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    
                int MaxValue = Integer.MIN_VALUE;
    
                for(IntWritable value:values){
                    MaxValue = Math.max(MaxValue, value.get());
                }
    
                context.write(key , new IntWritable(MaxValue));
        }
    }
    
    MaxTemperatureDriver.java:
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class MaxTemperatureDriver extends Configured implements Tool{
    
        @Override
        public int run(String[] args) throws Exception {
            // 对 参数进行判断:参数个数不为2,打印错误信息 
            if (args.length != 2){
    
                System.err.printf("Usage: %s <input><output>",getClass().getSimpleName());
    
                ToolRunner.printGenericCommandUsage(System.err);
    
                return -1;                  
    
        }        
    
            Configuration conf =getConf();                
    
            @SuppressWarnings("deprecation") //不检测过期的方法
            Job job = new Job(conf);
    
            job.setJobName("Max Temperature");                  
    
            job.setJarByClass(getClass());
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job,new Path(args[1]));                  
    
            job.setMapperClass(MaxTemperatureMapper.class);
    
            job.setReducerClass(MaxTemperatureReducer.class);                  
    
            job.setOutputKeyClass(Text.class);
    
            job.setOutputValueClass(IntWritable.class);                  
    
            return job.waitForCompletion(true)?0:1; 
        }
    
    
        public static void main(String[] args)throws Exception{
    
            int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args);
    
            System.exit(exitcode);                  
    
        }
    }   

    上传数据至hadoop集群:

    这里写图片描述

    原始数据:
    Temperature1:

    1990 21
    
    1990 18
    
    1991 21
    
    1992 30
    
    1990 21
    

    Temperature2:

    1991 21
    
    1990 18
    
    1991 24
    
    1992 30
    
    1993 21
    

    将程序打包上传至主节点某个目录下,执行

    hadoop jar  /data/jar/maxtemperature.jar   hdfs://192.168.75.128:9000/input  hdfs://192.168.75.128:9000/output/temperature

    执行结果:

    结果数据:

    1990    21
    1991    24
    1992    30
    1993    21
    

    完整的MapReduce编程模型

    Combiner:进行中间结果数据网络传输优化的工作。Combiner程序的执行是在Map节点完成计算之后、输出结果之前。

    Partitioner:将所有主键相同的键值对传输给同一个Reduce节点。分区的过程在Map节点输出后、传入Reduce节点之前完成的。

    下面是针对四组数据的MapReduce完整的并行编程模型:

    “the weather is good”,
    “today is good”,
    “good weather is good”,
    “today has good weather”

    这里写图片描述

    完整的MapReduce编程模型

    (1)用户程序会分成三个部分:Mapper,Reducer,Driver
    (2)Mapper的输入数据是KV对的形式,KV的类型可以设置
    (3)Mapper的输出数据是KV对的形式,KV的类型可以设置
    (4)Mapper中的业务逻辑写在map方法中
    (5)map方法是每进来一个KV对调用一次
    (6)Reducer的输入数据应该对应Mapper的输出数据,也是KV
    (7)Reducer的业务逻辑写在reduce方法中
    (8)reduce方法是对每一个< key,valueList> 调用一次
    (9)用户的Mapper和Reducer都要继承各自的父类
    (10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象。

    Hadoop系统架构和MapReduce执行流程

    为了实现Hadoop系统设计中本地化计算的原则,数据存储节点DataNode与计算节点TaskTracker将合并设置,让每个从节点同时运行作为DataNode和TaskTracker,以此让每个Tasktracker尽量处理存储在本地DataNode上的数据。

    而数据存储主控节点NameNode与作业执行主控节点JobTracker既可以设置在同一个主控节点上,在集群规模较大或者这两个主控节点负载都很高以至于互相影响时,也可以分开设置在两个不同的节点上。

    这里写图片描述

    Hadoop系统的基本组成构架

    MapReduce程序的执行流程:

    MapReduce执行一个用户提交的MapReduce程序的基本过程。

    这里写图片描述

    Hadoop MapReduce 程序执行流程

    1) 首先,用户程序客户端通过作业客户端接口程序JobClient提交一个用户程序。
    2) 然后JobClient向JobTracker提交作业执行请求并获得一个Job ID。
    3) JobClient同时也会将用户程序作业和待处理的数据文件信息准备好并存储在HDFS中。
    4) JobClient正式向JobTracker提交和执行该作业。
    5) JobTracker接受并调度该作业,进行作业的初始化准备工作,根据待处理数据的实际分片情况,调度和分配一定的Map节点来完成作业。
    6) JobTracker 查询作业中的数据分片信息,构建并准备相应的任务。
    7) JobTracker 启动TaskTracker节点开始执行具体的任务。
    8) TaskTracker根据所分配的具体任务,获取相应的作业数据。
    9) TaskTracker节点创建所需要的Java虚拟机,并启动相应的Map任务(或Reduce任务)的执行。
    10) TaskTracker执行完所分配的任务之后,若是Map任务,则把中间结果数据输出到HDFS中;若是Reduce任务,则输出最终结果。
    11) TaskTracker向JobTracker报告所分配的任务的完成。若是Map任务完成并且后续还有Reduce任务,则JobTracker会分配和启动Reduce节点继续处理中间结果并输出最终结果。

    参考学习资料:

    1.HashMap和Hashtable的区别:
    http://www.importnew.com/7010.html
    2.StringTokenizer类的使用方法:
    http://yacole.iteye.com/blog/41512

  • 相关阅读:
    HDU5320 : Fan Li
    BZOJ3069 : [Pa2011]Hard Choice 艰难的选择
    BZOJ4227 : 城市
    BZOJ4216 : Pig
    BZOJ1171 : 大sz的游戏
    BZOJ4182 : Shopping
    BZOJ3482 : [COCI2013]hiperprostor
    BZOJ3919 : [Baltic2014]portals
    BZOJ3711 : [PA2014]Druzyny
    BZOJ1580 : [Usaco2009 Hol]Cattle Bruisers 杀手游戏
  • 原文地址:https://www.cnblogs.com/bigdata1024/p/8387421.html
Copyright © 2011-2022 走看看