zoukankan      html  css  js  c++  java
  • Spark调优

    RDD
    RDD的全称是 Resilient Distributed Datasets,这是Spark的一种数据抽象集合,它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。
    RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。

    RDD运行时的角色及相关名词
    有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,在调优的时候会经常遇到。

    Client:指的是客户端进程,主要负责提交job到Master;

    Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作;

    Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,
    可以给worker分配任务和资源(主要是driver和executor资源);

    Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,
    需要定期给Master汇报heartbeat(心跳),启动Driver和Executor;

    Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,
    包括了job的解析、Stage的生成、调度Task到Executor上去执行;

    Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage;

    Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程;

    Executor:指的是执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,
    每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。

    DAG

    全称是 Directed Acyclic Graph,中文名是有向无环图。Spark就是借用了DAG对RDD之间的关系进行了建模,
    用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,
    有些任务需要在一些任务执行完成了才可以执行的。在Spark调度中就是有DAGscheduler,
    它负责将job分成若干组Task组成的Stage。

    Spark的部署模式
    主要有local模式、Standalone模式、Mesos模式、YARN模式。
    1.local: 单机模式,用于开发测试。

    2.Standalone:独立模式,Spark 原生的简单集群管理器,自带完整的服务, 可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用 Standalone
    可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管理框架的时候才会使用。

    3.Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 yarn,由于mesos这种方式目前应用的比较少,这里没有记录mesos的部署方式。

    4.YARN: 统一的资源管理机制, 在上面可以运行多套计算框架, 如map reduce、storm 等, 根据 driver 在集群中的位置不同,分为 yarn client 和 yarn cluster。

    重点是 Hadoop YARN 模式下的 Spark 集群部署。
    Spark的运行模式取决于传递给 SparkContext 的 MASTER 环境变量的值, 个别模式还需要辅助的程序接口来配合使用,
    目前支持的 Master 字符串及 URL 包括:

    Master URL Meaning
    local 在本地运行,只有一个工作进程,无并行计算能力
    local[K] 在本地运行,有 K 个工作进程,通常设置 K 为机器的CPU 核心数量
    local[*] 在本地运行,工作进程数量等于机器的 CPU 核心数量。
    spark://HOST:PORT 以 Standalone 模式运行,这是 Spark 自身提供的集群运行模式,默认端口号: 7077
    mesos://HOST:PORT 在 Mesos 集群上运行,Driver 进程和 Worker 进程运行在 Mesos集群上,
    部署模式必须使用固定值:--deploy-mode cluster
    yarn-client 在 Yarn 集群上运行,Driver 进程在本地, Work 进程在 Yarn 集群上,
    部署模式必须使用固定值:--deploy-mode client。Yarn 集群地址必须在HADOOP_CONF_DIR or YARN_CONF_DIR 变量里定义。
    yarn-cluster 效率比yarn-client高,在 Yarn 集群上运行,Driver 进程在 Yarn 集群上,Work 进程也在 Yarn 集群上,
    部署模式必须使用固定值:--deploy-mode cluster。Yarn 集群地址必须在HADOOP_CONF_DIRorYARN_CONF_DIR 变量里定义。
    用户在提交任务给 Spark 处理时,以下两个参数共同决定了 Spark 的运行方式。
    -– master MASTER_URL :决定了 Spark 任务提交给哪种集群处理。
    -– deploy-mode DEPLOY_MODE:决定了 Driver 的运行方式,可选值为 Client或者 Cluster。


    Standalone 模式运行机制
    Standalone 集群有四个重要组成部分, 分别是:
    1)Driver: 是一个进程,我们编写的 Spark 应用程序就运行在 Driver 上, 由Driver 进程执行;
    2)Master:是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;
    3)Worker:是一个进程,一个 Worker 运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储 RDD 的某个或某些 partition;另一个是启动其他进程和线程(Executor) ,对 RDD 上的 partition 进行并行的处理和计算。
    4)Executor:是一个进程, 一个 Worker 上可以运行多个 Executor, Executor 通过启动多个线程( task)来执行对 RDD 的 partition 进行并行计算,也就是执行我们对 RDD 定义的例如 map、flatMap、reduce 等算子操作。

    Standalone Client 模式

    在Standalone Client 模式下,Driver 在任务提交的本地机器上运行,Driver 启动后向 Master 注册应用程序,Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。


    在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver进程, Driver 启动后向 Master 注册应用程序, Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,
    然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,
    之后将 task 分发到各个 Executor 上执行。


    Standalone 的两种模式下( client/Cluster) , Master 在接到 Driver 注册Spark 应用程序的请求后,
    会获取其所管理的剩余资源能够启动一个 Executor 的所有 Worker, 然后在这些 Worker 之间分发 Executor,
    此时的分发只考虑 Worker 上的资源是否足够使用,直到当前应用程序所需的所有 Executor 都分配完毕, Executor 反向注册完毕后,Driver 开始执行 main 程序。


    YARN 模式运行机制

    YARN Client 模式

    在 YARN Client 模式下,Driver 在任务提交的本地机器上运行,Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster, 随后 ResourceManager 分配 container , 在 合 适 的 NodeManager 上启动 ApplicationMaster ,
    此时的 ApplicationMaster 的功能相当于一个 ExecutorLaucher, 只负责向 ResourceManager
    申请 Executor 内存。ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,
    然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程, Executor 进程启动后会向 Driver 反向注册, Executor 全部注册完成后 Driver 开始执行 main 函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。


    YARN Cluster 模式

    在 YARN Cluster 模式下, 任务提交后会和 ResourceManager 通讯申请启动ApplicationMaster, 随后 ResourceManager 分配 container,在合适的 NodeManager上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver。
    Driver 启动后向 ResourceManager 申请 Executor 内存, ResourceManager 接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程,Executor 进程启动后会向 Driver 反向注册, Executor 全部注册完成后 Driver 开始执行 main 函数,之后执行到 Action 算子时,触发一个job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个Executor 上执行。


    Shuffle操作

    Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。


    惰性执行

    这是RDD的一个特性,在RDD中的算子可以分为Transform算子和Action算子,其中Transform算子的操作都不会真正执行,
    只会记录一下依赖关系,直到遇见了Action算子,在这之前的所有Transform操作才会被触发计算,这就是所谓的惰性执行。
    具体哪些是Transform和Action算子


    常用函数(算子)

    资源参数调优
    如果要进行资源调优,我们就必须先知道Spark运行的机制与流程。
    了解其参数原理便于我们依据实际的数据情况进行配置。
    1)num-executors

    指的是执行器的数量,数量的多少代表了并行的stage数量(假如executor是单核的话),但也并不是越多越快,受你集群资源的限制,所以一般设置50-100左右吧。

    2)executor-memory

    这里指的是每一个执行器的内存大小,内存越大当然对于程序运行是很好的了,但是也不是无节制地大下去,同样受我们集群资源的限制。假设我们集群资源为500core,一般1core配置4G内存,所以集群最大的内存资源只有2000G左右。num-executors x executor-memory 是不能超过2000G的,但是也不要太接近这个值,不然的话集群其他同事就没法正常跑数据了,一般我们设置4G-8G。

    3)executor-cores

    这里设置的是executor的CPU core数量,决定了executor进程并行处理task的能力。

    4)driver-memory

    设置driver的内存,一般设置2G就好了。但如果想要做一些Python的DataFrame操作可以适当地把这个值设大一些。

    5)driver-cores

    与executor-cores类似的功能。

    6)spark.default.parallelism

    设置每个stage的task数量。一般Spark任务我们设置task数量在500-1000左右比较合适,如果不去设置的话,Spark会根据底层HDFS的block数量来自行设置task数量。有的时候会设置得偏少,这样子程序就会跑得很慢,即便你设置了很多的executor,但也没有用。

    下面说一个基本的参数设置的shell脚本,一般我们都是通过一个shell脚本来设置资源参数配置,接着就去调用我们的主函数。

    #!/bin/bash
    basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)

    spark-submit \
    --master yarn \
    --queue samshare \
    --deploy-mode client \
    --num-executors 100 \
    --executor-memory 4G \
    --executor-cores 4 \
    --driver-memory 2G \
    --driver-cores 2 \
    --conf spark.default.parallelism=1000 \
    --conf spark.yarn.executor.memoryOverhead=8G \
    --conf spark.sql.shuffle.partitions=1000 \
    --conf spark.network.timeout=1200 \
    --conf spark.python.worker.memory=64m \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.crossJoin.enabled=True \
    --conf spark.dynamicAllocation.enabled=True \
    --conf spark.shuffle.service.enabled=True \
    --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
    --conf spark.pyspark.driver.python=python3 \
    --conf spark.pyspark.python=python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
    --conf spark.sql.pivotMaxValues=500000 \
    --conf spark.hadoop.hive.exec.dynamic.partition=True \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
    --conf spark.hadoop.hive.exec.max.created.files=100000 \
    ${bashPath}/project_name/main.py $v_var1 $v_var2

    数据倾斜调优
    很多时间数据跑不出来有很大的概率就是出现了数据倾斜,在Spark开发中无法避免的也会遇到这类问题,而这不是一个崭新的问题,
    在Spark中比较容易出现倾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,
    可以优先看这些操作的前后代码。而为什么使用了这些操作就容易导致数据倾斜呢?大多数情况就是进行操作的key分布不均,然后使得大量的数据集中在同一个处理节点上,从而发生了数据倾斜。

    查看Key 分布
    # 针对Spark SQL
    hc.sql("select key, count(0) nums from table_name group by key")

    # 针对RDD
    RDD.countByKey()
    Plan A: 过滤掉导致倾斜的key
    这个方案并不是所有场景都可以使用的,需要结合业务逻辑来分析这个key到底还需要不需要,大多数情况可能就是一些异常值或者空串,这种就直接进行过滤就好了。

    Plan B: 提前处理聚合
    如果有些Spark应用场景需要频繁聚合数据,而数据key又少的,那么我们可以把这些存量数据先用hive算好(每天算一次),然后落到中间表,后续Spark应用直接用聚合好的表+新的数据进行二度聚合,效率会有很高的提升。

    Plan C: 调高shuffle并行度
    # 针对Spark SQL
    --conf spark.sql.shuffle.partitions=1000 # 在配置信息中设置参数
    # 针对RDD
    rdd.reduceByKey(1000) # 默认是200

    Plan D: 分配随机数再聚合
    大概的思路就是对一些大量出现的key,人工打散,从而可以利用多个task来增加任务并行度,以达到效率提升的目的,下面是代码demo,分别从RDD 和 SparkSQL来实现。
    # Way1: PySpark RDD实现
    import pyspark
    from pyspark import SparkContext, SparkConf, HiveContext
    from random import randint
    import pandas as pd

    # SparkSQL的许多功能封装在SparkSession的方法接口中, SparkContext则不行的。
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("sam_SamShare") \
    .config("master", "local[4]") \
    .enableHiveSupport() \
    .getOrCreate()

    conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
    sc = SparkContext(conf=conf)
    hc = HiveContext(sc)

    # 分配随机数再聚合
    rdd1 = sc.parallelize([('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1)])

    # 给key分配随机数后缀
    rdd2 = rdd1.map(lambda x: (x[0] + "_" + str(randint(1,5)), x[1]))
    print(rdd.take(10))
    # [('sam_5', 1), ('sam_5', 1), ('sam_3', 1), ('sam_5', 1), ('sam_5', 1), ('sam_3', 1)]

    # 局部聚合
    rdd3 = rdd2.reduceByKey(lambda x,y : (x+y))
    print(rdd3.take(10))
    # [('sam_5', 4), ('sam_3', 2)]

    # 去除后缀
    rdd4 = rdd3.map(lambda x: (x[0][:-2], x[1]))
    print(rdd4.take(10))
    # [('sam', 4), ('sam', 2)]

    # 全局聚合
    rdd5 = rdd4.reduceByKey(lambda x,y : (x+y))
    print(rdd5.take(10))
    # [('sam', 6)]


    # Way2: PySpark SparkSQL实现
    df = pd.DataFrame(5*[['Sam', 1],['Flora', 1]],
    columns=['name', 'nums'])
    Spark_df = spark.createDataFrame(df)
    print(Spark_df.show(10))

    Spark_df.createOrReplaceTempView("tmp_table") # 注册为视图供SparkSQl使用

    sql = """
    with t1 as (
    select concat(name,"_",int(10*rand())) as new_name, name, nums
    from tmp_table
    ),
    t2 as (
    select new_name, sum(nums) as n
    from t1
    group by new_name
    ),
    t3 as (
    select substr(new_name,0,length(new_name) -2) as name, sum(n) as nums_sum
    from t2
    group by substr(new_name,0,length(new_name) -2)
    )
    select *
    from t3
    """
    tt = hc.sql(sql).toPandas()
    tt

     https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

    https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

    https://mp.weixin.qq.com/s/_VxwYqKPgqvRm7eKNdRwDw

    spark-submit --master yarn --driver-memory 4G --driver-cores 2 --executor-memory 8G --executor-cores 8 --num-executors 10 --py-files dacomponent.zip taskflow_22_5457.py

  • 相关阅读:
    021 顺时针打印矩阵
    020 二叉树的镜像
    019 树的子结构
    018 机器人的运动范围
    017 矩阵中的路径
    022 Jquery总结
    003 css总结
    002 html总结
    016 合并两个排序的链表
    015 反转链表
  • 原文地址:https://www.cnblogs.com/songyuejie/p/15592903.html
Copyright © 2011-2022 走看看