Hadoop整体:
简介:
Apache开发、基于Java、核心是HDFS和MapReduce
特性:高可靠(容错)性(冗余存储)、高效性(分布式存储和处理)、高可扩展性(成本低)(集群)、支持多种语言
应用架构:
重要子项目:
HDFS:文件系统,超大数据、流式处理能力。 在部分硬件故障下仍能保证可用可靠。 高吞吐率,作为底层数据存储
HBase:分布式列式数据库。 实时读写、可伸缩。 采用HDFS作为底层。 针对谷歌的BigTable开源实现。 非结构化数据存储。
与传统关系数据库区别:基于列式(HBase)和基于行式(传统)
MapReduce:一种编程模型。 并行运算。 抽象成两个函数。 屏蔽底层开发并行程序。 分治思想:切分、并行完成、整合
Hive:数据仓库。 对数据集进行数据整理、特殊查询、分析存储。 查询语言HiveQL,实现MR统计。
Hadoop集群中的节点类型:
NameNode:协调集群中的数据存储
DataNode:存储被拆分的数据块
JobTracker:协调计算任务
TaskTracker:执行由JobTracker指派的任务
SecondaryNameNode:帮助NameNode收集文件系统的运行状态信息
HDFS:
分布式文件系统:
一般采用“客户机/服务器”模式。
集群架构:节点存放在机架(Rack)上(8~64个),不同节点网络互连,不同机架通过交换机互连
HDFS默认的块大小为64MB,若文件小于一个数据块,不占用整块存储空间。
主节点(Master Node):也被称为名称节点(NameNode),负责文件和目录的创建、删除、重命名,管理数据节点与文件块的映射关系。客户端访问名称节点得到文件块位置。
从节点(Slave Node):也称为数据节点(DataNode),负责数据的存储和读取,根据名称节点命令执行。
多副本存储:存储在不同节点上,同一文件块的不同副本对应的节点,分布在不同机架上,防止故障丢失。
设计需求:透明性、并发控制、文件备份、系统异构性、可伸缩性、容错、安全。
HDFS简介
特点:
- 兼容廉价设备。有检测硬件故障和自动恢复机制。
- 流数据读写。满足批量处理,提高吞吐率。
- 大数据集。大容量文件。
- 简单文件模型。“一次写入,多次读取”。一旦写入完成就无法再写入。
- 跨平台兼容。Java实现,只要有JVM就可以运行。
局限性:
- 不适合低延迟数据访问。流式数据处理提高吞吐率,但延迟较高。HBase更适合低延迟。
- 无法高效存储大量小文件。检索效率低、线程管理开销大。
- 不支持多用户写入或修改。只能有一个写入者,之后只能对文件追加。
块:
64MB,增大块的大小来最小化寻址开销(包括磁盘的寻道开销和数据块定位开销)
设置太大也不行,会降低Map任务并行作业的速度
采用块概念的好处:
- 支持大规模文件存储。分块存储,文件大小就不受节点限制
- 简化系统设计。文件块大小固定
- 适合数据备份。文件块冗余存储在不同节点上
节点:
名称节点:管理命名空间(Namespace),两个核心数据结构(FsImage、EditLog)
FSImage:维护系统文件结构树、所有的文件和文件夹的元数据
EditLog:记录文件的创建、删除操作
名称节点并不持续存储数据块的位置信息,而是每次系统启动时,扫描节点重构信息
数据节点:HDFS的工作节点,根据客户端和名称节点的调度来执行操作,并向名称节点发送自己存储的块列表
体系结构:
主从(Master/Slave)模型
一个集群包括一个名称节点(中心服务器,与客户端通信)和若干个数据节点(处理请求)
每个数据节点周期性向名称节点发送“心跳”,没发送的将被标记,不再分配任务
客户端访问文件:
- 客户端发送文件名给名称节点
- 名称节点根据文件名找到对应的数据块
- 根据数据块信息找到实际存储的数据节点位置
- 把位置发送给客户端,客户端直接访问数据节点
名称节点不传输数据。实现了在不同数据节点的并发访问
通信协议:构建在TCP/IP之上
客户端与名称节点:TCP连接,然后使用客户端协议
名称节点与数据节点:数据节点协议
客户端与数据节点:RPC(Remote Procedure Call)
HDFS体系结构局限性:
只有一个名称节点带来的问题:
- 命名空间局限。名称节点保存在内存中
- 性能。系统吞吐量受限于名称节点的吞吐量
- 隔离问题。无法对不同程序进行隔离
- 集群的整体。一旦名称节点坏了就整个集群不能用
存储原理:
多副本方式冗余存储
一个数据块的多个副本会分布到不同数据节点,优点:
- 加快传输数据速度。客户端分别在不同副本读取数据
- 容易检查错误。
- 保证可靠性。
存取策略:
- 存放:采用以机架(Rack)为基础的数据存放。不同机架通过交换机通讯。每个数据节点放在不同机架上。写入不能充分利用同一机架的带宽,但读取时提高了速度和可靠性
- 读取:客户端可以调用API获取自己所属机架的ID。若发现数据块对应的机架ID与客户端的机架ID一样,则优先选择。
- 复制:流水线复制。文件先写入本地,然后分块,由名称节点产生数据节点列表返回客户端,客户端把数据和列表写入第一个数据节点,接收到部分数据后,第一个数据节点就往第二个数据节点传递...所以文件传完时也复制完了
数据错误与恢复:
名称节点出错:
核心文件同步复制到SecondaryNameNode,出错时由这里恢复,其本身不会处理请求
数据节点出错:
名称节点没收到心跳信息时,把该数据节点标记为不可读。当由于出现某些数据节点不可用,副本数量小于冗余因子,就会启动冗余复制
数据出错:
客户端采用md5和sha1对数据块校验。客户端根据信息文件校对,若出错后会请求另一数据节点读取该文件块,并向名称节点报告,名称节点定期处理
数据读写过程:
FileSystem—抽象类,DistributedFileSystem是其在HDFS中的实现,输入流对应open()方法,生成DFSInputStream对象实例化FSDataInputStream,输出流对应create()方法,生成DFSOutputStream对象实例化FSDataOutputStream
读取过程:
- open()打开文件,创建输入流对象FSDI
- DFSInputStream的构造函数中通过ClientProtocal.getBlockLocations()远程调用名称节点,获取数据块的位置(名称节点根据客户端远近排序数据节点),实例化输入流DFSI
- 调用read()读取数据,选择最近数据节点建立连接
- 读取完毕后关闭和该节点连接(读取错误的话会与同一数据块的下一数据节点连接)
- 通过getBlockLocations()查找下一个数据块
- 找到最佳数据节点…与2~5一样
- 读取完毕后,调用close(),关闭输入流
写数据过程:(不发生异常情况下)
- 调用create()创建输出流对象FSDO
- DFS对象通过RPC远程调用名称节点,在命名空间中创建一个新文件。名称节点执行是否存在文件、有否权限等检查,然后构造文件,并加入文件信息,就可以实例化输出流DFSO
- 调用write()方法写入数据
- 写入的数据分成包,被放入DFSO对象的内部队列。向名称节点申请数据节点保存文件及其副本,数据节点形成数据流管道,执行“流水线策略”
- 接收到数据的数据节点逆向发回“确认包”(ACK),收到应答后,对应完成接收的分包从内部队列移除,并重复3~5步
- 调用close()关闭输出流,内部队列的分包都应答完后,使用ClientProtocol.complete()通知名称节点关闭
HDFS编程:
HDFS的shell命令格式:
hadoop command 【genericOptions】 【commandOptions】
fs是HDFS最常用的command
fs后接:
-ls -cat -chgrp -chown -chmod -tail -stat -touchz -mkdir -copyFromLocal -copyToLocal -cp …
MapReduce
概述:
解决大规模数据高效处理 问题,分布式并行编程大幅度提高性能,实现高效批处理
MapReduce是并行编程模型,将并行计算过程高度抽象为Map和Reduce
产生背景:
由于CPU的性能提升变慢,需要用分布式编程提高计算性能
分布式并行编程运行在大规模集群上,且具有扩展性
Google最早提出MapReduce,Hadoop MapReduce是开源实现
传统并行计算框架
|
MapReduce
|
|
集群架构/容错性
|
共享式(共享内存/共享存储),容错性差
|
非共享式,容错性好
|
硬件/价格/扩展性
|
刀片服务器、高速网、SAN,价格贵,扩展性差
|
普通PC机,便宜,扩展性好
|
编程/学习难度
|
what-how,难
|
what,简单
|
适用场景
|
实时、细粒度计算、计算密集型
|
批处理、非实时、数据密集型
|
MapReduce模型简介:
并行计算抽象成Map和Reduce
“分而治之”,大规模数据集切分成独立的小数据块,每个小数据集被多个Map任务并行处理(数据和Map任务放在同一节点)
“计算向数据靠拢”,因为移动数据需要大量传输开销,所以把计算节点和存储节点放在一起
Master/Slave框架,Master上运行JobTracker,Slave上运行TaskTracker,前者负责任务调度和监控,后者负责执行任务
Hadoop由Java实现,但MapReduce程序不一定写Java语言
Map和Reduce函数
编程只需关注这两个函数的实现,不需要考虑底层的处理
Map函数将输入元素转换成键值对,这里的“键”不唯一,有可能对应多个value
Reduce函数将输入的一系列具有相同键的值组合起来,输出合并成一个文件
MapReduce的体系结构
由Client、Task、JobTracker、TaskTracker组成
Client:
用户编写的MapReduce程序通过Client提交到JobTracker端
通过Client提供的接口查看工作状态
JobTracker:
负责资源监控和工作调度
发现失败进行任务转移
将执行进度、资源信息发给TaskTracker,让它进行资源分配
TaskTracker:
“心跳”周期上传本节点的资源使用和任务执行情况给JobTracker,并执行JobTracker的命令
TaskTracker使用“slot”等量划分本节点的资源,Task要拿到slot后才能执行,分为Map Slot和Reduce Slot
Task:
分为Map和Reduce Task,由TaskTracker启动
工作流程的要点:
- 不同Map任务或者Reduce任务之间不会通信
- 用户不能显式从一台机器向另一台机器发送消息,而都是MapReduce框架自己去交换数据
- MapReduce执行过程中,Map任务的输入,Reduce的输出是保存在HDFS中,而Map处理的中间结果则保存在本地存储中(磁盘)
执行过程:
- InputFormat模块做预处理,然后切分为多个逻辑上的InputSplit,它是处理和运算的单位,只是一个逻辑而没有实际切割,记录了数据的位置和长度等元数据(区别于HDFS中每个节点上以固定大小的block为单位存储)
- 通过RecordReader根据InputSplit的信息加载数据并转换为适合Map任务的键值对
- 为每个Split创建一个Map任务,Split的多少决定了Map的数量,理想情况下是分片大小等于HDFS的块。Map任务根据用户定义的规则,输出一系列<key, value>作为中间结果
- 对Map的输出进行分区、排序、合并、归并等,得到<key, value-list>的形式,这个过程称为Shuffle
- 以一系列<key, value-list>作为输入,reduce执行用户定义的逻辑,输出到OutputFormat,最优的reduce任务个数取决于reduce slot,一般要略小于以预留给错误处理
- OutputFormat模块验证输出是否符合正确,存储到分布式系统中
Shuffle详细过程:
Shuffle就是对Map的输出结果进行分区、排序、合并等处理后提交给Reduce,分为在Map端和Reduce端的操作
Map端:输出结果被写入缓存,当缓存满了就进行溢写操作,把缓存的数据写入磁盘并清空内存。在这过程中要先把缓存的数据分区,对每个分区的数据进行排序、合并再写入磁盘。每次溢写操作生成一个磁盘文件,Map任务结束后这些文件被归并成一个大文件,通知Reduce任务来领取自己的数据
具体而言有:
- 每一个Map任务分配一个缓存,写入缓存之前,key和value都会被序列化为字节数组
- MapReduce的缓存容量默认为100MB,必须启动spill(溢写)操作,为了保证Map结果能够持续写入缓存,不能全部占满才启动溢写,默认溢写比例为0.8
- 在溢写之前,缓存数据会先分区(partition),MapReduce通过partitioner接口对键值分区,分区方式默认为Hash函数对key进行哈希后再用Reduce任务的数量进行取模,Map输出结果均匀分配给Reduce任务。用户可通过重载partitioner接口来改变分区方式
- 后台线程根据key对每个分区内的键值对进行排序(sort),是默认操作
- 排序后可以选择进行合并(combine),如果用户没有定义则不会进行,合并会减少溢写的数据量。合并其实就是把相同key的value加起来,类似于reduce,但不能让它改变reduce的结果
- 缓存写入磁盘并清空缓存,在磁盘中生成一个新的文件,这里面的KV对都是经过分区和排序的
- Map任务全部结束后把磁盘文件进行归并(Merge)生成一个大文件。归并是指对于相同key的KV对,归并成一个新的KV对,即value变成了一个list。归并前要判断磁盘的文件数是否超过参数min.num.spills.for.combine的值(默认为3),如果超过了那么就会再次运行combiner
PS:区分开combine和merge,combine是把value直接加起来,而merge只是把value都放在list里
Reduce端:就是领取Map结果,进行归并处理,送给Reduce任务进行处理
具体而言有:
- Reduce任务开始之前,会不断通过RPC向JobTracker询问Map任务是否完成,JobTracker监测到一个Map任务完成后,通知相关的Reduce任务来领取(Fetch)数据,一般Reduce任务会使用多线程同时从多个Map机器领取数据
- Map任务领取的数据先存放在Reduce任务所在机器的缓存中,若缓存满了也会一样溢写。在shuffle阶段,reduce并未真正执行,所以把大多数缓存用来shuffle
- 数据量很少时直接在缓存中归并,当在磁盘中归并得到若干个大文件后不会再进行归并,而是直接输入给reduce任务,shuffle结束,Reduce任务执行Reduce函数中定义的映射,最终保存到HDFS中
书本上还介绍了WordCount实例对应的MapReduce过程、MapReduce具体的编程方法
最好配合Spark里面的shuffle过程和MapReduce的shuffle进行对比
MR和DAG的对比:
http://www.cnblogs.com/intsmaze/p/7197420.html 其实Spark和Hadoop都是基于内存计算,只不过Spark是多个任务的数据通信之间可用到内存,而MR的数据通信都是要在硬盘上(Shuffle还是在硬盘中进行)
https://www.zhihu.com/question/31930662 知乎,Spark为什么比MR快?
HDFS2.0和MapReduce2.0(YARN)
HDFS -> HDFS HA -> HDFS Federation
1.0的不足:
- 抽象层次低,需要人工编码
- 表达能力有限,有些应用无法实现
- 开发者自己管理Job之间的依赖关系,一般实际应用需要多个Job才能实现
- 难以看到程序的整体逻辑,代码的理解和后期维护造成了障碍
- 执行迭代操作效率低,机器学习、数据挖掘任务往往需要多次迭代,MR每次迭代的处理结果都放在HDFS中,这样多次读写HDFS会造成效率低下
- 资源浪费,Reduce任务需要等Map任务执行完才开始工作
- 时效性差,无法进行交互式查询、实时数据处理
对Hadoop的改进和补充:
一方面对HDFS和MapReduce进行改进,另一方面加入更多的组件
HDFS1.0的不足:
名称节点保存元数据:
在磁盘上有FsImage和EditLog
在内存中有映射信息,包括块存储在哪个节点、文件包含哪些块
单点故障问题:唯一的名称节点发生故障,就会让整个集群不可用
第二名称节点(SecondaryNameNode)无法解决这个问题
SNM的作用只是周期性地从NM获取FsImage和EditLog,合并后将新的FsImage和缩小了的EditLog发回给NM,防止NM失败恢复时由于EditLog太大而导致恢复太慢,当NM发生故障时,可从SNM的备份中恢复(作为冷备份)
但是SNM不是热备份,即NM发生故障时,SNM无法取代它支持集群工作,仍然需要等待NM恢复
HDFS HA解决单点故障问题
设置两种名称节点,活跃(Active)和待命(Standby),它们通过共享存储系统达到状态同步
是热备份,一旦活跃名称节点发生故障,马上待命节点就可以替换,提高了可用性(High Availability)
Zookeeper保证了同一时刻只有一个名称节点
数据节点要把心跳信息同时发给所有名称节点,因为这样才能使待命的节点也有相同的元数据信息
扩展性、性能、隔离度差的问题:只有一个名称节点,不能水平扩展,单个节点内存有限;性能受限于单节点的吞吐量;单个节点可能发生一个应用影响另一个应用的执行
HDFS Federation
设计了多个相互独立的名称节点,它们之间是联盟(Federation),所有名称节点共享数据节点资源,这就解决了单个节点的受限问题
属于同一个命名空间的块构成一个块池(block pool)
多个命名空间用客户端挂载表(Client Side Mount Table)进行数据共享和访问
MapReduce2.0(YARN)
1.0的不足:包括JobTracker和TaskTracker
- JobTracker的单点故障
- JobTracker任务太多,包括资源管理分配、任务调度、失败恢复等
- TaskTracker上容易发生内存溢出
- 划分成的Map Slot和Reduce Slot不能互相使用,造成资源浪费
2.0把JobTracker的任务分给ResourceManager(资源管理)和ApplicationMaster(任务调度和监控),NodeManager负责TaskTracker的任务
MapReduce不再是资源管理调度框架,而是分离出来叫做“YARN”,MapReduce2.0是个单纯的计算框架
YARN体系结构:
与HDFS部署在一起的结构图:RM对应NameNode、NM对和AppMaster应DataNode
各部分功能:
ResourceManager包括两个组件,调度器(Scheduler)和应用程序管理器(Application Manager)。前者负责资源分配与调度,接收来自于ApplicationMaster的资源请求,把集群的资源以容器(Container)分配给应用,容器封装了一定的CPU、内存、磁盘等资源,调度器支持用户自定义;后者负责应用的管理工作,包括应用提交、开启与监控Application Master
当作业提交时,ApplicationMaster从RM获取资源(以容器的形式),分配给任务(包括Map和Reduce任务),实现“二次分配”;与NodeManager进行通信,实现启动、运行、监控与停止应用,并对所有任务进行监控,执行失败恢复;定期向RM发出“心跳”,当作业完成时向RM注销容器
NodeManager是YARN集群上每个节点的代理,负责容器生命周期的管理,监控每个容器的资源情况,并以“心跳”的形式发送给RM;接收ApplicationMaster的请求,但它只负责容器的管理,不管理任务,因为ApplicationMaster会通过NM的信息来对任务进行管理
YARN工作流程:
- 向YARN提交程序,包括AppMaster程序、启动AppMaster命令、用户程序
- RM接收请求,Scheduler为应用分配一个容器,AppManager与该容器所在的NM通信,在容器中启动一个AppMaster
- AppMaster向RM注册,这样用户就可以通过RM查看应用的运行状态
- AppMaster采用轮询的方式通过RPC协议向RM申请资源
- RM以容器的形式给AppMaster分配资源,AppMaster一申请到资源后就与该容器对应的NM通信,要求它启动任务
- AppMaster为任务配置环境,容器执行命令脚本启动任务
- 各个任务通过RPC协议向AppMaster汇报
- 应用结束后AppMaster向RM注销应用和关闭自己,若发现失败,则将重新启动直到所有任务执行完
YARN相对于MapReduce1.0的优势:
- ResourceManager的职责比JobTracker少,只负责资源管理,而让每个Job都启动一个AppMaster去监控与调度,而不是由RM独揽大权
- MapReduce1.0既是计算框架也是管理框架,而YARN只是管理框架,其中MapReduce只是运行在YARN之上的一种计算框架,另外Spark、Storm也能运行在YARN之上
- YARN提高了资源利用率,因为以Container为单位取代了Slot,这样避免了Slot闲置导致的资源浪费
YARN的目标就是实现“一个集群多个框架”,即能在YARN之上运行类似于Spark、Storm、Tez、MapReduce等框架