zoukankan      html  css  js  c++  java
  • Spark基础学习精髓——第一篇

    Spark基础学习精髓

    1 Spark与大数据

    1.1 大数据基础

    1.1.1 大数据特点

    • 存储空间大
    • 数据量大
    • 计算量大

    1.1.2 大数据开发通用步骤及其对应的技术

    大数据采集->大数据预处理->大数据存储->大数据处理->大数据可视化

    (1)大数据采集技术

     分布式架构、多种采集技术混合使用

     web数据采集:shell编程、爬虫工具、爬虫程序开发、HTTP协议、TCP/IP基本原理及Socket程序接口、编程语言、数据格式转换、分布式存储的命令和接口(HDFS、HBase等)、分布式应用开发

     日志数据采集:采集工具(Flume、Fluentd等)、接入工具(Kafka).日志采集程序(Java、Python等)开发、Shell编程、TCP/IP基本原理以及网络编程接口、编程语言、数据格式转换、分布式存储系统的命令和接口(HDFS、HBase等)、分布式应用开发。

     数据库数据采集:Shell编程、采集工具(Flume、Fluentd等)、接入工具(Kafka)、数据库采集程序(Java、Python等)开发、SQL查询语言及编程接口、关系型数据、库连接如JDBC等的使用、TCP/IP基本原理以及Socket编程接口、编程语言、数据格式转换、分布式存储系统的命令和接口(HDFS、HBase等)、分布式应用开发。

    (2)大数据存储技术

     分布式海量文件存储:HDFS CEPH Moosefs GlusterFS。

     NoSQL数据库:Hbase Cassandra。

     NewSQL数据库:VoltDB、Spanner、TiDB等。

    (3)大数据处理技术

     Hadoop框架、Spark框架

    1.2 认识Spark

    Spark是一个统一的大规模数据处理分析引擎。

    技术特点:高性能(基于内存)、支持多种语言、通用(提供SQL操作、流数据处理、图数据处理、机器学习算法库)、多平台运行、分布式开发更容易

    1.3 Spark技术栈

    1.4 Scala与Spark关系

    Spark框架是用scala开发的。Scala语言特点有如下:

    • 面向对象、函数式编程
    • 是强类型语言,不支持类型的隐式转换
    • 静态类型语言
    • 在JVM虚拟机上运行,可以利用JAVA资源
    • 支持交互式解释器REPL

    1.5 Spark快速学习路线

    虚拟机基础:定制虚拟机、能安装centos7

    Linux基础:实现host和guest网络连接、完成基本文件操作、会用vim

    Scala编程:能编写、编译、打包、调试、运行Scala程序,会用Scala编写简单的串行处理程序,能看懂简单的Spark Scala API接口

    Spark基础:能说出Spark程序运行时架构、能提交Spark程序分布式运行、能解释Spark相关概念:RDD、Application、Job、DAG、Stage、Task,能说出Spark程序的运行过程和代码执行过程

    Spark核心编程:能使用IDEA来编写、编译、打包、调试、运行Spark程序;能使用RDD、DataFrame/Dataset的基础API编写Spark程序

    (具体学习资源请参见《Spark大数据编程实用教程》以及《艾叔》网易云系列,能帮助初学者快速入门,少踩坑)

    1.6 使用软件和版本推荐

    VMware workstation15、Centos7.2、jdk-8u162-linux-x64.tar.gz、hadoop-2.7.6.tar.gz、spark-2.3.0-bin-hadoop2.7.tgz、scala-2.11.12、ideaIC-2018.1.4.tar.gz

    2 Spark运行环境的搭建

    构建一个Spark运行环境,除Spark自身框架外,还要有集群管理器和存储系统用来存储输入和输出数据。

    2.1 Spark程序运行时架构

    定义:Spark程序运行后程序的各个组成部分。

    三种角色:

    • Client(客户端):负责提交Spark程序,提交的对象可以是集群管理器、也可以没有提交对象从而在本地运行;
    • Driver(驱动程序):负责此次Spark程序运行的管理和状态监控,从程序开始到程序结束都由Driver全程负责;
    • Executor(执行器):负责执行具体任务,Executor可能有多个,所有executor合并共同完成整个任务。Executor中具体任务是Task,每个Task是一个线程(每个Task不一定只占一个CPU,可以占多个CPU),一个executor中可能有多个Task,一个Task的处理逻辑相同,处理数据不一样;

       Client向集群管理器发出申请,集群管理器接收请求,并为其分配合适的资源。具体选择哪种管理器,可以在Client提交时通过参数指定。每种资源管理器运行Spark程序时机制可能不一样,但不管怎样,Spark程序运行时的架构是不变的。其他细节,如Excutor、Task的分配、资源调度、不同资源管理器上Spak执行机制等。

    2.2 首先构建最简Spark大数据运行环境

    最简单的Spark运行环境由HDFS、Yarn、Spark三个部分组成。

    部署图如下:

     2.2.1 构建HDFS

    1.什么是hdfs

    HDFS(Hadoop Distributed File System Hadoop分布式文件系统),是一个分布式文件系统。Spark处理数据时的数据源和处理结果都存储在HDFS上。

    2.重要特征

     (1)HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M

     (2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

     (3)目录结构及文件分块信息(元数据)的管理由namenode节点承担——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)

     (4)文件的各个block的存储管理由datanode节点承担---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)

     (5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

    3.hdfs命令行

        (1)查看帮助
            hdfs dfs -help 
        (2)查看当前目录信息
            hdfs dfs -ls /
        (3)上传文件
            hdfs dfs -put /本地路径 /hdfs路径
        (4)剪切文件
            hdfs dfs -moveFromLocal a.txt /aa.txt
        (5)下载文件到本地
            hdfs dfs -get /hdfs路径 /本地路径
        (6)合并下载
            hdfs dfs -getmerge /hdfs路径文件夹 /合并后的文件
        (7)创建文件夹
            hdfs dfs -mkdir /hello
        (8)创建多级文件夹
            hdfs dfs -mkdir -p /hello/world
        (9)移动hdfs文件
            hdfs dfs -mv /hdfs路径 /hdfs路径
        (10)复制hdfs文件
            hdfs dfs -cp /hdfs路径 /hdfs路径
        (11)删除hdfs文件
            hdfs dfs -rm /aa.txt
        (12)删除hdfs文件夹
            hdfs dfs -rm -r /hello
        (13)查看hdfs中的文件
            hdfs dfs -cat /文件
            hdfs dfs -tail -f /文件
        (14)查看文件夹中有多少个文件
            hdfs dfs -count /文件夹
        (15)查看hdfs的总空间
            hdfs dfs -df /
            hdfs dfs -df -h /
        (16)修改副本数    
            hdfs dfs -setrep 1 /a.txt
    4.hdfs工作机制

     

    (1)概述

    • HDFS集群分为两大角色:NameNode、DataNode
    • NameNode负责管理整个文件系统的元数据
    • DataNode 负责管理用户的文件数据块
    • 文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
    • 每一个文件块可以有多个副本,并存放在不同的datanode上
    • Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
    • HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进

    (2)HDFS写工作原理

    有一个文件FileA,100M大小。Client将FileA写入到HDFS上。

    HDFS按默认配置。

    HDFS分布在三个机架上Rack1,Rack2,Rack3。

    a. Client将FileA按64M分块。分成两块,block1和Block2;

    b. Client向nameNode发送写数据请求,如图蓝色虚线①------>。

    c. NameNode节点,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。

        Block1: host2,host1,host3

        Block2: host7,host8,host4

        原理:

            NameNode具有RackAware机架感知功能,这个可以配置。

            若client为DataNode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。

            若client不为DataNode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。

    d. client向DataNode发送block1;发送过程是以流式写入。

        流式写入过程,//逐个传输 host2-->host1--host3>

            1>将64M的block1按64k的package划分;

            2>然后将第一个package发送给host2;

            3>host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;

            4>host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。

            5>以此类推,如图红线实线所示,直到将block1发送完毕。

            6>host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示。

            7>client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图×××粗实线

            8>发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示。

            9>发送完block2后,host7,host8,host4向NameNode,host7向Client发送通知,如图浅绿色实线所示。

            10>client向NameNode发送消息,说我写完了,如图×××粗实线。。。这样就完毕了。

    分析,通过写过程,我们可以了解到:

        写1T文件,我们需要3T的存储,3T的网络流量贷款。

        在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去。

        挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。

    (3)HDFS读工作原理

    读操作就简单一些了,如图所示,client要从datanode上,读取FileA。而FileA由block1和block2组成。 

    那么,读操作流程为:

    a. client向namenode发送读请求。

    b. namenode查看Metadata信息,返回fileA的block的位置。

        block1:host2,host1,host3

        block2:host7,host8,host4

    c. block的位置是有先后顺序的,先读block1,再读block2。而且block1去host2上读取;然后block2,去host7上读取;

    上面例子中,client位于机架外,那么如果client位于机架内某个DataNode上,例如,client是host6。那么读取的时候,遵循的规律是:

      优选读取本机架上的数据

    5.HDFS的构建

    (1)定制虚拟机(取名scala_dev)

    在VMware 15.x上安装centos7.x,在第一部分已经介绍过具体的安装包版本,可以在对应的官网或者国内源下载,也可以联系我的邮箱zhangv_chian@163.com。

    具体安装步骤可参见:https://www.cnblogs.com/gebilaoqin/p/12817510.html

    建议安装图形化界面,并且配置好网络,使得host和guest能够相互Ping通。本人使用的是NAT模式,并且给网卡配置固定的IP地址,防止重启后IP有变化,影响后续的配置和ssh登录。

    NAT配置可参见:https://blog.csdn.net/sdyb_yueding/article/details/78216135?utm_source=blogxgwz8

    关闭防火墙可参见:https://www.cnblogs.com/yyxq/p/10551274.html

    安装vmtools设置共享文件夹,将windows下载的安装包通过共享文件夹传递到centos中:https://www.cnblogs.com/Jankin-Wen/p/10157244.html

    修改主机名:

    a. 在root用户下:vi  /etc/hosts

    b. 在root用户下: vi /etc/hostname

    修改完主机名就可以输入:reboot 重启centos

    可以ping scaladev看看是否能解析出IP地址

    (2)scala_dev无密码登录自己

      因为搭建的是最简单的HDFS,NameNode 和 DataNode 都在 scala_dev 上,因此,需要做 scala_dev 无密码登录自己,操作如下。:

      解释:上述命令会 1)自动创建~/.ssh 目录; 2)在~/.ssh 下自动生成:id_dsa 和 id_dsa.pub 两个文件,其中,id_dsa 是私钥,保存在 NameNode 节点,id_dsa.pub 是公钥,要放置在 DataNode 节点,id_dsa.pub 相当于 NameNode 的身份信息,一旦在 DataNode 节点登记,就相当于 DataNode 节点已认可 NameNode,这样, NameNode 就可以无密码登录 DataNode 了; 3)-P 后面的 '' 是 2 个单引号,不是双引号;

      将公钥 id_dsa.pub 加入到 scala_dev 的 authorized_keys 中,实现 scala_dev 对 scala_dev 自身的认证。

     

       修改 authorized_keys 的权限

       验证,如果不需要密码就可以登录,则说明操作成功

    (填自己的IP地址)查询系统自带的java文件,根据不同的系统版本,输入rpm -qa | grep jdk或者rpm -qa | grep java

    (3)配置JDK

    1)查看系统自带的jdk

    2)查询系统自带的java文件,根据不同的系统版本,输入rpm -qa | grep jdk或者rpm -qa | grep java

    3)删除noarch文件以外的其他文件,输入rpm -e --nodeps 需要卸载的安装文件名

    4)查看是否已经删除完毕

     

    5)解压jdk

      在设置好共享目录的前提下,共享目录一般都在/mnt/hgfs/<共享文件名>/, 在windows上把安装包都放在此目录下

    然后解压到/home/user/ ,命令为:tar xf /mnt/hgfs/sharefile/jdk-8u162-linux-x64.tar.gz  /home/user/

    6)配置环境变量

      切换到root, 然后vi /etc/profile,输入以下内容(路径根据自己实际路径来,不要照搬)

     然后切换的普通用户:su user

     配置完环境变量后都要source /etc/profile

     验证:java -version

    (4) 配置HDFS

    1)解压hdfs

    tar xf /mnt/hgfs/sharefile/hadoop-2.7.6.tar.gz   /home/user/

    2)配置环境变量

    切换到root然后编辑/etc/profile(路径根据自己实际路径来,不要照搬)

      配置完环境变量后都要source /etc/profile

     验证,退回到普通用户,输入 hd,看能否用 tab 键补全 hdfs,如果可以,说明 profile 设 置成功,如果不行,则要检查,或者运行 source/etc/profile 再试。 

    3)设置hostname

     此处scaladev不等加下划线_,否则会出错,然后重启;

    4)添加hosts信息

    验证,如果 pingscaladev 能自动解析出 IP,则说明修改成功。 

    5)修改 hadoop-env.sh (在前面修改了环境变量/etc/profile是不够的,必须在每台节点上加入java的路径)

     

     6)修改slaves,它存储的是所有DataNode的主机名

     后续扩展DataNode,只需要在slaves里面加主机名

    7)修改 hdfs-site.xml,先复制模板文件 

     编辑模板

     

     

     

     8)修改core-site.xml 复制模板

     

      配置了 defaultFS 和 fs.default.name 后,会自动在路径前面加上 hdfs://scaladev:9001/前缀,这样,默 认路径就是 hdfs 上的路径,之前的 file:///前缀,表示的是本地文件系统。按照目前的配置,/表示 hdfs://scaladev:9001/,表示 HDFS 上的/目录,而 file:///则表示本地文件系统的/目录。

     9)格式化并启动 HDFS 

    a.格式化

     b.启动HDFS

     c.验证

     

     2.2.2 构建YARN

    1.yarn简介

    Yarn是hadoop的集群管理器,Spark和Mapreduce程序都可以运行在Yarn上。

    2.Yarn配置步骤

    (1)复制 yarn-site.xml 模板文件 

      编辑 yarn-site.xml  

     

     (2)复制 mapred-site.xml 模板文件 

     

     (3)启动Yarn

     (4)验证:Yarn上运行MapReduce程序

      MapReduce 和 Spark 一样,也是一个分布式处理框架,MapReduce 程序和 Spark 一样,可 以提交到 Yarn 上运行,在 Spark 出现之前,MapReduce 是主流的大数据处理平台。Hadoop 中自带了 MapReduce 的例子程序,如经典的 wordcount(单词计数)程序,如果 MapReduce 执行成功,说明 Yarn 配置成功,后续,我们将学习如何将 Spark 程序提交到 Yarn 上执行。

      提交 MapReduce 程序到 Yarn 上执行的步骤如下:

      1)复制文件到HDFS

      file:///表示本地文件系统,/表示 HDFS 的根目录,这是因为在 core-site.xml 中配置了 defaultFS 为 hdfs://scaladev:9001/ 

       2)验证

       3)在 HDFS 上准备输入目录 input,以及文件 core-site.xml 

       4)运行wordcount的例子

       5)验证

     (5)Yarn运行Mapreduce程序的过程

     1)当执行下面的命令后, Client 向 ResourceManager 发送运行 wordcount 程序(Application) 的请求;ResourceManager 响应请求,返回 Application ID 和一个 HDFS 地址;Client 将启动 Application 所需信息(要运行的 jar 包,资源要求等)上传到指定的 HDFS 路径下,并向 ResourceManager 发起启动 MRApplicationMaster(简称 AM)请求; hadoopjarshare/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.6.jarwordcount/input/output 

     2)ResourceManager 根据当前资源使用情况和调度策略,确定一个可用节点(例如 NodeManager01),向该节点的 NodeManager 发送命令,启动一个 Container 来运行 AM;

       AM 是此次 wordcount 程序运行的管理者,从 wordcount 的启动到结束都由此 AM 来负责。 AM 启动后,向 ResourceManager 注册,计算 Task 数(map 数(Split 数量决定,一个 Split 对 应一个 map)+reduce 数(mapreduce.job.reduces)),以此确定 Container 数(map 数+reduce 数),然后准备好每个任务 Container 请求,发送给 ResourceManager;ResourceManager 响应 请求,为其指定可用的节点 NodeManager02~NodeManagerXX;

     3) AM 依次和这些节点的 NodeManager 通信,在这些节点上启动 Container,并在 Container 执行 wordcount 中的 Task,wordcount 的 Task 分为 map 和 reduce 两种,先执行 mapTask,然 后执行 reduceTask(汇总操作),并向 AM 汇报任务状态;在执行过程中,Client 会和 AM 通 信,查询 Application 执行情况或者控制任务执行; 

     4)当某个 Container 上的任务执行完毕,可以退出时,AM 会和 ResourceManager 通信, 申请释放此 Container 及其资源。待总的 Application 结束,所有资源都释放完毕,AM 会向 ResourceManager 申请注销自己,最后,Client 退出。

    (6)Yarn日志

      Yarn的日志分为两大类: 1.Yarn架构自身相关日志,包括ResourceManager和NodeManager 的日志;2. 在 Yarn 上执行的程序(Application)日志。

      第一类日志的 ResourceManager 日志位于 ResoureManager 的$HADOOP_HOME 下的 log 目录中,日志文件名是 yarn-user-resourcemanager-scaladev.log。

      第一类日志的 NodeManager 日志位于每个 NodeManager 的$HADOOP_HOME 下的 log 目 录中,日志文件名是 yarn-user-nodemanager-scaladev.log。

      第二类日志位于 ResoureManager 的$HADOOP_HOME 下的 log/userlogs 目录下,每个 Application 都会根据其 ID 号创建一个目录,例如:application_1533802263437_0005,在此目录下,会保存该 Application 所有 Container 的日志,示例如下,可以看到 wordcount 这个 Application有3个Container,其中尾号为1的container是AM,其它的container用来执行Task, 可能是 MapTask,也可能ReduceTask。每个 container 日志目录下又有 3 个文件:stdout、 stderr 和 syslog,其中 stdout 是 Task 执行过程中输出,例如 printfln 就会输出到 stdout 中, stderr 会保存报错信息,syslog 则会保存系统日志输出。 

    2.2.3 构建Spark集群

    (1)下载 Spark 软件包

    Spark 所选择的版本是 2.3.0,软件包名:spark-2.3.0-bin-hadoop2.7.tgz,下载地址为: http://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz 

    (2)解压

    (3)配置环境变量

    (4)设置JAVA_HOME

    2.3 运行Spark程序

    本地运行、分布式运行

    2.3.1本地运行方式

    示例:SparkPi

    Spark软件包中有一个spark-examples_2.11-2.3.0.jar 它是Spark自带示例的jar包,下面就以其中的SparkPi为例,介绍Spark程序的本地(local)运行方式

    运行SparkPi具体命令如下:

    在spark目录下执行:spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.3.0.jar  10

    SparkPi的程序参数说明如下:

    • --class org.apache.spark.examples.SparkPi , 指明此运行程序的Main Class
    • --master local ,表示此Spark程序Local运行
    • examples/jars/spark-examples_2.11-2.3.0.jar, 为Spark示例的jar包
    • 10,表示迭代10次。

    如果输出结果,这说明成功。

    程序运行时如果报了一个WARN提示:NativeCodeLoader:62-Unable to load native-hadoop library for your platform

    解决办法是在/etc/profile中添加下面的内容

    export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native

    切换到普通用户,运行下面的命令,使得配置生效。

    source  /etc/profile

    2.4 运行Spark程序(分布式)

    Spark程序分布式运行要依赖特定的集群管理器,最常用的有Yarn和Standalone。Client和Driver是否在一个进程里,可以分为client和cluster模式。

    2.4.1 Spark on Yarn

    1.client deploy mode

    以DFSReadWriteTest为例,说明Spark on Yarn的client 的deploy mode。

    DFSReadWriteTest是Spark-examples_2.11-2.3.0.jar自带的一个示例,它会读取本地文件进行单词计数,然后将本地文件上传到HDFS,从HDFS读取该文件,使用Spark进行计数,最后比较两次计数的结果。

    (1)提交Spark程序 到Yarn上 ,以client mode运行

    spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master yarn /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile /output

    运行改程序要保证Yarn和HDFS同时启动。

    如果结果正确,会输出:Success!

    运行报错:初次运行程序时,可能会有以下两个报错。

    报错1的报错信息如下所示:
    Exception in thread "main"java.lang Exception: When unning with master yam' either HADOOP_CONF DIR or YARN_CONF _DIR must be set in the environment.

    报错原因:没有设置环境变量:HADOOP_CONF_DIR或YARN_CONF_DR.
    解决办法:在/etc/profile中增加下面的内容。
    HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

    export HADOOP_CONF_DIR

    切回到普通用户,使刚才的配置失效。

    报错2的报错信息如下所示。

    报错原因:Container(容器)的内存超出了虚拟内存限制,Container的虚拟内存为2.1GB,但使用了2.3GB.comtainer fpd-+22containeriD-container.15373079299 0002_02 0000l1]is runing ba

    mual ncnoy ias CGma uage I64 SMB ofI GB plysical memory uscd 23 GB of 2.1 GB virual meaused Killing container

    解决办法:

    改变分配Container最小物理内存值,将yarn.scheduler.minimum-allocation-mb设置成2GB,重启Yarn,每个Container向RM申请的虚拟内存为2GB*2.1=4.2GB

    (2)Spark程序在Yarn上的执行过程

    1)client模式下,Client和Driver在一个进程内,向Resource Manager发出请求;

    2)Resource Manager指定一个节点启动Container,用来运行AM;AM向resource manager申请container来执行程序,resource manager向AM返回可用节点;

    3)AM同可用节点的NodeManager通信,在每个节点上启动Container,每个Container中运行 一个Spark的Excutor,Excutor再运行若干Tasks;

    4)Driver与Executor通信,向其分配Task并运行,并检测其状态,直到整个任务完成;

    5)总任务完成后,Driver清理Executor,通知AM,AM想ResouceManager请求释放Container,所有资源清理完毕后,AM注销并退出、client退出。

     2. Spark on Yarn (cluster deploy mode)

    (1)提交DFSReadWriteTest到Yarn运行(cluster deploy mode)

    命令:spark-submit --deploy-mode cluster --class org.apache.spark.examples.DFSReadWriteTest --master yarn /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-example_2.11-2.3.0.jar /etc/profile /outputSpark

    (2)Spark程序在Yarn上执行过程

    1)Client想ResourceManager提交Application请求;

    2)ResourceManager指定一个节点,启动Container来运行AM和Spark Driver;AM根据任务情况向ResourceManager申请Container;ResourceManager返回可以运行Container NodeManager;

    3)AM与这些NodeManager通信,启动Container,在Container中执行Executor;

    4)Spark Driver与Executor通信,向它们分配Task,并监控Task执行状态;

    5)所有Task执行完毕后,清理Executor,清理完毕后,Driver通知AM,AM请求Resource Manager,释放所有Container;Client收到Application FINISHED后退出。

     2.4.2 Spark on Standalone

    Standalone是Spark自带的集群管理器,主/从式架构,包括Master和Worker两种角色,Master管理所有的Worker,Worker负责单个节点的管理。

    优点:简单、方便、快速部署。

    缺点:不通用、只支持Spark,功能没有Yarn强大

    1.Spark on Standalone(client deployed mode)

    (1)部署Standalone

    1)配置slaves文件,改文件保存了整个集群中被管理节点的主机名。先复制模板文件;

    cp conf/slaves.template conf/slaves

    2)编辑slaves文件

    vi conf/slaves

    3)将localhost修改为scaladev

    scala_dev

    4)添加JAVA_HOME

    cp conf/spark-env.sh.template conf/spark-env.sh

    5)编辑spark-env.sh文件

    vi conf/spark-env.sh

    6)在最后一行添加下面内容

    export JAVA_HOME=/home/user/jdk1.8.0_162

    7)启动Standalone集群

    sbin/start-all.sh

    8)验证 jps,查看是否有worker和master

    9)查看Standalone的Web监控界面

    (2)提交Spark程序到Standalone上,以client deploy mode运行

    提交前却把HDFS已经启动,HDFS上/output目录下已经清空。具体命令如下:

    spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master spark://scaladev:7077  /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile /output

    其中-master spark:// scaladev:7077 表示连接Standalone集群, scaladev是Master所在的主机名,没有指定--deploy-mode cluster,则部署模式默认为client

    (3)Spark程序在Standalone上的运行过程(client deploy mode)

    client部署模式下,Spark程序在Standalone的运行过程如图所示。

     1)Client初始化,内部启动Client模块和Driver模块,并向Master发送Application请求;

     2)Master接收请求,为其分配Worker,并通知Worker启动Executor;

     3)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成。

     2.Spark on Standalone(cluster deploy mode)

    (1)提交Spark程序到Standalone,以cluster deploy mode运行

    具体命令如下:

    spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master spark://scaladev:6066 --deploy-mode cluster /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile hdfs://scaladev:9001/output

    有4点要特别注意:

    1)采用cluster deploy mode时,Driver需要一个处理器,后续Executor还需要另外的处理器,如果虚拟机scaladev只有1个处理器,就会出现资源不足的警告,导致程序运行失败,如下所示:

    WARN TaskSchedulerImpl:66 - Initial job has not accepted any resource

    解决办法:增加虚拟机的处理器为2个。

    2)命令参数中,--master spark://scaladev:6066 用来指定Master的URL,cluster deploy mode下,Client会向Master提交Rest URL, Spark://scaladev:6066就是Spark的Rest URL;如果还是使用原来的参数--master spark://scaladev:7077,则会报下的错误;

    WARN RestSubmissionClient:66 -Unable to connect to server spark://7077

    3)HDFS的路径前面要加hdfs://,因为Cluster Mode下,core-site.xml中的defaultFS设置不起作用;

    4)Client提交成功后就会退出,而不是等待Application结束后才退出。

    (2)Spark程序在Standalone上的运行过程(cluster deploy mode)

    cluster deploy mode下,Spark程序在Standalone的运行过程如图所示。

     1)Client初始化,内部启动Client模块,并向Master注册Driver模块,并等待Driver信息,待后续Driver模块正常运行,Client退出;

     2)Master接收请求,分配一个Worker,并通知这些Worker启动Executor;

     3)Master接受请求,分配Worker,并通知这些Worker启动Executor;

     4)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成

    3. Spark on Standalone日志

      standalone的日志分为两类:框架日志、应用日志。

      框架日志:指Master和Worker日志,Master日志位于Master的Spark目录下的logs目录下,文件名:spark-user-org.apache.spark.deploy.master.Master-1-scaladev.out;Worker位于每个Worker节点的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.worker.Worker-1-scaladev.out。

      应用日志:指每个Spark程序运行的日志,因为一个Spark程序可能会启动多个Executor,每个Executor都会有一个日志文件,位于Executor所在的Worker节点的Spark目录的work目录下,每个Spark运行会分配一个ID,运行时在控制台会打印ID的值,如下所示:

      Connected to  Spark cluster with app ID app-2018081004758-0001

      列出woker目录下的内容,命令如下。

      ls work/

      然后就可以看目录下的内容。

  • 相关阅读:
    DMA+USART重定义打印接口
    FTP初探
    ESP8266-lua开发
    GPIO常见问题分析
    新装系统简介
    java四个元注解的作用
    Linux常用操作指令
    @Autowired 与@Resource的区别(详细)
    内存溢出的几种原因和解决办法
    什么是NIO2
  • 原文地址:https://www.cnblogs.com/v2019/p/13493863.html
Copyright © 2011-2022 走看看