zoukankan      html  css  js  c++  java
  • spark

    ########################################

    spark性能调优

    一.开发调优

    1.避免创建重复的RDD

    2.尽量复用RDD

    3.多次使用的RDD要持久化:

    因为RDD执行一个算子操作时,都会重新从源头处计算一遍;持久化的结果会保存到内存或磁盘中;

    持久化级别
    MEMORY_ONLY 非序列化,全部保存在内存,如果内存不够则放弃;和cache()功能相同
    MEMORY_ONLY_SER 同上; 区别: RDD数据会被序列化
    MEMORY_AND_DISK 非序列化,优先内存,不够的再写入磁盘
    MEMORY_AND_DISK_SER 同上; 区别: RDD数据会被序列化
    DISK_ONLY 非序列化,磁盘(不建议)
    上面五中后面都可加后缀_2 加后缀_2表示持久化的数据都复制一个副本,用于容错(不建议)

    4.少使用shuffle类算子

    5.使用map-side预聚合的shuffle操作

    reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合;而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

    6.使用高性能算子

    reduceByKey/aggregateByKey替代groupByKey

    mapPartitions替代普通map

    foreachPartition替代foreach

    特别是写表时,foreach一条一条数据写入,每次创建一个数据库连接;而foreachPartition会一个partition的数据创建一个数据库连接,批量写入,更高效

    7.广播大变量

    比如100M以上的大变量

    import org.apache.spark.broadcast.Broadcast
    
    val rdd=sc.parallelize(List("a","b","c","d"))
    val blackListSet=Set("a","d")
    val blackBroadcast=sc.broadcast(blackListSet)//广播一下
    rdd.filter(x=> !blackBroadcast.value.contains(x))// .value来表示
    

    8.使用Kryo优化序列化性能

    9.优化数据结构

    二.资源调优

    Driver向集群管理器申请运行Spark作业需要使用的资源,资源就是指Executor进程

    num-executors

    设置spark作业要用多少个Executor来执行.driver按这个值在向yarn集群管理器申请资源

    少了任务跑的太慢,多了给不了或者影响其他任务

    executor-memory

    设置每个Executor进程的内存

    num-executors*executor-memory,最好不要超过资源队列最大总内存1/3.

    executor-cores

    设置每个Executor进程的cpu core数

    num-executors * executor-cores,最好不要超过队列总CPU core的1/3

    driver-memory

    设置Driver进程的内存

    collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题

    spark.default.parallelism

    设置每个stage的默认task数量

    Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源

    spark.storage.memoryFraction

    设置RDD持久化数据在Executor内存中能占的比例, 默认值在/conf/spark-defaults.conf中为0.6

    实际项目中如果RDD持久化操作多的话可以适当提高;持久化少可以适当降低;

    如果发现频繁的gc导致运行任务缓慢,意味着task执行用户代码内存不够,也可以降低该数值

    spark.shuffle.memoryFraction

    设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作是能够使用的Executor内存比例

    默认值在/conf/spark-defaults.conf中为0.2

    实际项目中如果RDD持久化操作多的话可以适当提高;持久化少可以适当降低;

    如果发现频繁的gc导致运行任务缓慢,意味着task执行用户代码内存不够,也可以降低该数值

    三.数据倾斜调优

    数据倾斜发生原理:

    在进行shuffle的时候,必须将各个节点上相同的key拉去到某个节点上的一个task来进行处理,如果某个key对应数据量特别大就会发生数据倾斜

    stage划分判断:

    Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage

    shuffle操作算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

    定位发生倾斜的代码位置:

    在spark web ui或者本地log中查看哪个stage耗时多,再定位到可能的shuffle类算子

    1605767909905

    查看数据倾斜key分布:

    sql语句查表来看key分布

    RDD.countByKey()查看RDD中key的分布

    数据倾斜解决方案:

    1.使用Hive ETL预处理数据

    导致数据倾斜原因是Hive表

    2.导致倾斜的key不重要的话直接过滤掉

    3.提高shuffle操作的并行度

    增加shuffle read task的数量

    4.对于聚合类的shuffle操作,先局部聚合(key加前缀,分开),再全局聚合

    5.将reduce join 转为map join

    6.采样倾斜key并分拆join操作

    7.使用随机前缀和扩容RDD进行join

    8.多种方案组合

    四.shuffle调优

    影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已

    ################################

    Spark性能优化指南-基础篇

    https://blog.csdn.net/lukabruce/article/details/81504283

    Spark性能优化指南-高级篇

    https://blog.csdn.net/lukabruce/article/details/81504220

    spark-submit参数

    spark-submit 
    --name myTest 
    --master yarn 
    --deploy-mode cluster 
    --driver-memory 4g 
    --num-executors 8 
    --executor-cores 16 
    --executor-memory 4g 
    --conf spark.default.parallelism=1500 
    --conf spark.sql.shuffle.partition=1500 
    --conf spark.streaming.kafka.maxRatePerPartition=2200 
    --conf spark.hbase.obtainToken.enabled=true 
    --class com.myspark.userhobby.userAction
    --jars $localroot/lib/hanlp-1.7.2.jar,
    $localroot/lib/jedis-2.9.0.jar,
    $SPARK_HOME/jars/kafka-clients_2.10.jar 
    --files $localroot/config/test01.keystore,
    $localroot/config/test02.txt,
    $localroot/config/hanlp.properties 
    

    name

    应用程序的名称

    在hadoop的yarn页面上可以看到Name

    master

    master地址,提交任务到哪里执行,有spark://host:port, yarn,local

    公司一般用yarn

    deploy-mode

    cluster(集群) 或者client(本地)

    默认client

    yarn-client:

    Client和Driver运行在一起,ApplicationMaster只用来获取资源;结果实时输出在客户端控制台上,可以方便的看到日志信息,推荐使用该模式;

    提交到yarn后,yarn先启动ApplicationMaster和Executor,两者都是运行在Container中。注意:一个container中只运行一个executorbackend;

    yarn-cluser:

    Driver和ApplicationMaster运行在一起,所以运行结果不能在客户端控制台显示,需要将结果需要存放在HDFS或者写到数据库中;

    driver在集群上运行,可通过ui界面访问driver的状态。

    num-executors

    设置Spark作业总共要用多少个Executor进程来执行

    默认为2,在yarn下使用

    executor-memory

    设置每个Executor进程的内存

    默认1G

    num-executors乘以executor-memory,是不能超过队列的最大内存量的

    executor-cores

    设置每个Executor进程的CPU core数量

    driver-memory

    设置Driver进程的内存

    默认1G

    --class

    应用程序的主类,java或scala

    --jars

    逗号分隔的本地jar包,包含在driver和executor的classpath下

    --files

    逗号分隔的文件,这些文件放在每个executor的工作目录下面

    --conf

    修改spark配置属性

    没有则取默认是conf/spark-defaults.conf

    spark运行流程:

    1.构建Spark Application的运行环境(启动Driver),Driver(SparkContext)向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;

    2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;

    3.Driver(SparkContext)构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task

    4.Task Scheduler将Task发放给Executor运行同时Driver将应用程序代码发放给Executor。

    5.Task在Executor上运行,运行完毕释放所有资源。

    常用术语:

    https://www.cnblogs.com/cxxjohnson/p/8909578.html

    Application

    用户编写Spark应用程序,

    Driver

    表示main()函数,创建SparkContext,有SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等. [自己理解: 是executor的特例]

    Executor

    某个Application运行在worker节点上的一个进程

    Worker

    集群中任何可以运行Application代码的节点

    Task

    executor上的工作单元,是运行application的基本单位,多个task组成一个stage

    Job

    由task组成,action操作触发

    stage

    每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage

    DAGScheduler

    根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法

    TASKScheduler

    TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。

    spark streaming

    #######################################

    ETL(Extract-Transform-Load,抽取-交互转换-加载)

    Hive本质上是基于SQL的查询引擎

    ##################################

    https://stackoverflow.com/questions/4066424/java-lang-outofmemoryerror-java-heap-space-in-maven

    ###################

    1.熟悉Hadoop生态技术,包括Spark,Hive,Hbase,zookeeper等组件的原理,使用,和调优,阅读过相关源码优先

    2.熟悉redis分布式缓存,kafka消息组件.

    3.大规模分布式系统开发,维护经验,有故障处理能力

    ###########

    spark,flink相关大数据处理技术,分析和挖掘用户画像和行为特征,采用协同过滤,ALS,UCB,DeepFM等先关推荐算法 进行个性化推荐;

    基于文本,语意相关性特征(vpcg,rocchio),结合场景化规则和LR,XGBoost,GBDT,DNN等相关算法预测用户点击率,提升搜索准确性;

    基于NLP关键字,摘要提取相关算法和技术(TextRank,word2vec),优化SEO,优化文章关键字和主题提取的准确性;

    基于NLP的情感分析和文本分类相关技术(TexxtCNN,CNN),提升舆情监控系统中用户负面声音和声音分类预测的准确性,及时采集用户负面舆情并进行可视化;未来还会基于NLP意图识别和知识...

    熟悉常用的机器学习分类,聚类的算法原理和特点;

    熟悉深度学习相关模型CNN,RNN,LSTM等相关原理,使用场景,能够针对场景进行建模和优化;

    具有特征工程优化,embedding,模型参数和性能调优化,智能推荐,用户画像分析,NLP相关经验;

    面试问题######

    宽窄依赖

    redis 排序底层原理

    跳表

    spark和spark sql 中 orderby的区别

    reducebykey,groupbykey区别

    哪些场景用到redis

    mysql的索引

    ##########

    IDEA设置tabs多行显示:

    idea => Settings => Editor => Editor Tabs => Show tabs in single row
    

    ##########

    解决IDEA cannot resolve symbol flink暗灰色显示

    https://blog.csdn.net/u012369535/article/details/96317565

    ###########

    [ERROR] Failed to execute goal org.scala-tools:maven-scala-plugin:2.15.2:compile (default) on project NewsFeed: wrap: org.apache.commons.exec.Execute
    Exception: Process exited with an error: -10000(Exit value: -10000) -> [Help 1]

    解决方法:https://stackoverflow.com/questions/50252787/scala-maven-plugin-failed-to-execute

    That is a very old version of the plugin (dating to around 2010), and may not be compatible with recent Java & Scala releases. Incidentally, it has been renamed to the scala-maven-plugin and the latest release is 3.3.2:

    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.3.2</version>
      <!-- etc. -->
    </plugin>
    

    Give that a try...

    Also, I strongly recommend using SBT over Maven if you're primarily working with Scala (SBT handles combined Java and Scala builds too). It has a steeper learning curve than Maven, but is well worth the effort...

    关于Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.1:compile

    https://blog.csdn.net/qq_40746964/article/details/89402248?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-5.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-5.channel_param

    https://stackoverflow.com/questions/28004552/problems-while-compiling-spark-with-maven

    #########

    Spark Streaming流式处理

    https://www.cnblogs.com/jifengblog/p/9372266.html

    KafkaUtils.createDirectStream

    基于Direct的方式,spark1.3引入,这个方法会创建一个直接从Kafka代理获取消息的输入流,不使用任何接受器,简化并行读取,高性能,一次且仅一次的事务机制

    KafkaUtils.createDstream

    基于receiver的方式,是使用Kafka的高层次Consumer API来实现的

    对比:

    基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

    基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

    在实际生产环境中大都用Direct方式

    框架结构

    日志 通过kafka 用户实时画像 存入 redis

    日志→用户长期画像 存入redis

    新闻→通过kafka 内容去重(分词,simhash)→文本分类(主题,关键词,标签) 存入redis/模型训练

    redis → 推荐(热点推荐,长短时推荐列表,召回,排序)

    模块功能

    1.内容去重模块: kafka内容的去重,包括基于docid完全匹配的去重和基于标题,摘要等文本内容相似去重,并完成内容强校验

    2.数据模型(批和流):将内容去重后的内容,存入onedata批模型,供推荐使用;将用户行为打点日志存入批模型,供长时画像使用; 将kafka中的 用户行为日志经过流模型处理,再存入kafka,供短时画像使用

    3.内容下架模块: 下架功能

    4.用户长时画像: 用户行为历史,用户自然属性画像,来构建用户长期画像

    5.用户实时画像:基于kafka中的用户行为日志数据通过加工转换为用户实时画像标签,存入redis,供推荐使用

    6.内容分析模块: 对内容元数据进行文本分析,转换成内容标签,以倒排索引的方式存入倒排索引,以正排的方式存入标签正排redis,供推荐使用

    7.推荐引擎:基于内容标签和用户长期以及实时画像进行推荐,完成召回,过滤,排序和人为干预,并做内容补全后,生成推荐列表

    ##############################

    DMQ(Distributed Message Queue,分布式消息队列):

    消息中间件,传递消息
    
    DMQ主要由消息提供商Producer,消息消费者Consumer,注册中心Zookeeper以及Kafka集群组成
    

    hive同时复制表和数据

    hive> create table test_iot_deviceenergytype as select * from cserver.iot_deviceenergytype;
    

    scala

    import scala.collection.mutable   
    
    val fruitMap=mutable.Map.empty[String,String]
    
    val fruitStr="apple:001##orange:002"
    
    fruitStr.split("##").foreach(e =>{
        val param=e.split(":")
        val key=param(0)
        val value=param(1)
        fruitMap += key ->value
    })
    
    println(fruitMap)
    
    val set1=Set.empty[String]
    val map1=Map("a" ->"1","b"->"2")
    
    def stringNulltoEmpty(xx: String): String = if (xx != null) xx else ""
    
        def isNotIntType(str: String): Boolean = {
            val pattern = "^(\d+)$".r
            str match {
                case pattern(_*) => false
                case _ => true
            }
        }
    
    import org.apache.hadoop.fs.{FileSystem, Path} 
    import scala.collection.mutable
    //加载文件
      //文件内容格式:
    //  #channel=1
    //  科技=生物科技,互联网
    //  #channel=2
    //  娱乐=明星,演唱会
    //  #channel=3
    //  时政=美国,联合国,国际
      def loadFileToMap(hdfs:FileSystem,sc:SparkContext,FilePath:String): mutable.Map[String, mutable.Map[String, Array[String]]] ={
        val resultMap=mutable.Map[String, mutable.Map[String, Array[String]]]
        if(hdfs.exists(new Path(FilePath))){
          val contentArr=sc.textFile(FilePath).collect
          contentArr.foreach(line =>{
            //...
          })
        }
        resultMap
      }
    

    报错总结

    https://blog.csdn.net/u012940753/article/details/82699200

  • 相关阅读:
    16061109-第0次个人作业
    面向对象第四次总结
    面向对象5-7次作业总结
    2018 OO第一次总结(作业1-3)
    (最终作业)面向对象先导课课程总结
    HTML学习笔记
    实验八 进程间通信
    信号
    进程基础
    shell脚本编程
  • 原文地址:https://www.cnblogs.com/ShyPeanut/p/14377945.html
Copyright © 2011-2022 走看看