zoukankan      html  css  js  c++  java
  • NameNode、DataNode和MapReduce运行原理

    一、Namenode
    1.作用
    ①负责元数据的存储
    ②负责接受和处理客户端的请求
    ③负责接受DN上报的信息
    ④和DN保持心跳,向DN下达命令

    2.元数据
    包含两部分
    ①文件的属性(保存在edits+fsimage)
    ②块的位置信息(由DN启动后自动上报,动态生成)

    3.存储元数据的文件
    ①edits文件: NN启动后,客户端每次的写操作都会记录在edits文件中
    ②fsimage文件:
    在NN第一次格式化时生成,在NN每次执行checkpoint,满足条件后,会重新将内存中合并后的元数据
    持久化到fsimage文件中

    4.checkpoint
    每次Namenode会定期进行checkpoint,主要了为了防止在运行期间产生大量的edits文件,导致下次重启时
    恢复时间过长!

    定期将edits文件中新的内容,持久化到fsimage文件中,进行快照存储!

    默认的机制:
    ①没间隔1h,执行一次
    ②距离上次,又新产生了100w次txns操作

    5.NN的安全模式
    NN的安全模式主要是为了接受DN上报块信息!

    每次NN启动时,会自动进入安全模式!在安全模式只能有限读,不能写!

    当DN上报的块的最小副本数的总数 / 块的总数 > 0.999时,NN会在30秒后自动离开!

    手动操作: hdfs dfsadmin -safemode get|enter|leave|wait

    二、SecondaryNamenode
    如果配置了SecondaryNamenode,2nn会帮助NN进行checkpoint操作!

    三、Datanode
    1.作用
    ①接受客户端的读写块请求
    ②DN负责维护块的完整性,通过定期检查块的校验和判断块是否损坏
    损坏的块,DN会自动删除,在下次启动时,不会上报给NN
    ③DN负责定期向NN汇报块的信息,接收NN的其他任务(复制块等)

    2.Datanode的掉线时长
    DN和NN每间隔dfs.heartbeat.interval(3s)进行一次心跳!
    如果DN和NN上一次心跳举例当前时间,
    已经过了2*dfs.namenode.heartbeat.recheck-interval(5min)+10*dfs.heartbeat.interval,
    NN会将DN的状态标记为DEAD!

    四、其他配置
    1.NN的多目录配置
    NN的多目录指对元数据进行多个目录的同时备份,通过hdfs-site.xml中的dfs.namenode.name.dir进行设置!

    2.DN的多目录配置
    如果机器添加了新的磁盘,希望DN在写入块时,向新磁盘的目录进行写入!
    配置DN的多目录!
    通过hdfs-site.xml中dfs.datanode.data.dir进行配置

    3.服役新节点
    ①准备机器,安装软件,配置NN,RM的相关配置
    ②启动datanode和nodemanager进程即可


    服役了新的DN节点后,可以执行再平衡的命令,这个命令可以将集群中块进行重新平衡分配!
    ./start-balancer.sh

    4.白名单
    白名单是为了阻止某个进程加入集群!
    白名单之外的机器,无法进入集群!
    白名单通过hdfs-site.xml中的dfs.hosts配置!
    可以使用 hdfs dfsadmin -refreshNodes刷新配置,读取此配置信息!

    5.黑名单
    退役datanode!
    黑名单通过hdfs-site.xml中的dfs.hosts.exclude配置!
    黑名单中的机器在最后一次启动时,会将当前机器的块移动到其他节点!
    注意: 如果当前集群中在线的DN节点不满足某些文件的副本数要求,当前退役节点是无法退役完成!

    6.集群间的拷贝
    hadoop distcp hdfs://xxxx:xxx/xxx hdfs://xxxx:xxx/xxx

    7.在线归档
    归档: hadoop arichieve -archievename 归档文件名 -p 父目录 输入文件... 输出目录
    使用: hadoop fs -ls har:///归档文件名

    在线归档不会删除原文件!

    五、MapReduce

    二、MR的核心编程思想

    1.概念
    Job(作业) : 一个MR程序称为一个Job
    MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。
    负责Job中执行状态的监控,容错,和RM申请资源,提交Task等!

    Task(任务): Task是一个进程!负责某项计算!

    Map(Map阶段): Map是MapReduce程序运行的第一个阶段!
    Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分!
    切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算!

    Task负责是Map阶段程序的计算,称为MapTask!

    在一个MR程序的Map阶段,会启动N(取决于切片数)个MapTask。每个MapTask是并行运行!

    Reduce(Reduce阶段): Reduce是MapReduce程序运行的第二个阶段(最后一个阶段)!
    Reduce阶段的目的是将Map阶段,每个MapTask计算后的结果进行合并汇总!得到最终结果!
    Reduce阶段是可选的!

    Task负责是Reduce阶段程序的计算,称为ReduceTask!
    一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行!
    每个ReduceTask最终都会产生一个结果!

    2.MapReduce中常用的组件
    ①Mapper: map阶段核心的处理逻辑
    ②Reducer: reduce阶段核心的处理逻辑
    ③InputFormat: 输入格式
    MR程序必须指定一个输入目录,一个输出目录!
    InputFormat代表输入目录中文件的格式!
    如果是普通文件,可以使用FileInputFormat.
    如果是SequeceFile(hadoop提供的一种文件格式),可以使用SequnceFileInputFormat.
    如果处理的数据在数据库中,需要使用DBInputFormat

    ④RecordReader: 记录读取器
    RecordReader负责从输入格式中,读取数据,读取后封装为一组记录(k-v)!

    ⑤OutPutFormat: 输出格式
    OutPutFormat代表MR处理后的结果,要以什么样的文件格式写出!
    将结果写出到一个普通文件中,可以使用FileOutputFormat!
    将结果写出到数据库中,可以使用DBOutPutFormat!
    将结果写出到SequeceFile中,可以使用SequnceFileOutputFormat
    ⑥RecordWriter: 记录写出器
    RecordWriter将处理的结果以什么样的格式,写出到输出文件中!

    在MR中数据的流程:
    ①InputFormat调用RecordReader,从输入目录的文件中,读取一组数据,封装为keyin-valuein对象
    ②将封装好的key-value,交给Mapper.map()------>将处理的结果写出 keyout-valueout
    ③ReduceTask启动Reducer,使用Reducer.reduce()处理Mapper写出的keyout-valueout,
    ④OutPutFormat调用RecordWriter,将Reducer处理后的keyout-valueout写出到文件

    ⑦Partitioner: 分区器
    分区器,负责在Mapper将数据写出时,将keyout-valueout,为每组keyout-valueout打上标记,进行分区!
    目的: 一个ReduceTask只会处理一个分区的数据!

    六、MapReduce的运行流程概述
    需求: 统计/hello目录中每个文件的单词数量,
    a-p开头的单词放入到一个结果文件中,
    q-z开头的单词放入到一个结果文件中。

    例如: /hello/a.txt 200M
    hello,hi,hadoop
    hive,hadoop,hive,
    zoo,spark,wow
    zoo,spark,wow
    ...
    /hello/b.txt 100m
    hello,hi,hadoop
    zoo,spark,wow
    ...

    1.Map阶段(运行MapTask,将一个大的任务切分为若干小任务,处理输出阶段性的结果)
    ①切片(切分数据)
    /hello/a.txt 200M
    /hello/b.txt 100m

    默认的切分策略是以文件为单位,以文件的块大小(128M)为片大小进行切片!
    split0:/hello/a.txt,0-128M
    split1: /hello/a.txt,128M-200M
    split2: /hello/b.txt,0M-100M

    ②运行MapTask(进程),每个MapTask负责一片数据
    split0:/hello/a.txt,0-128M--------MapTask1
    split1: /hello/a.txt,128M-200M--------MapTask2
    split2: /hello/b.txt,0M-100M--------MapTask3

    ③读取数据阶段
    在MR中,所有的数据必须封装为key-value
    MapTask1,2,3都会初始化一个InputFormat(默认TextInputFormat),每个InputFormat对象负责创建一个RecordReader(LineRecordReader)对象,
    RecordReader负责从每个切片的数据中读取数据,封装为key-value.

    LineRecordReader: 将文件中的每一行封装为一个key(offset)-value(当前行的内容)
    举例:
    hello,hi,hadoop----->(0,hello,hi,hadoop)
    hive,hadoop,hive----->(20,hive,hadoop,hive)
    zoo,spark,wow----->(30,zoo,spark,wow)
    zoo,spark,wow----->(40,zoo,spark,wow)

    ④进入Mapper的map()阶段
    map()是Map阶段的核心处理逻辑! 单词统计! map()会循环调用,对输入的每个Key-value都进行处理!
    输入:(0,hello,hi,hadoop)
    输出:(hello,1),(hi,1),(hadoop,1)

    输入:(20,hive,hadoop,hive)
    输出:(hive,1),(hadoop,1),(hive,1)

    输入:(30,zoo,spark,wow)
    输出:(zoo,1),(spark,1),(wow,1)

    输入:(40,zoo,spark,wow)
    输出:(zoo,1),(spark,1),(wow,1)

    ⑤目前,我们需要启动两个ReduceTask,生成两个结果文件,需要将MapTask输出的记录进行分区(分组,分类)
    在Mapper输出后,调用Partitioner,对Mapper输出的key-value进行分区,分区后也会排序(默认字典顺序排序)
    分区规则: a-p开头的单词放入到一个区
    q-z开头的单词放入到另一个区
    MapTask1:
    0号区: (hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)
    1号区: (spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)

    MapTask2:
    0号区: 。。。
    1号区: ...


    MapTask3:
    0号区: (hadoop,1),(hello,1),(hi,1),
    1号区: (spark,1),(wow,1),(zoo,1)

    2.Reduce阶段
    ①copy
    ReduceTask启动后,会启动shuffle线程,从MapTask中拷贝相应分区的数据!

    ReduceTask1: 只负责0号区
    将三个MapTask,生成的0号区数据全部拷贝到ReduceTask所在的机器!
    (hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)
    (hadoop,1),(hello,1),(hi,1),


    ReduceTask2: 只负责1号区
    将三个MapTask,生成的1号区数据全部拷贝到ReduceTask所在的机器!
    (spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)
    (spark,1),(wow,1),(zoo,1)
    ②sort

    ReduceTask1: 只负责0号区进行排序:
    (hadoop,1),(hadoop,1),(hadoop,1),(hello,1),(hello,1),(hi,1),(hi,1),(hive,1),(hive,1)
    ReduceTask2: 只负责1号区进行排序:
    (spark,1),(spark,1),(spark,1),(wow,1) ,(wow,1),(wow,1),(zoo,1),(zoo,1)(zoo,1)

    ③reduce
    ReduceTask1---->Reducer----->reduce(一次读入一组数据)

    何为一组数据: key相同的为一组数据
    输入: (hadoop,1),(hadoop,1),(hadoop,1)
    输出: (hadoop,3)

    输入: (hello,1),(hello,1)
    输出: (hello,2)

    输入: (hi,1),(hi,1)
    输出: (hi,2)

    输入:(hive,1),(hive,1)
    输出: (hive,2)

    ReduceTask2---->Reducer----->reduce(一次读入一组数据)

    输入: (spark,1),(spark,1),(spark,1)
    输出: (spark,3)

    输入: (wow,1) ,(wow,1),(wow,1)
    输出: (wow,3)

    输入:(zoo,1),(zoo,1)(zoo,1)
    输出: (zoo,3)

    ④调用OutPutFormat中的RecordWriter将Reducer输出的记录写出
    ReduceTask1---->OutPutFormat(默认TextOutPutFormat)------>RecordWriter(LineRecoreWriter)
    LineRecoreWriter将一个key-value以一行写出,key和alue之间使用 分割
    在输出目录中,生成文件part-r-0000
    hadoop 3
    hello 2
    hi 2
    hive 2

    ReduceTask2---->OutPutFormat(默认TextOutPutFormat)------>RecordWriter(LineRecoreWriter)
    LineRecoreWriter将一个key-value以一行写出,key和alue之间使用 分割
    在输出目录中,生成文件part-r-0001
    spark 3
    wow 3
    zoo 3


    2.运行流程
    Map------------------------Reduce阶段
    split(切片)----read(读取数据,封装为输入的k-v)---map(Mapper.map())----sort(分区和排序)-----------------copy(拷贝分区数据)-------sort(合并且排序)-----reduce(合并)------write(写出数据)

     

     

     

  • 相关阅读:
    待续中的图灵机与有限状态机
    barabasilab-networkScience学习笔记6-evolving networks
    pythonyCool-moviepy
    各种方法合成地震图的对应单位
    matla互相关协方差的计算和理解
    matlab运行中出现“Caught "std::exception" Exception message is: Message Catalog MATLAB:builtins was not loaded from the file."
    对拉普拉斯平滑 的认识
    关于小波变换
    随笔学习
    关于滤波频段的选取(近震波形)
  • 原文地址:https://www.cnblogs.com/20183544-wangzhengshuai/p/13757373.html
Copyright © 2011-2022 走看看