zoukankan      html  css  js  c++  java
  • spark小结

    spark运行流程:

    1.构建Spark Application的运行环境(启动Driver),Driver(SparkContext)向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;

    2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;

    3.Driver(SparkContext)构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task

    4.Task Scheduler将Task发放给Executor运行同时Driver将应用程序代码发放给Executor。

    5.Task在Executor上运行,运行完毕释放所有资源。

    常用术语:

    https://www.cnblogs.com/cxxjohnson/p/8909578.html

    Application

    用户编写Spark应用程序,

    Driver

    表示main()函数,创建SparkContext,有SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等. [自己理解: 是executor的特例]

    Executor

    某个Application运行在worker节点上的一个进程

    Worker

    集群中任何可以运行Application代码的节点

    Task

    executor上的工作单元,是运行application的基本单位,多个task组成一个stage

    Job

    由task组成,action操作触发

    stage

    每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage

    DAGScheduler

    根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法

    TASKScheduler

    TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。

    spark-submit参数

    spark-submit 
    --name myTest 
    --master yarn 
    --deploy-mode cluster 
    --driver-memory 4g 
    --num-executors 8 
    --executor-cores 16 
    --executor-memory 4g 
    --conf spark.default.parallelism=1500 
    --conf spark.sql.shuffle.partition=1500 
    --conf spark.streaming.kafka.maxRatePerPartition=2200 
    --conf spark.hbase.obtainToken.enabled=true 
    --class com.myspark.userhobby.userAction
    --jars $localroot/lib/hanlp-1.7.2.jar,
    $localroot/lib/jedis-2.9.0.jar,
    $SPARK_HOME/jars/kafka-clients_2.10.jar 
    --files $localroot/config/test01.keystore,
    $localroot/config/test02.txt,
    $localroot/config/hanlp.properties 
    

    name

    应用程序的名称

    在hadoop的yarn页面上可以看到Name

    master

    master地址,提交任务到哪里执行,有spark://host:port, yarn,local

    公司一般用yarn

    deploy-mode

    cluster(集群) 或者client(本地)

    默认client

    yarn-client:

    Client和Driver运行在一起,ApplicationMaster只用来获取资源;结果实时输出在客户端控制台上,可以方便的看到日志信息,推荐使用该模式;

    提交到yarn后,yarn先启动ApplicationMaster和Executor,两者都是运行在Container中。注意:一个container中只运行一个executorbackend;

    yarn-cluser:

    Driver和ApplicationMaster运行在一起,所以运行结果不能在客户端控制台显示,需要将结果需要存放在HDFS或者写到数据库中;

    driver在集群上运行,可通过ui界面访问driver的状态。

    num-executors

    设置Spark作业总共要用多少个Executor进程来执行

    默认为2,在yarn下使用

    executor-memory

    设置每个Executor进程的内存

    默认1G

    num-executors乘以executor-memory,是不能超过队列的最大内存量的

    executor-cores

    设置每个Executor进程的CPU core数量

    driver-memory

    设置Driver进程的内存

    默认1G

    --class

    应用程序的主类,java或scala

    --jars

    逗号分隔的本地jar包,包含在driver和executor的classpath下

    --files

    逗号分隔的文件,这些文件放在每个executor的工作目录下面

    --conf

    修改spark配置属性

    没有则取默认是conf/spark-defaults.conf

    数据倾斜:

    在进行shuffle的时候,必须将各个节点上相同的key拉去到某个节点上的一个task来进行处理,如果某个key对应数据量特别大就会发生数据倾斜

    数据倾斜解决方案:

    1.使用Hive ETL预处理数据

    导致数据倾斜原因是Hive表

    2.导致倾斜的key不重要的话直接过滤掉

    3.提高shuffle操作的并行度

    增加shuffle read task的数量

    4.对于聚合类的shuffle操作,先局部聚合(key加前缀,分开),再全局聚合

    5.将reduce join 转为map join

    6.采样倾斜key并分拆join操作

    7.使用随机前缀和扩容RDD进行join

    spark性能调优

    Spark性能优化指南-基础篇

    https://blog.csdn.net/lukabruce/article/details/81504283

    Spark性能优化指南-高级篇

    https://blog.csdn.net/lukabruce/article/details/81504220

    stage划分判断:

    Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage

    shuffle操作算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

    1.资源调优

    2.并行度调优

    原则:一个core一般分配2-3个task,每个task一般处理1G数据

    3.代码调优

    避免重复创建RDD,多复用

    重复使用的RDD要持久化

    每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍;所以持久化后可以直接从内存或磁盘中提取持久化的RDD数据
    
    //使用非序列化的方式将RDD中的数据全部尝试持久化到内存中
    rdd1.cache()
    //手动选择持久化级别,并使用指定的方式进行持久化
    rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)//_SER后缀表示使用序列化方式保存,节省内存
    
    持久化级别
    MEMORY_ONLY 非序列化,全部保存在内存,如果内存不够则放弃;和cache()功能相同
    MEMORY_ONLY_SER 同上; 区别: RDD数据会被序列化
    MEMORY_AND_DISK 非序列化,优先内存,不够的再写入磁盘
    MEMORY_AND_DISK_SER 同上; 区别: RDD数据会被序列化
    DISK_ONLY 非序列化,磁盘(不建议)
    上面五中后面都可加后缀_2 加后缀_2表示持久化的数据都复制一个副本,用于容错(不建议)

    尽量避免使用shuffle类算子

    使用高性能算子

    reduceByKey/aggregateByKey替代groupByKey

    mapPartitions替代普通map

    foreachPartition替代foreach

    特别是写表时,foreach一条一条数据写入,每次创建一个数据库连接;而foreachPartition会一个partition的数据创建一个数据库连接,批量写入,更高效
    

    filter过滤后进行coalesce操作

    如果RDD进行filter过滤较多数据后,用coalesce手动减少RDD的partition数量
    

    使用广播大变量(100M以上的大变量)

  • 相关阅读:
    谈谈Nginx有哪些特点
    网站嵌入百度地图制作
    8张图理解Java
    linux问题-easy_install安装bpython时报错
    linux问题-Centos 安装Sublime text 3
    python例子-Nmap扫描IP并更新
    python例子-PyQuery抓取信息.
    python例子-MySQLdb和练习题
    python例子-线程和队列
    mysql问题-centos7中mysql远程连接问题
  • 原文地址:https://www.cnblogs.com/ShyPeanut/p/13995312.html
Copyright © 2011-2022 走看看