zoukankan      html  css  js  c++  java
  • Spark 环境搭建

    Spark 环境搭建

    学习目标

    1.了解Spark发展史

    2.完成Spark环境搭建

    3.掌握Spark入门案例-WordCount

    4.完成将Spark任务提交到Yarn

    1.  课程说明

    1.1  课程安排

    整个Spark 框架课程分为如下9个部分,如下图所示:

    1章:Spark环境搭建

    2章:SparkCore

    3章:SparkStreaming

    4章:SparkSQL

    5章:StructuredStreaming

    6章:Spark综合案例

    7章:Spark多语言开发

    8章:Spark3.0新特性

    9章:Spark性能调优

    注:

    1.框架阶段课程重点为学习框架的基本原理和基础使用,更多高级的用法,如和其他框架的整合,项目中的实际使用技巧,会在后续的大项目中进行更多的加强学习

    2.SparkMLlib会在第6和第7部分穿插讲解一些常见机器学习算法,抛砖引玉

    1.2  课程特点

    整个Spark课程学习有如下三个主要特点:

    1.快:课程进度快,知识遗忘快

    2.多:理论原理多,演示代码多

    3.难:入门门槛高,熟练精通难

    1.3  课程要求

    学习Spark课程时,给出如下几项建议/要求:

    1.知识点:书读百遍,其义自现

    需要反复复习!可以多刷笔记和课件,重要的知识点可以自己动手画图并整理思维导图。

    2.代码:键盘敲烂,月薪过万

    代码一定要自己动手至少操作一遍,不熟悉的代码/重点的代码要敲多遍。

    3.bug:bug虐我千百遍,我待bug如初恋

    尝试着自己多动手去解决,高级程序员和初级程序员的一个显著差异就是调bug的能力!

    1. 学习能力:学习力才是最大的竞争力

    新技术层出不穷,唯有不断的学习成长才能保持竞争力!

    图:勤学苦练

     

    图:逮住蛤蟆 攥出尿来 ? 前程似锦 继往开来 !

    1.4  框架版本

    课程中使用官方在2020年9月8日发布的Spark3.0系列最新稳定版:Spark3.0.1

    http://spark.apache.org/news/index.html

    http://spark.apache.org/releases/spark-release-3-0-0.html

    Spark3.0建立在Spark 2.x的许多创新之上,带来了新的想法。在开源社区440多个贡献者的帮助下,这个版本解决了超过3400个问题,添加了各种相关的优化。以下是Spark 3.0的功能亮点:

    1.动态分区裁剪(Dynamic Partition Pruning)

    2.自适应查询执行(Adaptive Query Execution)

    3.加速器感知调度(Accelerator-aware Scheduling)

    4.更好的API扩展(BetterAPI-Extensions-DataSourceV2)

    5.更好的ANSI-SQL兼容(ANSI SQL Compatible)

    6.SparkR向量化读写(Vectorization)

    ...

    TPC-DS30TB基准测试中,Spark 3.0大约比Spark 2.4快两倍。

    1.5  编程语言

    Spark 在诞生之初就提供了多种编程语言接口:Scala、Java、Python 和 SQL,在后面的版本中又加入了 R 语言编程接口。

    对于 Spark 来说,虽然其内核是由 Scala 编写而成,但编程语言从来就不是它的重点,从 Spark 提供这么多的编程接口来说,Spark 鼓励不同背景的人去使用它完成自己的数据探索工作。尽管如此,不同编程语言在开发效率、执行效率等方面还是有些不同,对比如下:

     

    ü Scala 作为 Spark源码 的开发语言当然得到了原生支持,也非常成熟,它简洁的语法也能显著提高开发效率;

    ü Java 也是 Spark 原生支持的开发语言,但是1.7之前 Java 语法冗长且不支持函数式编程,导致它的 API 设计得冗余且不合理,1.8 以后开始支持函数式编程,Java语言规范性好,应用范围广,用户基数大,所以Java API 也是一个很好的选择;

    ü Python 与 R 语言都是解释型脚本语言,可以直接解释执行,不用编译。尤其是 Python 更以简洁著称,开发效率自不必说,此外 Python 与 R 语言本身也支持函数式编程,这两种语言在开发 Spark 作业时也是非常自然,但由于其执行原理是计算任务在每个节点安装的 Python 或 R 的环境中执行,结果通过管道输出给JVM,所以效率要比 Scala 与 Java 低;

    ü SQL 是 Spark 原生支持的开发语言,从各个维度上来说都是最优的,所以一般情况下,用 Spark SQL 解决问题是最优选择。

    ★注意 1:

    这里要特别说明的是,Spark 是由 Scala 开发而成,对于Java、Scala 编程接口来说,在执行计算任务时,是由集群中每个节点的 JVM(Scala 也是 JVM 语言)完成。

    但是如果采用 Python、R 语言编程接口,那么执行过程是由集群中每个节点的 Python 与 R 进程计算并通过管道回传给 JVM,所以性能上会有所损失。具体来说:PySpark是借助Py4j实现Python调用Java,来驱动Spark应用程序,本质上主要还是JVM runtime。在大数据场景下,JVM和Python进程间频繁的数据通信导致其性能损耗较多,恶劣时还可能会直接卡死,所以建议对于大规模机器学习或者Streaming应用场景还是慎用PySpark,应该尽量使用原生的Scala/Java编写应用程序,对于中小规模数据量下的简单离线任务,可以使用PySpark快速部署提交。

    注意 2:

    此外,通过GitHub上Spark相关项目的使用语言上来看,也是Scala使用的最多,所以课程中使用Scala语言为主,但会扩展讲解使用其他语言进行Spark开发

    https://github.com/search?q=spark

     

    2.  Spark概述

    2.1  Spark风雨十年

    2009 年Spark诞生于加州大学伯克利分校 AMP实验室Lab,2013年其原创团队建立了Databricks公司,并将Spark捐献给了Apache软件基金会2014年成为 Apache 顶级项目,2016年发布了Spark 2.0,2019年10月发布Spark 3.0 预览版,2020年6月18日Spark 3.0正式版发布

    Spark的发展历史,经历过几大重要阶段,如下图所示:

     

    2009年伯克利的AMP实验室将 Spark开源以来,Spark在大数据处理领域发展相当迅速,并获得了巨大的成功。如今,Spark已经成为最活跃的开源项目之一,是大数据处理、数据科学、机器学习和数据分析工作负载的事实上的统一引擎。

    从世界著名的开发者论坛Stack Overflow的数据可以看出,2015年开始Spark每月的问题提交数量已经超越Hadoop,而2018年Spark Python版本的API PySpark每月的问题提交数量也已超过Hadoop。2019年排名Spark第一,PySpark第二;而十年的累计排名是Spark第一,PySpark第三。按照这个趋势发展下去,Spark和PySpark在未来很长一段时间内应该还会处于垄断地位。

     

    GitHub 的数据中可以看到,在 Apache 的所有开源项目中,Spark 的关注度排名第 4(前三位分别是前端可视化框架ECharts、 RPC 服务框架 Dubbo 和可视化平台 Superset),在所有大数据处理技术中排名第 1。

    https://github.com/search?o=desc&q=apache&s=stars&type=Repositories

     

    此外,Spark 在资本市场也得到了极高的认可,其背后的商业化公司-Databricks数砖公司得到了 62 亿美元的估值。目前,绝大多数公司和组织会基于Spark 生态搭建自己的大数据平台,构建支持业务的数据管道。可以说,提到大数据处理,Spark 已是一个无法回避的话题。Spark之于大数据开发类似与买房之于结婚!

    2.2  Spark官方介绍

    官方网址:

    http://spark.apache.org/

    https://databricks.com/spark/about

    关于 Spark 的定义,Spark官方给出的是:

    Apache Spark™ is a unified analytics engine for large-scale data processing.

    Spark是用于大规模数据处理的统一分析引擎。

     

    2.3  Spark的特点/流行的原因

    Spark 使用Scala语言进行实现,它是一种面向对、函数式编程语言,能够像操作本地集合一样轻松的操作分布式数据集。Spark具有运行速度快、易用性好、通用性强和随处运行等特点。

     

    2.3.1  速度快

    Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。

     

    l 扩展:Spark与Hadoop相比,有如下区别

    Hadoop

    Spark

    类型

    基础平台, 包含计算, 存储, 调度

    分布式计算工具

    场景

    大规模数据集上的批处理

    迭代计算, 交互式计算, 流计算

    价格

    对机器要求低, 便宜

    对内存有要求, 相对较贵

    编程范式

    Map+Reduce, API 较为底层, 算法适应性差

    RDD组成DAG有向无环图, API 较为顶层, 方便使用

    数据存储结构

    MapReduce中间计算结果存在HDFS磁盘上, 延迟大

    RDD中间运算结果存在内存中 , 延迟小

    运行方式

    Task以进程方式维护, 任务启动慢

    Task以线程方式维护, 任务启动快

     

    ★注意:

    尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop,Spark主要用于替代Hadoop中的MapReduce计算模型。存储依然可以使用HDFS,只是中间结果可以存放在内存中;调度可以使用Spark内置的,也可以使用Hadoop中更成熟的调度系统YARN

    实际上,Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。

    此外,Hadoop可以使用廉价的、异构的机器来做分布式存储与计算,但是,Spark对硬件的要求稍高一些,对内存与CPU有一定的要求。

    2.3.2  易于使用

    Spark 的版本已经更新到 Spark 3.0.1,支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。

     

     

    2.3.3  通用性强

    Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。其中,Spark SQL 提供了结构化的数据处理方式,Spark Streaming 主要针对流式处理任务,MLlib提供了很多有用的机器学习算法库,GraphX提供图形和图形并行化计算。

     

     

     

    2.3.4  多种运行方式

    Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes上(Spark 2.3+)。

     

    对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

     

     

    2.4  Spark框架模块-了解

        整个Spark 框架模块包含:Spark Coke、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上 。

     

    Full Stack 理想的指引下,Spark 中的Spark SQL 、SparkStreaming 、MLLib 、GraphX 几大子框架和库之间可以无缝地共享数据和操作,这不仅打造了Spark 在当今大数据计算领域其他计算框架都无可匹敌的优势,而且使得Spark 正在加速成为大数据处理中心首选通用计算平台。

    2.4.1  Spark Core

    Spark Core实现了 Spark 的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。

     

    2.4.2  Spark Streaming

    Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。

     

    2.4.3  Spark SQL

    通过 Spark SQL,我们可以使用 SQL操作数据。

     

    2.4.4  Structured Streaming

    Structured Streaming是建立在SparkSQL引擎之上的可伸缩和高容错的流式处理引擎,可以像操作静态数据的批量计算一样来执行流式计算。当流式数据不断的到达的过程中Spark SQL的引擎会连续不断的执行计算并更新最终结果。简而言之,Structured Streaming提供了快速、可伸缩、可容错、端到端精确的流处理。

     

    2.4.5  Spark MLlib

    提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。

    数据结构:RDD或者DataFrame

     

    2.4.6  Spark GraphX

    Spark中用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

     

    2.5  Spark运行模式

    Spark应用程序可以运行在本地模式(Local Mode)、集群模式(Cluster Mode)和云服务(Cloud),方便开发测试和生产部署。

     

    1.本地模式:Local Mode(学习测试使用)

    Spark 应用程序中任务Task运行在一个本地JVM Process进程中,通常开发测试使用。

     

    2.集群模式:Cluster Mode

    Spark应用程序运行在集群上,比如Hadoop YARN集群,Spark 自身集群Standalone及Apache Mesos集群,

    网址:http://spark.apache.org/docs/latest/

     

    - Standalone集群模式(学习测试使用):类似Hadoop YARN架构,典型的Mater/Slaves模式,

    - Standalone-HA高可用集群模式(学习测试/生产环境使用):使用Zookeeper搭建高可用,避免Master是有单点故障的。

    - On YARN集群模式(生产环境使用):运行在 yarn 集群之上,由 yarn 负责资源管理,Spark 负责任务调度和计算,好处:计算资源按需伸缩,集群利用率高,共享底层存储,避免数据跨集群迁移。

    - Apache Mesos集群模式(国内使用较少):运行在 Mesos 资源管理器框架之上,由Mesos 负责资源管理,Spark 负责任务调度和计算。

    3.云服务:Kubernetes 模式

    中小公司未来会更多的使用云服务,Spark 2.3开始支持将Spark 开发应用运行到K8s上。

     

    2.6  SparkRPC-了解

    什么是RPC:https://www.jianshu.com/p/2accc2840a1b

    Spark 1.4的版本中,为SparkRPC通信引入了一套规范、适配

    ActorSystem-> RpcEnv

    Actor:->  RpcEndpoint

    ActorRef:->  RpcEndpointRef

    Spark 1.6之前,使用Akka对actor进行了封装

    Spark 1.6,引入了netty。

    Spark 2.0,完全使用netty,并把Akka移除了。

     

    3.  Spark环境搭建-Local

    3.1  准备工作

    √保证有JDK(Spark源码是使用Scala编写的,编译成.class文件,运行在JVM之上)

    √Scala的SDK?--不在Lunix上进行Spark开发则不需要

    √安装包下载

    课程中使用目前Spark最新稳定版本:3.0.x系列

    http://spark.apache.org/downloads.html

    http://archive.apache.org/dist/spark/spark-3.0.1/

    https://github.com/apache/spark/releases

     

    注意1:

    Spark3.0+基于Scala2.12

    http://spark.apache.org/downloads.html

     

    注意2:

    目前企业中使用较多的Spark版本还是Spark2.x,如Spark2.2.0、Spark2.4.5都使用较多,但未来Spark3.X肯定是主流,毕竟官方牛逼都吹出去了

    http://spark.apache.org/releases/spark-release-3-0-0.html

     

    3.2  原理

    在本地使用多线程模拟Spark集群中的各个角色

     

     

    3.3  操作

    1.上传:将安装包上传至node1

    2.解压:spark安装包【spark-3.0.1-bin-hadoop2.7.tgz】解压至【/export/server】目录:

    tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz

    3.改权限:如果有权限问题,可以修改为root,方便学习时操作,实际中使用运维分配的用户和权限即可

    chown -R root /export/server/spark-3.0.1-bin-hadoop2.7

    chgrp -R root /export/server/spark-3.0.1-bin-hadoop2.7

    4.改名或创建软链接:方便后期升级

    ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark

    5.查看目录结构:其中各个目录含义如下:

    bin        可执行脚本

    conf       配置文件

    data       示例程序使用数据

    examples   示例程序

    jars        依赖 jar 包

    python     pythonAPI

    sbin       集群管理命令

    yarn       整合yarn需要的东东

    3.4  测试

    3.4.1  运行spark-shell

    ●开箱即用:直接启动bin目录下的spark-shell:

    /export/server/spark/bin/spark-shell

    ●运行成功以后,有如下提示信息:

     

    sc:SparkContext实例对象:

    spark:SparkSession实例对象

    4040:Web监控页面端口号

    ●spark-shell说明:

    1.直接使用./spark-shell

    表示使用local 模式启动,在本机启动一个SparkSubmit进程

    2.还可指定参数 --master,如:

    spark-shell --master local[N] 表示在本地模拟N个线程来运行当前任务

    spark-shell --master local[*] 表示使用当前机器上所有可用的资源

    3.不携带参数默认就是

    spark-shell --master local[*]

    4.后续还可以使用--master指定集群地址,表示把任务提交到集群上运行,如

    ./spark-shell --master spark://node01:7077,node02:7077

    5.退出spark-shell

    使用 :quit

     

    3.4.2  初体验-读取本地文件并计算

    1.准备数据

    vim /root/words.txt

    hello me you her 

    hello you her

    hello her 

    hello

    2.执行WordCount

    val textFile = sc.textFile("file:///root/words.txt")

    val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

    counts.collect

    3.4.3  初体验-读取HDFS文件并计算

    1.准备数据

    上传文件到hdfs

    hadoop fs -put /root/words.txt /wordcount/input/words.txt

    目录如果不存在可以创建

    hadoop fs -mkdir -p /wordcount/input

    结束后可以删除测试文件夹

    hadoop fs -rm -r /wordcount

     

    2.执行WordCount

    val textFile = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

    val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

    counts.collect

    counts.saveAsTextFile("hdfs://node1:8020/wordcount/output")

     

    3.查看结果文件

    hadoop fs -text /wordcount/output/part*

     

    3.5  监控页面

    每个Spark Application应用运行时,都有一个WEB UI监控页面,默认端口号为4040,可以使用浏览器打开页面,

    http://node1:4040/jobs/

     

     

     

     

     

     

     

     

    4.  Spark环境搭建-Standalone

    4.1  原理

    Standalone模式是Spark自带的一种集群模式,不同于前面Local本地模式使用多线程模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境

    Standalone集群使用了分布式计算中的master-slave模型:

    master是集群中含有Master进程的节点

    slave是集群中含有Executor进程的Worker节点

    http://spark.apache.org/docs/latest/cluster-overview.html

     

     

    Spark Standalone集群,类似Hadoop YARN,管理集群资源和调度资源:

    l  主节点Master:

    管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务

    l  从节点Workers:

    管理每个机器的资源,分配对应的资源来运行Task;

    每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数

    l  历史服务器HistoryServer(可选):

    Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。

    4.2  操作

    http://spark.apache.org/docs/latest/spark-standalone.html

    1.集群规划

    node1:master

    ndoe2:worker/slave

    node3:worker/slave

    2.修改slaves

    进入配置目录

    cd /export/server/spark/conf

    修改配置文件名称

    mv slaves.template slaves

    vim slaves

    内容如下:

    node2

    node3

    3.修改spark-env.sh

    进入配置目录

    cd /export/server/spark/conf

    修改配置文件名称

    mv spark-env.sh.template spark-env.sh

    修改配置文件

    vim spark-env.sh

    增加如下内容:

    ## 设置JAVA安装目录

    JAVA_HOME=/export/server/jdk

     

    ## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要,先提前配上

    HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

    YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

     

    ## 指定spark老大Master的IP和提交任务的通信端口

    SPARK_MASTER_HOST=node1

    SPARK_MASTER_PORT=7077

     

    SPARK_MASTER_WEBUI_PORT=8080

     

    SPARK_WORKER_CORES=1

    SPARK_WORKER_MEMORY=1g

    4.分发到其他机器

    将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:

    cd /export/server/

    scp -r spark-3.0.1-bin-hadoop2.7 root@node2:$PWD

    scp -r spark-3.0.1-bin-hadoop2.7 root@node3:$PWD

    创建软连接

    ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark

    4.3  测试

    1.集群启动和停止

    在主节点上启动spark集群

    /export/server/spark/sbin/start-all.sh

    在主节点上停止spark集群

    /export/server/spark/sbin/stop-all.sh

    在主节点上单独启动和停止Master:

    start-master.sh

    stop-master.sh

    在从节点上单独启动和停止Worker(Worker指的是slaves配置文件中的主机名)

    start-slaves.sh

    stop-slaves.sh

    2.WEB UI页面

    http://node1:8080/

     

    可以看出,配置了2Worker进程实例,每个Worker实例为1核1GB内存,总共是2 2GB 内存。目前显示的Worker资源都是空闲的,当向Spark集群提交应用之后,Spark就会分配相应的资源给程序使用,可以在该页面看到资源的使用情况。

    3.启动spark-shell

    /export/server/spark/bin/spark-shell --master spark://node1:7077

     

    4.运行程序

    sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

    .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    .saveAsTextFile("hdfs://node1:8020/wordcount/output2")

    ★注意

    集群模式下程序是在集群上运行的,不要直接读取本地文件,应该读取hdfs上的

    因为程序运行在集群上,具体在哪个节点上我们运行并不知道,其他节点可能并没有那个数据文件

    5.查看结果文件

    hadoop fs -text /wordcount/output2/part*

    4.4  监控页面

    1.任务Web UI

    http://node1:4040/jobs/

     

     

     

    2.Master主节点WEB UI界面:

    http://node1:8080/

     

     

    5.  Spark环境搭建-Standalone HA

    5.1  原理

    Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。如何解决这个单点故障的问题,Spark提供了两种方案:

    1.基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。

    2.基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。

    ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

     

     

     

    5.2  操作

    http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper

    1.先停止Sprak集群

    /export/server/spark/sbin/stop-all.sh

    2.在node01上配置:

    修改spark-env.sh注释或删除MASTER_HOST内容:

    vim /export/server/spark/conf/spark-env.sh

    # SPARK_MASTER_HOST=node1

    增加如下配置

    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"

    参数含义说明:

    spark.deploy.recoveryMode:恢复模式

    spark.deploy.zookeeper.url:ZooKeeper的Server地址

    spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。

    3.spark-env.sh分发

    cd /export/server/spark/conf

    scp -r spark-env.sh root@node2:$PWD

    scp -r spark-env.sh root@node3:$PWD

     

    5.3  测试

    0.启动zk服务

    zkServer.sh status

    zkServer.sh stop

    zkServer.sh start

    1.node1上启动Spark集群执行

    /export/server/spark/sbin/start-all.sh

    2.在node2上再单独只起个master:

    /export/server/spark/sbin/start-master.sh

    3.查看WebUI

    http://node1:8080/

     

    http://node2:8080/

     

    4.kill掉node1上的master,测试主备切换

    在node1上使用jps查看master进程id

    使用kill -9 id号强制结束该进程

    5.稍等片刻后刷新node2的web界面发现node2为Alive

    http://node2:8080/

     

    6.如启动spark-shell,需要指定多个master地址

    /export/server/spark/bin/spark-shell --master spark://node1:7077,node2:7077

     

    7.执行WordCount

    sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

    .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    .saveAsTextFile("hdfs://node1:8020/wordcount/output3")

    8.查看结果

    hadoop fs -text /wordcount/output3/part*

    9.停止集群

    /export/server/spark/sbin/stop-all.sh

    注意:node2上的Master需要单独停

    6.  Spark环境搭建-Spark on YARN

    Yarn是一个成熟稳定且强大的资源管理和任务调度的大数据框架,在企业中市场占有率很高,意味着有很多公司都在用Yarn,将公司的资源交给Yarn做统一的管理!并支持对任务做多种模式的调度,如FIFO/Capacity/Fair等多种调度模式!

    所以很多计算框架,都主动支持将计算任务放在Yarn上运行,如Spark/Flink

    企业中也都是将Spark Application提交运行在YANR上

    http://spark.apache.org/docs/latest/running-on-yarn.html

    6.1  原理

    注意:

    1.SparkOnYarn的本质是把Spark任务的class字节码文件打成jar包,上传到Yarn集群的JVM中去运行!

    2.Spark集群的相关角色(JVM进程)也会在Yarn的JVM中运行

    3.SparkOnYarn需要:

    - 修改一些配置,让支持SparkOnYarn

    - Spark程序打成的jar包--可以先使用示例jar包spark-examples_2.11-3.0.1.jar,也可以后续使用我们自己开发的程序打成的jar包

    - Spark任务提交工具:bin/spark-submit

    - Spark本身依赖的jars:在spark的安装目录的jars中有,提交任务的时候会被上传到Yarn/HDFS,或手动提前上传上

    4.SparkOnYarn不需要Spark集群,只需要一个单机版spark解压包即可(有示例jar, 有spark-submit,有依赖的jars)

    5.SparkOnYarn根据Driver运行在哪里分为2种模式:client模式和cluster模式

    6.2  操作-准备工作

    Spark Application运行到YARN上时,在提交应用时指定master为yarn即可,同时需要告知YARN集群配置信息(比如ResourceManager地址信息),此外需要监控Spark Application,配置历史服务器相关属性。

    6.2.1  配置Yarn历史服务器并关闭资源检查

    修改yarn-site.xml

    在node1上修改yarn-site.xml,指定MRHistoryServer地址信息并关闭yarn内存检查

    cd /export/server/hadoop/etc/hadoop

    vim /export/server/hadoop/etc/hadoop/yarn-site.xml

    添加内容

    <configuration>

        <!-- 配置yarn主节点的位置 -->

        <property>

            <name>yarn.resourcemanager.hostname</name>

            <value>node1</value>

        </property>

        <property>

            <name>yarn.nodemanager.aux-services</name>

            <value>mapreduce_shuffle</value>

        </property>

        <!-- 设置yarn集群的内存分配方案 -->

        <property>

            <name>yarn.nodemanager.resource.memory-mb</name>

            <value>20480</value>

        </property>

        <property>

            <name>yarn.scheduler.minimum-allocation-mb</name>

            <value>2048</value>

        </property>

        <property>

            <name>yarn.nodemanager.vmem-pmem-ratio</name>

            <value>2.1</value>

        </property>

        <!-- 开启日志聚合功能 -->

        <property>

            <name>yarn.log-aggregation-enable</name>

            <value>true</value>

        </property>

        <!-- 设置聚合日志在hdfs上的保存时间 -->

        <property>

            <name>yarn.log-aggregation.retain-seconds</name>

            <value>604800</value>

        </property>

        <!-- 设置yarn历史服务器web地址 -->

        <property>

            <name>yarn.log.server.url</name>

            <value>http://node1:19888/jobhistory/logs</value>

        </property>

        <!-- 关闭yarn内存检查 -->

        <property>

            <name>yarn.nodemanager.pmem-check-enabled</name>

            <value>false</value>

        </property>

        <property>

            <name>yarn.nodemanager.vmem-check-enabled</name>

            <value>false</value>

        </property>

    </configuration>

    由于使用虚拟机运行服务,默认情况下YARN检查机器内存,当内存不足时,提交的应用无法运行,可以设置不检查资源

    2.分发

    cd /export/server/hadoop/etc/hadoop

    scp -r yarn-site.xml root@node2:$PWD

    scp -r yarn-site.xml root@node3:$PWD

    3.重启Yarn

    /export/server/hadoop/sbin/stop-yarn.sh

    /export/server/hadoop/sbin/start-yarn.sh

    6.2.2  配置Spark历史服务

    1.修改spark-defaults.conf

    进入配置目录

    cd /export/server/spark/conf

    修改配置文件名称

    mv spark-defaults.conf.template spark-defaults.conf

    vim spark-defaults.conf

    添加内容:

    spark.eventLog.enabled                  true

    spark.eventLog.dir                      hdfs://node1:8020/sparklog/

    spark.eventLog.compress                 true

    spark.yarn.historyServer.address        node1:18080

    注意:sparklog需要手动创建

    hadoop fs -mkdir -p /sparklog

    2.修改spark-env.sh

    进入配置目录

    cd /export/server/spark/conf

    修改配置文件

    vim /export/server/spark/conf/spark-env.sh

    增加如下内容:

    添加内容

    ## 配置HDFS和Yarn配置文件路径--前面已经配置过了

    HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

    YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

     

    ## 配置spark历史日志存储地址

    SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

    3.设置日志级别

    进入目录

    cd /export/server/spark/conf

    修改日志属性配置文件名称

    mv log4j.properties.template log4j.properties

    改变日志级别

    vim log4j.properties

    修改内容如下:

     

    4.同步

    cd /export/server/spark/conf

    scp -r spark-env.sh root@node2:$PWD

    scp -r spark-env.sh root@node3:$PWD

    scp -r spark-defaults.conf root@node2:$PWD

    scp -r spark-defaults.conf root@node3:$PWD

    scp -r log4j.properties root@node2:$PWD

    scp -r log4j.properties root@node3:$PWD

     

    6.2.3  配置依赖Spark Jar

    Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。

    1.在HDFS上创建存储spark相关jar包的目录

    hadoop fs -mkdir -p /spark/jars/

     

    2.上传$SPARK_HOME/jars所有jar包HDFS

    hadoop fs -put /export/server/spark/jars/* /spark/jars/

    3.在node1上修改spark-defaults.conf

    vim /export/server/spark/conf/spark-defaults.conf

    添加内容

    spark.yarn.jars  hdfs://node1:8020/spark/jars/*

    同步

    cd /export/server/spark/conf

    scp -r spark-defaults.conf root@node2:$PWD

    scp -r spark-defaults.conf root@node3:$PWD

    6.2.4  启动服务

    - 启动HDFS和YARN服务,在node1执行命令

    start-dfs.sh

    start-yarn.sh

    start-all.sh

    - 启动MRHistoryServer服务,在node1执行命令

    mr-jobhistory-daemon.sh start historyserver

    - 启动Spark HistoryServer服务,,在node1执行命令

    /export/server/spark/sbin/start-history-server.sh

    - MRHistoryServer服务WEB UI页面:

    http://node1:19888

    - Spark HistoryServer服务WEB UI页面:

    http://node1:18080/

    6.3  操作-两种模式

    6.3.1  引入

    l 当一个MR应用提交运行到Hadoop YARN上时

    包含两个部分:

    1.应用管理者AppMaster

    2.任务进程(如MapReduce程序MapTask和ReduceTask任务)

     

    l 当一个Spark应用提交运行在集群上时,应用架构有两部分组成:

    1.Driver Program(资源申请和调度Job执行)

    2.Executors(运行Job中Task任务和缓存数据),

    都是JVM 进程:

     

    Driver程序运行的位置可以通过--deploy-mode 来指定,

    值可以是:

    1.client:表示Driver运行在提交应用的Client(默认)

    2.cluster表示Driver运行在集群中(YARN的NodeManager)

    Driver是什么:

    The process running the main() function of the application and creating the SparkContext

    运行应用程序的main()函数并创建SparkContext的进程

    注意

    cluster和client模式最最本质的区别是:Driver程序运行在哪里。

    企业实际生产环境中使用cluster模式

    6.3.2  client 模式

    client模式下:

    Spark 的Driver驱动程序, 运行在提交任务的客户端上, 和集群的通信成本高!

    因为Driver在客户端,所以Driver中的程序结果输出可以在客户端控制台看到

     

     

       运行圆周率PI程序,采用client模式,命令如下:

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master yarn  

    --deploy-mode client

    --driver-memory 512m

    --driver-cores 1

    --executor-memory 512m

    --num-executors 2 

    --executor-cores 1 

    --class org.apache.spark.examples.SparkPi

    ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar

    10

    http://node1:8088/cluster

     

     

    6.3.3  cluster 模式

    cluster模式下:

    Spark 的Driver驱动程序, 运行在Yarn集群上, 和集群的通信成本低!

    Driver是交给Yarn管理的,如果失败会由Yarn重启

    因为Driver运行在Yarn上,所以Driver中的程序结果输出在客户端控制台看不到,在Yarn日志中看

     

     

     

    运行圆周率PI程序,采用cluster模式,命令如下:

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master yarn 

    --deploy-mode cluster

    --driver-memory 512m

    --executor-memory 512m

    --num-executors 1

    --class org.apache.spark.examples.SparkPi

    ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar

    10

    http://node1:8088/cluster

     

     

     

     

     

    6.3.4  总结

    Client模式和Cluster模式最最本质的区别是:Driver程序运行在哪里。

    l - Client模式:学习测试时使用,开发不用,了解即可

      1.Driver运行在Client上,和集群的通信成本高

      2.Driver输出结果会在客户端显示

    l - Cluster模式:生产环境中使用该模式

      1.Driver程序在YARN集群中,和集群的通信成本低

      2.Driver输出结果不能在客户端显示

      3.该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启ApplicattionMaster(Driver)

    6.4  扩展阅读:两种模式详细流程

    6.4.1  client 模式

    YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:

     

       具体流程步骤如下:

    1)、Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster;

     

    2)、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;

     

    3)、ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程;

    4)、Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;

    5)、之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。

    6.4.2  cluster 模式

    YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:

     

       具体流程步骤如下:

    1)、任务提交后会和ResourceManager通讯申请启动ApplicationMaster;

    2)、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver;

     

    3)、Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后在合适的NodeManager上启动Executor进程;

     

    4)、Executor进程启动后会向Driver反向注册;

    5)、Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行;

    6.5  扩展阅读:Spark 集群角色

    Spark Application运行在集群上时,主要有四个部分组成,如下示意图:

     

    1)、Driver:是一个JVM Process 进程,编写的Spark应用程序就运行在Driver上,由Driver进程执行;

    2)、Master(ResourceManager):是一个JVM Process 进程,主要负责资源的调度和分配,并进行集群的监控等职责;

    3)、Worker(NodeManager):是一个JVM Process 进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算。

    4)、Executor:是一个JVM Process 进程,一个Worker(NodeManager)上可以运行多个Executor,Executor通过启动多个线程(task)来执行对RDD的partition进行并行计算,也就是执行我们对RDD定义的例如map、flatMap、reduce等算子操作。

     

     

    6.6  扩展阅读:spark-shellspark-submit

    Spark支持多种集群管理器(Cluster Manager),取决于传递给SparkContext的MASTER环境变量的值:local、spark、yarn,区别如下:

    Master URL

    Meaning

    local

    在本地运行,只有一个工作进程,无并行计算能力。

    local[K]

    在本地运行,有K个工作进程,通常设置K为机器的CPU核心数量。

    local[*]

    在本地运行,工作进程数量等于机器的CPU核心数量。

    spark://HOST:PORT

    Standalone模式运行,这是Spark自身提供的集群运行模式,默认端口号: 7077。详细文档见:Spark standalone cluster

    Client 部署模式:7077

    Cluster 部署模式:6066

    mesos://HOST:PORT

    Mesos集群上运行,Driver进程和Worker进程运行在Mesos集群上,部署模式必须使用固定值:--deploy-mode cluster。详细文档见:MesosClusterDispatcher.

    yarn-client

    --master yarn --deploy-mode client

    Yarn集群上运行,Driver进程在本地,Executor进程在Yarn集群上,部署模式必须使用固定值:--deploy-mode clientYarn集群地址必须在HADOOP_CONF_DIR or YARN_CONF_DIR变量里定义。

    yarn-cluster

    --master yarn --deploy-mode cluster

    Yarn集群上运行,Driver进程在Yarn集群上,Executor进程也在Yarn集群上,部署模式必须使用固定值:--deploy-mode clusterYarn集群地址必须在HADOOP_CONF_DIR or YARN_CONF_DIR变量里定义。

    6.6.1  spark-shell

    ●引入

    之前我们使用提交任务都是使用spark-shell提交,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下可以用scala编写spark程序,适合学习测试时使用!

    ●示例

    spark-shell可以携带参数

    spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务

    spark-shell --master local[*] *表示使用当前机器上所有可用的资源

    默认不携带参数就是--master local[*]

    spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上

    6.6.2  spark-submit

    ●引入

    spark-shell交互式编程确实很方便我们进行学习测试,但是在实际中我们一般是使用IDEA开发Spark应用程序打成jar包交给Spark集群/YARN去执行,所以我们还得学习一个spark-submit命令用来帮我们提交jar包给spark集群/YARN

    spark-submit命令是我们开发时常用的!

    ●示例 

    提交任务到Local集群

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master local[2] 

    --class org.apache.spark.examples.SparkPi

    ${SPARK_HOME}/examples/jars/spark-examples_2.11-3.0.1.jar

    10

    提交任务到Standalone集群

    --master spark://node1:7077

    提交任务到Standalone-HA集群

    --master spark://node1:7077,node2:7077

    或使用SparkOnYarn的Client模式提交到Yarn集群

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master yarn  

    --deploy-mode client

    --driver-memory 512m

    --executor-memory 512m

    --num-executors 1

    --total-executor-cores 2

    --class org.apache.spark.examples.SparkPi

    ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar

    10

     

    或使用SparkOnYarn的Cluster模式提交到Yarn集群

     

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master yarn 

    --deploy-mode cluster

    --driver-memory 512m

    --executor-memory 512m

    --num-executors 1

    --total-executor-cores 2

    --class org.apache.spark.examples.SparkPi

    ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar

    10

    6.7  扩展阅读:命令参数

    开发中需要根据实际任务的数据量大小、任务优先级、公司服务器的实际资源情况,参考公司之前的提交的任务的脚本参数,灵活设置即可

    http://spark.apache.org/docs/latest/submitting-applications.html

    [root@node1 bin]#  /export/server/spark/bin/spark-submit --help

    Usage: spark-submit [options] <app jar | python file | R file> [app arguments]

    Usage: spark-submit --kill [submission ID] --master [spark://...]

    Usage: spark-submit --status [submission ID] --master [spark://...]

    Usage: spark-submit run-example [options] example-class [example args]

     

    Options:

      --master MASTER_URL         spark://host:port, mesos://host:port, yarn,

                                  k8s://https://host:port, or local (Default: local[*]).

      --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or

                                  on one of the worker machines inside the cluster ("cluster")

                                  (Default: client).

      --class CLASS_NAME          Your application's main class (for Java / Scala apps).

      --name NAME                 A name of your application.

      --jars JARS                 Comma-separated list of jars to include on the driver

                                  and executor classpaths.

      --packages                  Comma-separated list of maven coordinates of jars to include

                                  on the driver and executor classpaths. Will search the local

                                  maven repo, then maven central and any additional remote

                                  repositories given by --repositories. The format for the

                                  coordinates should be groupId:artifactId:version.

      --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while

                                  resolving the dependencies provided in --packages to avoid

                                  dependency conflicts.

      --repositories              Comma-separated list of additional remote repositories to

                                  search for the maven coordinates given with --packages.

      --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place

                                  on the PYTHONPATH for Python apps.

      --files FILES               Comma-separated list of files to be placed in the working

                                  directory of each executor. File paths of these files

                                  in executors can be accessed via SparkFiles.get(fileName).

     

      --conf, -c PROP=VALUE       Arbitrary Spark configuration property.

      --properties-file FILE      Path to a file from which to load extra properties. If not

                                  specified, this will look for conf/spark-defaults.conf.

     

      --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).

      --driver-java-options       Extra Java options to pass to the driver.

      --driver-library-path       Extra library path entries to pass to the driver.

      --driver-class-path         Extra class path entries to pass to the driver. Note that

                                  jars added with --jars are automatically included in the

                                  classpath.

     

      --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

     

      --proxy-user NAME           User to impersonate when submitting the application.

                                  This argument does not work with --principal / --keytab.

     

      --help, -h                  Show this help message and exit.

      --verbose, -v               Print additional debug output.

      --version,                  Print the version of current Spark.

     

     Cluster deploy mode only:

      --driver-cores NUM          Number of cores used by the driver, only in cluster mode

                                  (Default: 1).

     

     Spark standalone or Mesos with cluster deploy mode only:

      --supervise                 If given, restarts the driver on failure.

     

     Spark standalone, Mesos or K8s with cluster deploy mode only:

      --kill SUBMISSION_ID        If given, kills the driver specified.

      --status SUBMISSION_ID      If given, requests the status of the driver specified.

     

     Spark standalone, Mesos and Kubernetes only:

      --total-executor-cores NUM  Total cores for all executors.

     

     Spark standalone, YARN and Kubernetes only:

      --executor-cores NUM        Number of cores used by each executor. (Default: 1 in

                                  YARN and K8S modes, or all available cores on the worker

                                  in standalone mode).

     

     Spark on YARN and Kubernetes only:

      --num-executors NUM         Number of executors to launch (Default: 2).

                                  If dynamic allocation is enabled, the initial number of

                                  executors will be at least NUM.

      --principal PRINCIPAL       Principal to be used to login to KDC.

      --keytab KEYTAB             The full path to the file that contains the keytab for the

                                  principal specified above.

     

     Spark on YARN only:

      --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").

      --archives ARCHIVES         Comma separated list of archives to be extracted into the

                                  working directory of each executor.

    6.7.1  应用提交语法

    使用【spark-submit】提交应用语法如下:

    Usage: spark-submit [options] <app jar | python file> [app arguments]

    如果使用Java或Scala语言编程程序,需要将应用编译后达成Jar包形式,提交运行。

     

    6.7.2  基本参数配置

    提交运行Spark Application时,有些基本参数需要传递值,如下所示:

     

    动态加载Spark Applicaiton运行时的参数,通过--conf进行指定,如下使用方式:

     

     

    6.7.3  Driver Program 参数配置

    每个Spark Application运行时都有一个Driver Program,属于一个JVM Process进程,可以设置内存Memory和CPU Core核数。

     

     

    6.7.4  Executor 参数配置

    每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)。

     

     

    6.7.5  官方案例

    http://spark.apache.org/docs/latest/submitting-applications.html

    Spark 官方提供一些针对不同模式运行Spark Application如何设置参数提供案例,具体如下:

    # Run application locally on 8 cores

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master local[8]

      /path/to/examples.jar

      100

    # Run on a Spark standalone cluster in client deploy mode

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master spark://207.184.161.138:7077

      --executor-memory 20G

      --total-executor-cores 100

      /path/to/examples.jar

      1000

    # Run on a Spark standalone cluster in cluster deploy mode with supervise

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master spark://207.184.161.138:7077

      --deploy-mode cluster

      --supervise 

      --executor-memory 20G

      --total-executor-cores 100

      /path/to/examples.jar

      1000

    # Run on a YARN clusterexport HADOOP_CONF_DIR=XXX

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master yarn

      --deploy-mode cluster  # can be client for client mode

      --executor-memory 20G

      --num-executors 50

      /path/to/examples.jar

      1000

    # Run a Python application on a Spark standalone cluster

    ./bin/spark-submit

      --master spark://207.184.161.138:7077

      examples/src/main/python/pi.py

      1000

    # Run on a Mesos cluster in cluster deploy mode with supervise

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master mesos://207.184.161.138:7077

      --deploy-mode cluster

      --supervise 

      --executor-memory 20G

      --total-executor-cores 100

      http://path/to/examples.jar

      1000

    # Run on a Kubernetes cluster in cluster deploy mode

    ./bin/spark-submit

      --class org.apache.spark.examples.SparkPi

      --master k8s://xx.yy.zz.ww:443

      --deploy-mode cluster

      --executor-memory 20G

      --num-executors 50

      http://path/to/examples.jar

      1000

    7.  Spark应用开发-基于IDEA

    实际开发Spark 应用程序使用IDEA集成开发环境,Spark课程所有代码均使用Scala语言开发,利用函数式编程分析处理数据,更加清晰简洁。

    企业中也使用Java语言开发Spark程序,但较少,后续也可以给大家演示

    7.1  准备工作

     

    7.1.1  创建工程

    创建Maven Project工程

     

    7.1.2  pom.xml

    添加依赖至POM文件中,内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>cn.itcast</groupId>
        <artifactId>spark_study</artifactId>
        <version>1.0-SNAPSHOT</version>

        <repositories>
            <repository>
                <id>aliyun</id>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            </repository>
            <repository>
                <id>apache</id>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
            </repository>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
        <properties>
            <encoding>UTF-8</encoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.12.11</scala.version>
            <spark.version>3.0.1</spark.version>
            <hadoop.version>2.7.5</hadoop.version>
        </properties>
        <dependencies>
            <!--依赖Scala语言-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <!--SparkCore依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- spark-streaming-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!--spark-streaming+Kafka依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!--SparkSQL依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!--SparkSQL+ Hive依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive-thriftserver_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!--StructuredStreaming+Kafka依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- SparkMlLib机器学习模块,里面有ALS推荐算法-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.5</version>
            </dependency>

            <dependency>
                <groupId>com.hankcs</groupId>
                <artifactId>hanlp</artifactId>
                <version>portable-1.7.7</version>
            </dependency>

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>

            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.2</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <plugins>
                <!-- 指定编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
                <!-- 指定编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    7.1.3  log4j.properties[可选]

    resourceslog4j.properties

    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

    # Set everything to be logged to the console
    log4j.rootCategory=WARN, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    # Set the default spark-shell log level to WARN. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=WARN

    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=WARN
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
    log4j.logger.org.apache.parquet=ERROR
    log4j.logger.parquet=ERROR

    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

    7.2  WordCount本地运行

    http://spark.apache.org/docs/3.0.1/rdd-programming-guide.html

     

    package cn.itcast.hello

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Author itcast
     * Desc 演示使用Spark开发WordCount-本地版
     */
    object WordCount_bak {
      def main(args: Array[String]): Unit = {
        println(this.getClass.getName.stripSuffix("$"))
        //1.准备环境(Env)sc-->SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getName.stripSuffix("$")).setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")

        //2.加载文件(Source)
        //RDD:弹性分布式数据集(RDD),Spark中的基本抽象。
        //先简单理解为分布式集合!类似于DataSet!
        //fileRDD: RDD[一行行的数据]
        val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")

        //3.处理数据(Transformation)
        //3.1切分
        //wordRDD: RDD[一个个的单词]
        //val wordRDD: RDD[String] = fileRDD.flatMap((line)=>line.split(" "))
        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_下划线表示每一行
        //3.2每个单词记为1
        // wordAndOneRDD: RDD[(单词, 1)]
        //val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((word)=>(word,1))
        val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_表示每一个单词
        //3.3.分组聚合reduceByKey= groupByKey + sumreduce
        //val groupedRDD: RDD[(String, Iterable[Int])] = wordAndOneRDD.groupByKey()
        //val resultRDD: RDD[(String, Int)] = groupedRDD.mapValues(_.sum)
        //val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey((temp,current)=>temp + current)
        val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)

        //4.输出结果(Sink)
        //4.1直接遍历分布式集合并输出
        resultRDD.foreach(println)//cation

        //4.2将分布式集合收集为本地集合再输出
        val array: Array[(String, Int)] = resultRDD.collect()//action
        //println(array.toBuffer)
        //array.foreach(t=>println(t))
        //array.foreach(println(_))
        //array.foreach(println _)
        array.foreach(println)//行为参数化

        Thread.sleep(1000 * 1200)//等待20分钟,方便查看webUI:http://localhost:4040/jobs/

        //5.关闭资源
        sc.stop()
      }
    }

    7.3  WordCount集群运行

    ★注意

    如果写入HDFS存在权限问题:

    需要修改权限:

    hadoop fs -chmod -R 777  /

    并在代码中添加:

    System.setProperty("HADOOP_USER_NAME", "root")

    1.修改代码如下

    package cn.itcast.hello

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Author itcast
     * Desc 演示使用Spark开发WordCount-集群版
     */
    object WordCount {
      def main(args: Array[String]): Unit = {
        println(this.getClass.getName.stripSuffix("$"))
        if(args.length != 2){
          println("请携带2个参数: input-path and oupput-path")
          System.exit(1)//0表示非正常退出程序
        }

        //1.准备环境(Env)sc-->SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getName.stripSuffix("$"))//.setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")

        //2.加载文件(Source)
        val fileRDD: RDD[String] = sc.textFile(args(0))//表示启动该程序的时候需要通过参数执行输入数据路径

        //3.处理数据(Transformation)
        //3.1切分
        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_下划线表示每一行
        //3.2每个单词记为1
        val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_表示每一个单词
        //3.3.分组聚合reduceByKey= groupByKey + sumreduce
        val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)

        //4.输出结果(Sink)
        resultRDD.foreach(println)
        resultRDD.saveAsTextFile(s"${args(1)}_${System.currentTimeMillis()}")

        //Thread.sleep(1000 * 120)//等待2分钟,方便查看webUI:http://localhost:4040/jobs/

        //5.关闭资源
        sc.stop()
      }
    }

    2.打成jar包

     

    3.改名

     

    4.上传jar包

    创建HDFS目录

    hdfs dfs -mkdir -p /spark/apps/

    上传jar包Linux后再上传到HDFS,方便其他机器提交任务时也可以读取

    hdfs dfs -put /root/wc.jar /spark/apps/

     

    提交到Yarn

    SPARK_HOME=/export/server/spark

    ${SPARK_HOME}/bin/spark-submit

    --master yarn 

    --deploy-mode cluster

    --driver-memory 512m

    --driver-cores 1

    --executor-memory 512m

    --num-executors 2 

    --executor-cores 1 

    --class cn.itcast.hello.WordCount 

    hdfs://node1:8020/spark/apps/wc.jar

    hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output

    http://node1:8088/cluster

    7.4  WebUI界面

     

     

     

     

    7.5  流程图解

    l WordCount,主要流程如下图所示:

     

    7.6  扩展阅读:main函数执行流程

         Spark Application应用程序运行时,无论client还是cluster部署模式DeployMode,当Driver Program和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解。

     

    l  一、构建SparkContex对象和关闭SparkContext资源,都是在Driver Program中执行,上图中①和③都是,如下图所示:

     

    l  二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上执行,从WEB UI监控页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示:

     

    将结果数据resultRDD调用saveAsTextFile方法,保存数据到外部存储系统中,代码在Executor中执行的。但是如果resultRDD调用take、collect或count方法时,获取到最终结果数据返回给Driver,代码如下:

     

    运行应用程序时,将数组resultArray数据打印到标准输出,Driver Program端日志打印结果:

     

    综上所述Spark Application中Job执行有两个主要点:

    1)、RDD输出函数分类两类

    1:返回值给Driver Progam,比如count、first、take、collect等

    2:没有返回值,比如直接打印结果、保存至外部存储系统(HDFS文件)等

    2)、在Job中从读取数据封装为RDD和一切RDD调用方法都是在Executor中执行,其他代码都是在Driver Program中执行

    SparkContext创建与关闭、其他变量创建等在Driver Program中执行

    RDD调用函数都是在Executors中执行

    7.7  扩展阅读:集群硬件配置要求

     

     

    7.8  扩展阅读:蒙特卡罗算法求PI

     

    import java.util.Random;

    /**
     * Author itcast
     * Desc 蒙特卡罗算法求PI
     */
    public class MonteCarloTest {
        public static void main(String[] args) {
            long count = 0;//记录落在扇形区域的点的数量
            long total = 1000000000;//总共撒的点的数量

            Random random = new Random();
            for (long i = 0; i < total; i++) {
                double x = random.nextDouble();//生成横坐标x, [0,1)范围的随机数
                double y = random.nextDouble();//生成纵坐标y, [0,1)范围的随机数
                if (x * x + y * y < 1) {//点落在扇形里面
                    count++;
                }
            }

            double pi = 4 * (count * 1.0 / total);
            System.out.println("蒙特卡洛算法求得的Pi值为:" + pi);
        }
    }

  • 相关阅读:
    Django+drf学习过程笔记
    网络编程之socket
    python回收机制
    异常处理
    面向对象
    常用模块
    部分内置模块
    模块的使用、软件开发目录规范
    迭代器、生成器、函数递归调用及二分式
    三元表达式、生成式、匿名函数
  • 原文地址:https://www.cnblogs.com/shan13936/p/14233788.html
Copyright © 2011-2022 走看看