spark运行流程:
1.构建Spark Application的运行环境(启动Driver),Driver(SparkContext)向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3.Driver(SparkContext)构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task
4.Task Scheduler将Task发放给Executor运行同时Driver将应用程序代码发放给Executor。
5.Task在Executor上运行,运行完毕释放所有资源。
常用术语:
https://www.cnblogs.com/cxxjohnson/p/8909578.html
Application
用户编写Spark应用程序,
Driver
表示main()函数,创建SparkContext,有SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等. [自己理解: 是executor的特例]
Executor
某个Application运行在worker节点上的一个进程
Worker
集群中任何可以运行Application代码的节点
Task
executor上的工作单元,是运行application的基本单位,多个task组成一个stage
Job
由task组成,action操作触发
stage
每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage
DAGScheduler
根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法
TASKScheduler
TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。
spark-submit参数
spark-submit
--name myTest
--master yarn
--deploy-mode cluster
--driver-memory 4g
--num-executors 8
--executor-cores 16
--executor-memory 4g
--conf spark.default.parallelism=1500
--conf spark.sql.shuffle.partition=1500
--conf spark.streaming.kafka.maxRatePerPartition=2200
--conf spark.hbase.obtainToken.enabled=true
--class com.myspark.userhobby.userAction
--jars $localroot/lib/hanlp-1.7.2.jar,
$localroot/lib/jedis-2.9.0.jar,
$SPARK_HOME/jars/kafka-clients_2.10.jar
--files $localroot/config/test01.keystore,
$localroot/config/test02.txt,
$localroot/config/hanlp.properties
name
应用程序的名称
在hadoop的yarn页面上可以看到Name
master
master地址,提交任务到哪里执行,有spark://host:port, yarn,local
公司一般用yarn
deploy-mode
cluster(集群) 或者client(本地)
默认client
yarn-client:
Client和Driver运行在一起,ApplicationMaster只用来获取资源;结果实时输出在客户端控制台上,可以方便的看到日志信息,推荐使用该模式;
提交到yarn后,yarn先启动ApplicationMaster和Executor,两者都是运行在Container中。注意:一个container中只运行一个executorbackend;
yarn-cluser:
Driver和ApplicationMaster运行在一起,所以运行结果不能在客户端控制台显示,需要将结果需要存放在HDFS或者写到数据库中;
driver在集群上运行,可通过ui界面访问driver的状态。
num-executors
设置Spark作业总共要用多少个Executor进程来执行
默认为2,在yarn下使用
executor-memory
设置每个Executor进程的内存
默认1G
num-executors乘以executor-memory,是不能超过队列的最大内存量的
executor-cores
设置每个Executor进程的CPU core数量
driver-memory
设置Driver进程的内存
默认1G
--class
应用程序的主类,java或scala
--jars
逗号分隔的本地jar包,包含在driver和executor的classpath下
--files
逗号分隔的文件,这些文件放在每个executor的工作目录下面
--conf
修改spark配置属性
没有则取默认是conf/spark-defaults.conf
数据倾斜:
在进行shuffle的时候,必须将各个节点上相同的key拉去到某个节点上的一个task来进行处理,如果某个key对应数据量特别大就会发生数据倾斜
数据倾斜解决方案:
1.使用Hive ETL预处理数据
导致数据倾斜原因是Hive表
2.导致倾斜的key不重要的话直接过滤掉
3.提高shuffle操作的并行度
增加shuffle read task的数量
4.对于聚合类的shuffle操作,先局部聚合(key加前缀,分开),再全局聚合
5.将reduce join 转为map join
6.采样倾斜key并分拆join操作
7.使用随机前缀和扩容RDD进行join
spark性能调优
Spark性能优化指南-基础篇
https://blog.csdn.net/lukabruce/article/details/81504283
Spark性能优化指南-高级篇
https://blog.csdn.net/lukabruce/article/details/81504220
stage划分判断:
Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage
shuffle操作算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等
1.资源调优
2.并行度调优
原则:一个core一般分配2-3个task,每个task一般处理1G数据
3.代码调优
避免重复创建RDD,多复用
重复使用的RDD要持久化
每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍;所以持久化后可以直接从内存或磁盘中提取持久化的RDD数据
//使用非序列化的方式将RDD中的数据全部尝试持久化到内存中
rdd1.cache()
//手动选择持久化级别,并使用指定的方式进行持久化
rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)//_SER后缀表示使用序列化方式保存,节省内存
持久化级别 | |
---|---|
MEMORY_ONLY | 非序列化,全部保存在内存,如果内存不够则放弃;和cache()功能相同 |
MEMORY_ONLY_SER | 同上; 区别: RDD数据会被序列化 |
MEMORY_AND_DISK | 非序列化,优先内存,不够的再写入磁盘 |
MEMORY_AND_DISK_SER | 同上; 区别: RDD数据会被序列化 |
DISK_ONLY | 非序列化,磁盘(不建议) |
上面五中后面都可加后缀_2 | 加后缀_2表示持久化的数据都复制一个副本,用于容错(不建议) |
尽量避免使用shuffle类算子
使用高性能算子
reduceByKey/aggregateByKey替代groupByKey
mapPartitions替代普通map
foreachPartition替代foreach
特别是写表时,foreach一条一条数据写入,每次创建一个数据库连接;而foreachPartition会一个partition的数据创建一个数据库连接,批量写入,更高效
filter过滤后进行coalesce操作
如果RDD进行filter过滤较多数据后,用coalesce手动减少RDD的partition数量
使用广播大变量(100M以上的大变量)