zoukankan      html  css  js  c++  java
  • Spark优化笔记

    优化杂谈

    优化点一:资源
    spark作业在运行的时候能占用多少资源:cpu、memory
    分配”足够多“的资源,在一定范围内,增加资源 和 性能提升 成正比的
    Spark on YARN 作业跑在规划好的YARN的队列中

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi 
        --master yarn 
        --deploy-mode cluster 
        --driver-memory 4g     # Driver的内存
        --executor-memory 2g   # 每个Executor的内存
        --executor-cores 1     # Executor的cpu core的数量
        --queue thequeue       # 运行在YARN的哪个队列上
        --num-executors 3      # Executor的数量 
        examples/jars/spark-examples*.jar 
        10
    
    送你们一句话:尽量将你的作业使用的资源调整到最大
    
    YARN: pkspark  400G 100C
    	50exe ==> 
    		executor-memory = 8G
    		executor-cores  = 2C
    
    num-executors + :	task的并行度  num*cores	
    	4exe 2core = 8task
    	8exe 2core = 16task
    	100task 
    
    executor-cores + : task的并行度
    
    executor-memory + :
    	能cache的数据多 ==> 写入disk的次数会降低
    	shuffle   IO
    	JVM   GC
    
    思考:Spark ETL HBase 运行在YARN之上
    

    调优之算子的选择
    map
    def map[U: ClassTag](f: T => U): RDD[U]

    mapPartitions
    	def mapPartitions[U: ClassTag](
      		f: Iterator[T] => Iterator[U],
      		preservesPartitioning: Boolean = false): RDD[U]
    
    transforamtion:转换算子
    
    RDD = 2Partitions (2 * 1w = 2w)
    	map  2w
    	mapPartitions  2 
    

    QA:转换算子能生成Job吗?

    foreach 
    	def foreach(f: T => Unit)
    
    foreachPartitions
    	def foreachPartition(f: Iterator[T] => Unit)
    
    Action算子
    
    送你们一句话:如果涉及到写数据库操作,
    	建议采用带Partitions的,但是由于mapPartitions是一个transforamtion算子,所以建议采用foreachPartitions
    
    	OOM
    	使用之前:
    		评估你要处理的RDD的数据量
    		每个partition的数据量
    		整个作业使用到的资源
    

    生产或者面试:Spark自定义排序

    class 和 case class在使用层面有什么区别???

    Spark Streaming对接Kafka数据
    对于Kafka来说,我们的Spark Streaming应用程序其实就是一个消费者

    1) Spark Streaming挂了,那么就没有办法去消费Kafka中的数据了,Kafka中的数据就会有积压
    2) 高峰期的时候,由于你作业的资源并没有很好的设置,在某些批次中,很可能数据比较大
    
    batch时间到了,那么Spark Streaming就会处理这个批次中的数据
    假设:batch time 10s  就会出现10s你根本处理不过来整个批次的数据
    后续批次的作业就会产生挤压,那么时效性就没有办法保证
    
    ==> Kafka的限速
    假设限速是100
    
    
    10秒一个批次
    	topic 是1个分区:10 * 1 * 100 = 1000
    	topic 是3个分区:10 * 3 * 100 = 3000
    
    要提升数据处理的吞吐量:提升Kafka的分区数	
    

    Spark Streaming对接Kafka数据进行处理时,能否保证仅处理一次的语义
    至少一次:可能数据消费重复
    至多一次:可能数据有丢失
    仅仅一次:不会有数据的丢失,也不会重复消费 ✅

    能? 怎么做?
    不能做到?还能用吗?
    

    广播
    join: shuffle/reduce join mapjoin

    val o = xxxx // 20M 算子的外部变量
    rdd.map(x => {

    //....
    o
    

    })

    每个task都会获得一份变量o的副本

    20executor 500task ==> 500 * 20M = 10G

    如果使用了广播变量:
    每个executor保存一个变量o的副本

    20 * 20m = 400M
  • 相关阅读:
    std::queue
    关于GridView中如何取得隐藏列的值
    揭秘史上最昂贵的一行Javascript代码
    CentOS 安装 git2.x.x 版本
    error: Some data has already been output, can't send PDF file
    接口和抽象类有什么区别
    php与apache的那些事
    PHP生成PDF完美支持中文,解决TCPDF乱码
    php 表单相关
    快速的去除数组中的指定值
  • 原文地址:https://www.cnblogs.com/lixiangbetter/p/12182821.html
Copyright © 2011-2022 走看看