zoukankan      html  css  js  c++  java
  • Hadoop架构设计、执行原理具体解释

    1、Map-Reduce的逻辑过程

    如果我们须要处理一批有关天气的数据。其格式例如以下:

    • 依照ASCII码存储。每行一条记录
    • 每一行字符从0開始计数,第15个到第18个字符为年
    • 第25个到第29个字符为温度。当中第25位是符号+/-

    0067011990999991950051507+0000+

    0043011990999991950051512+0022+

    0043011990999991950051518-0011+

    0043012650999991949032412+0111+

    0043012650999991949032418+0078+

    0067011990999991937051507+0001+

    0043011990999991937051512-0002+

    0043011990999991945051518+0001+

    0043012650999991945032412+0002+

    0043012650999991945032418+0078+

    如今须要统计出每年的最高温度。

    Map-Reduce主要包含两个步骤:Map和Reduce

    每一步都有key-value对作为输入和输出:

    • map阶段的key-value对的格式是由输入的格式所决定的。假设是默认的TextInputFormat。则每行作为一个记录进程处理。当中key为此行的开头相对于文件的起始位置。value就是此行的字符文本
    • map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相相应

    对于上面的样例,在map过程,输入的key-value对例如以下:

    (0,0067011990999991950051507+0000+)

    (33,0043011990999991950051512+0022+)

    (66,0043011990999991950051518-0011+)

    (99,0043012650999991949032412+0111+)

    (132,0043012650999991949032418+0078+)

    (165,0067011990999991937051507+0001+)

    (198,0043011990999991937051512-0002+)

    (231,0043011990999991945051518+0001+)

    (264,0043012650999991945032412+0002+)

    (297,0043012650999991945032418+0078+)

    在map过程中。通过对每一行字符串的解析,得到年-温度的key-value对作为输出:

    (1950, 0)

    (1950, 22)

    (1950, -11)

    (1949, 111)

    (1949, 78)

    (1937, 1)

    (1937, -2)

    (1945, 1)

    (1945, 2)

    (1945, 78)

    在reduce过程。将map过程中的输出。依照同样的key将value放到同一个列表中作为reduce的输入

    (1950, [0, 22, –11])

    (1949, [111, 78])

    (1937, [1, -2])

    (1945, [1, 2, 78])

    在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:

    (1950, 22)

    (1949, 111)

    (1937, 1)

    (1945, 78)

    其逻辑过程可用例如以下图表示:

    image

    下图大概描写叙述了Map-Reduce的Job执行的基本原理:

    image

     

    以下我们讨论JobConf。其有非常多的项能够进行配置:

    • setInputFormat:设置map的输入格式。默觉得TextInputFormat,key为LongWritable,value为Text
    • setNumMapTasks:设置map任务的个数。此设置通常不起作用,map任务的个数取决于输入的数据所能分成的inputsplit的个数
    • setMapperClass:设置Mapper。默觉得IdentityMapper
    • setMapRunnerClass:设置MapRunner, maptask是由MapRunner执行的。默觉得MapRunnable,其功能为读取inputsplit的一个个record,依次调用Mapper的map函数
    • setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
    • setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
    • setPartitionerClass和setNumReduceTasks:设置Partitioner。默觉得HashPartitioner,其依据key的hash值来决定进入哪个partition,每一个partition被一个reduce task处理,所以partition的个数等于reducetask的个数
    • setReducerClass:设置Reducer,默觉得IdentityReducer
    • setOutputFormat:设置任务的输出格式,默觉得TextOutputFormat
    • FileInputFormat.addInputPath:设置输入文件的路径,能够使一个文件,一个路径,一个通配符。能够被调用多次加入多个路径
    • FileOutputFormat.setOutputPath:设置输出文件的路径,在job执行前此路径不应该存在

    当然不用全部的都设置。由上面的样例。能够编写Map-Reduce程序例如以下:

    public class MaxTemperature {

        publicstatic void main(String[] args) throws IOException {

           if (args.length != 2) {

               System.err.println("Usage: MaxTemperature <inputpath> <outputpath>");

               System.exit(-1);

           }

           JobConf conf = new JobConf(MaxTemperature.class);

           conf.setJobName("Max temperature");

           FileInputFormat.addInputPath(conf, new Path(args[0]));

           FileOutputFormat.setOutputPath(conf, new Path(args[1]));

           conf.setMapperClass(MaxTemperatureMapper.class);

           conf.setReducerClass(MaxTemperatureReducer.class);

           conf.setOutputKeyClass(Text.class);

           conf.setOutputValueClass(IntWritable.class);

           JobClient.runJob(conf);

        }

    }

    3、Map-Reduce数据流(data flow)

    Map-Reduce的处理过程主要涉及下面四个部分:

    • clientClient:用于提交Map-reduce任务job
    • JobTracker:协调整个job的执行。其为一个Java进程,其main class为JobTracker
    • TaskTracker:执行此job的task,处理input split,其为一个Java进程,其mainclass为TaskTracker
    • HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件

    image

    3.1、任务提交

    JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

    • 向JobTracker请求一个新的job ID
    • 检測此job的output配置
    • 计算此job的input splits
    • 将Job执行所需的资源复制到JobTracker的文件系统中的目录中,包含jobjar文件。job.xml配置文件,input splits
    • 通知JobTracker此Job已经能够执行了

    提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务执行完成。

     

    3.2、任务初始化

     

    当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

    初始化首先创建一个对象来封装job执行的tasks, status以及progress。

    在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

    其为每一个input split创建一个map task。

    每一个task被分配一个ID。

     

    3.3、任务分配

     

    TaskTracker周期性的向JobTracker发送heartbeat。

    在heartbeat中。TaskTracker告知JobTracker其已经准备执行一个新的task。JobTracker将分配给其一个task。

    在JobTracker为TaskTracker选择一个task之前。JobTracker必须首先依照优先级选择一个Job,在最高优先级的Job中选择一个task。

    TaskTracker有固定数量的位置来执行map task或者reduce task。

    默认的调度器对待map task优先于reduce task

    当选择reduce task的时候。JobTracker并不在多个task之间进行选择,而是直接取下一个,由于reducetask没有数据本地化的概念。

     

    3.4、任务运行

     

    TaskTracker被分配了一个task,以下便要执行此task。

    首先。TaskTracker将此job的jar从共享文件系统中复制到TaskTracker的文件系统中。

    TaskTracker从distributed cache中将job执行所须要的文件复制到本地磁盘。

    其次,其为每一个task创建一个本地的工作文件夹。将jar解压缩到文件文件夹中。

    其三,其创建一个TaskRunner来执行task。

    TaskRunner创建一个新的JVM来执行task。

    被创建的child JVM和TaskTracker通信来报告执行进度。

     

    3.4.1、Map的过程

    MapRunnable从inputsplit中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

    map的输出并非直接写入硬盘,而是将其写入缓存memory buffer。

    当buffer中数据的到达一定的大小。一个背景线程将数据開始写入硬盘。

    在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

    在同一个partition中,背景线程会将数据依照key在内存中排序。

    每次从内存向硬盘flush数据。都生成一个新的spill文件。

    当此task结束之前。全部的spill文件被合并为一个整的被partition的并且排好序的文件。

    reducer能够通过http协议请求map的输出文件,tracker.http.threads能够设置http服务线程数。

    3.4.2、Reduce的过程

    当map task结束后。其通知TaskTracker。TaskTracker通知JobTracker。

    对于一个job,JobTracker知道TaskTracer和map输出的相应关系。

    reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了全部的map输出。

    reduce task须要其相应的partition的全部的map输出。

    reduce task中的copy过程即当每一个map task结束的时候就開始拷贝输出。由于不同的maptask完毕时间不同。

    reduce task中有多个copy线程,能够并行拷贝map输出。

    当非常多map输出复制到reduce task后。一个背景线程将其合并为一个大的排好序的文件。

    当全部的map输出都复制到reduce task后,进入sort过程,将全部的map输出合并为大的排好序的文件。

    最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每一个key。最后的结果写入HDFS。

     

    image

     

    3.5、任务结束

     

    当JobTracker获得最后一个task的执行成功的报告后,将job得状态改为成功。

    当JobClient从JobTracker轮询的时候。发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。


    如有不懂,欢迎拨打10010或10086。转何哲江。

  • 相关阅读:
    <BackTracking> dfs: 39 40
    <Tree> 110 124
    <Tree.PreOrder> DFS 113, 129
    <Math> 50 367
    <String> 49 87
    Haproxy配置Rabbitmq集群负载均衡
    Rabbitmq镜像集群的搭建
    rabbitmq常用命令
    Linux安装rabbitmq
    Docker自定义网络
  • 原文地址:https://www.cnblogs.com/cxchanpin/p/6876533.html
Copyright © 2011-2022 走看看