zoukankan      html  css  js  c++  java
  • <Spark><Tuning and Debugging>

    Overview

    • 这一部分我们主要讨论如果配置一个Spark application,如何tune and debug Spark workloads
    • 配置对Spark应用性能调优很重要。我们有必要理解一个Spark应用的性能。

    Configuring Spark with SparkConf

    • 我们知道,在创建SparkContext的时候会需要SparkConf实例。一个例子:
    • val conf = new SparkConf()
            .setAppName("Test")
            .setMaster("local")
      val sc = new SparkContext(conf)
    • SparkConf类很简单,包含一些用户可覆盖的配置选项的键值对
    • 也可以通过spark-submit动态地设置配置。基于这种方法,你可以在程序中new一个“空”的SparkConf,直接传给SparkContext。这种方式下可以直接使用 --conf标记,该标记之后可以使用任何Spark配置值。例子:
    • bin/spark-submit 
      --class com.wtttt.spark.test 
      --master local 
      --name "test" 
      --conf spark.ui.port=36000 
      test.jar
    • 这种spark-submit的方式除了可以使用--conf,还可以从文件加载。缺省情况下,spark-submit会在conf/spark-defaults.conf中读取whitespace-delimited 的键值对。你也可以使用

      --properties-file 来指定conf文件。例子:

      bin/spark-submit 
      --class com.wttttt.spark.test 
      --properties-file my-config.conf 
      test.jar


      ## Contents of my-config.conf ##
      
      spark.master    local[4]
      spark.app.name  "test"
      spark.ui.port   36000
    • 如果多处同时设置的话,程序设置的优先级高于spark-submit指定的。
    • 完整的conf选项参考 spark configuration

    Components of Execution: Jobs, Tasks, and Stages

    • 我们知道,Spark会把逻辑表示翻译成一系列物理执行计划,by merging multiple operations into tasks.
    • 看下面的例子:
      val input = sc.textFile("input.txt")
      
      val tokenized = input.
              map(line => line.split(" ")).
              filter(words => words.size > 0)
      
      val counts = tokenized.
              map(words => (words(0), 1)).
              reduceByKey{ (a, b) => a + b}
      
      
      // example of the source file "input.txt"
      ## input.txt ##
      INFO This is a message with content
      INFO This is some other content
      (empty line)
      INFO Here are more messages
      WARN This is a warning
      (empty line)
      ERROR Something bad happened
      WARN More details on the bad thing
      INFO back to normal messages
    • 当我们在shell输入上述语句之后,并不会执行任何actions,只会隐式地定义一个DAG(有向无环图)。我们可以用toDebugString来看看:
    • scala> counts.toDebugString
      res84: String =
      (2) ShuffledRDD[296] at reduceByKey at <console>:17
      +-(2) MappedRDD[295] at map at <console>:17
          | FilteredRDD[294] at filter at <console>:15
          | MappedRDD[293] at map at <console>:15
          | input.text MappedRDD[292] at textFile at <console>:13 | input.text HadoopRDD[291] at         textFile at <console>:13
    • 我们执行一个action来触发计算: counts.collect() 
    • 这时,Spark的调度器scheduler会创建一个物理执行计划来计算该action所需的RDDs(递归地向前找所有需要的RDDs)。
    • 更复杂的情况是,stages的物理集合不是与RDD graph 1:1对应的。这种情况发生在scheduler执行pipelining或者合并多个RDDs到一个stage的时候。pipelining发生在RDDs可以从parents本地计算的时候(不需要data movement)
    • 对于上面的例子,计算counts的时候,即使counts有很多个parent RDDs,它也只存在two levels of indentation。所以它的物理执行只需要两个stages。该例中的pipelining就是因为有多个filter和map的序列。如下图:
    • 运行这个action,看spark UI: 一共一个job,两个stages。
      • Completed Jobs (1)
      • Completed Stages (2)

    • 除了pipelining,Spark内部的scheduler也会在有RDD已经被persisted到内存或磁盘的时候缩短RDD graph的lineage。
    • 还有一种(可缩短lineage的)情况是,RDD已经通过之前的shuffle被materialized到磁盘。即使没有显式地调用persist()。这种情况是充分利用了shuffle的输出会写到磁盘的特点。
    • 做个实验:
      counts.cache()
      counts.collect()
      

      看下这个job的stages:

      

     可以看到只执行了一个stage,另一个被skip了。

    • 以上,Spark执行的时候包括这么几个阶段:
      1. User code定义一个RDD的DAG;
      2. Actions force DAG到执行计划的translation;这时Spark调度器提交一个job来计算所有所需的RDDs。该job会有一或多个stages,它们是由task组成的并行计算浪潮(parallel waves of computation composed of tasks)。由于pipelining,每个stage可以对应多个RDDs。
      3. Tasks被调度,在集群上执行;stages按顺序执行,individual tasks执行RDD的各个部分。

    Finding Information

    • 具体的进度信息、性能度量可以在Spark web UI以及driver和executor的logfiles中找到。

    Spark Web UI

    • 说明:YARN cluster模式下,application driver是运行在cluster内部的,因此你需要通过YARN resourceManager来访问UI。

    Jobs: Progress and metrics of stages, tasks, and more

    • 一般首先是点进去一个比较慢的stage,其内部可能存在skew,因此你可以看下是否某些tasks运行时间过长。比如你可以看是否这些tasks read, write or compute much more than others?
    • 你还可以看每个task读、计算、写分别占用的时间。如果tasks花很少时间读写,那么可能user code本身是expensive的,作为solution之一你可以参考advanced programming中提到的"Working on a Per-Partition Basis"来减少比如创建对象的开销。 但是如果tasks花费很多时间来从外部系统读取数据,那么可能不存在更多的外部优化方式。

    Storage: Information for RDDs that are persisted

    • storage页面包含了persisted RDDs的信息。
    • 通常,如果很多RDDs都会缓存的话,older ones可能会被移除。

    Executors: A list of executors present in the application

    • 该页列出了应用中活跃的executors,以及每个executor在处理和存储中的一些度量。
    • 这一页的用处是你可以确认你的application拥有你所预期的资源。

    Environment: Debugging Spark's configuration

    • This page enumerates the set of active properties in the environment of your Spark application. 

    Driver and Executor logs

    • YARN模式下,最简单的收集日志的方式是使用YARN日志收集工具 
      • (running yarn logs -applicationId <app ID>) 

    满地都是六便士,她却抬头看见了月亮。
  • 相关阅读:
    直击美国大选,特朗普担心黑客?郭盛华这样回应
    30岁郭盛华近照曝光,容貌大变判若两人,经历了什么?
    美选举日,科技股飙升,原因是什么?
    索尼收购了TikTok?事实真是这样吗?
    腾讯手游《王者荣耀》创下每日1亿用户记录
    东方联盟创始人郭盛华十大励志名言
    抛弃谷歌!苹果研发自己的搜索引擎技术
    vivo 悟空活动中台
    Android 加载图片占用内存分析
    Linux Page Cache调优在Kafka中的应用
  • 原文地址:https://www.cnblogs.com/wttttt/p/6851674.html
Copyright © 2011-2022 走看看