  • Spark Streaming: Dynamic Resource Allocation and Dynamic Control of the Consumption Rate

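    This article covers two advanced features:

    1. Dynamic resource allocation in Spark Streaming

    2. Dynamic control of the consumption rate in Spark Streaming

    Each of the two rests on its own body of theory. We go through the principles first and discuss them afterwards.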

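    Why do we need dynamic resource allocation and dynamic rate control?

    By default Spark allocates resources first and computes afterwards. This is coarse-grained allocation: the resources are reserved up front, before any job runs.

    The drawback: a Spark Streaming workload has peaks and troughs, and a static allocation cannot follow them, so a large share of the resources is wasted.

    Spark Streaming originally drew on Storm's design ideas and built on them, and its kernel changed considerably in Spark Streaming 2.0.x. The framework's greatest strength is that it works together with its sibling frameworks in the Spark stack. If Spark Streaming resources are sized for the peak, the pre-allocated resources sit idle much of the time, and the waste is greatest during the troughs.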

    Spark Streaming is built on Spark Core, whose central object is SparkContext. Support for dynamic resource allocation starts at line 556 of the SparkContext class; the source is:

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    }

    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

    The configuration property spark.dynamicAllocation.enabled decides whether dynamic allocation of executors is turned on:

    /**
     * Return whether dynamic allocation is enabled in the given conf
     * Dynamic allocation and explicitly setting the number of executors are inherently
     * incompatible. In environments where dynamic allocation is turned on by default,
     * the latter should override the former (SPARK-9092).
     */
    def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
      conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
        conf.getInt("spark.executor.instances", 0) == 0
    }
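
    The rule in isDynamicAllocationEnabled can be tried out without a cluster. The following is a minimal sketch, assuming a plain Map[String, String] as a stand-in for SparkConf; the object name DynamicAllocationCheck is hypothetical and not part of Spark.

```scala
object DynamicAllocationCheck {
  // Simplified stand-in for SparkConf: a plain key/value map (an assumption of this sketch).
  def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean = {
    val enabled = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
    val instances = conf.getOrElse("spark.executor.instances", "0").toInt
    // An explicit executor count overrides the dynamic-allocation flag.
    enabled && instances == 0
  }
}
```

    The point to notice is that setting spark.executor.instances to a non-zero value switches dynamic allocation off even when the flag is true, which is the situation the warning in SparkContext reports.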
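    As the code shows, dynamic allocation takes effect only when spark.dynamicAllocation.enabled is true and spark.executor.instances is left unset (0). The check runs while the SparkContext is being constructed, so the property has to be set before the application starts. When dynamic allocation is enabled, the ExecutorAllocationManager class is used: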
    /**
     * An agent that dynamically allocates and removes executors based on the workload.
     * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
     * synced to the cluster manager. The target starts at a configured initial value and changes with
     * the number of pending and running tasks.
     * Decreasing the target number of executors happens when the current target is more than needed to
     * handle the current load. The target number of executors is always truncated to the number of
     * executors that could run all current running and pending tasks at once.
     *
     * Increasing the target number of executors happens in response to backlogged tasks waiting to be
     * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
     * the queue persists for another M seconds, then more executors are added and so on. The number
     * added in each round increases exponentially from the previous round until an upper bound has been
     * reached. The upper bound is based both on a configured property and on the current number of
     * running and pending tasks, as described above.
     *
     * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
     * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
     * we may add more executors than we need just to remove them later. (2) Executors should be added
     * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
     * a long time to ramp up under heavy workloads.
     *
     * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
     * been scheduled to run any tasks, then it is removed.
     *
     * There is no retry logic in either case because we make the assumption that the cluster manager
     * will eventually fulfill all requests it receives asynchronously.
     *
     * The relevant Spark properties include the following:
     *
     *   spark.dynamicAllocation.enabled - Whether this feature is enabled
     *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
     *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
     *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
     *
     *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
     *     If there are backlogged tasks for this duration, add new executors
     *
     *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
     *     If the backlog is sustained for this duration, add more executors
     *     This is used only after the initial backlog timeout is exceeded
     *
     *   spark.dynamicAllocation.executorIdleTimeout (K) -
     *     If an executor has been idle for this duration, remove it
     */
    private[spark] class ExecutorAllocationManager(
        client: ExecutorAllocationClient,
        listenerBus: LiveListenerBus,
        conf: SparkConf)
      extends Logging {

      allocationManager =>

      import ExecutorAllocationManager._

      // Lower and upper bounds on the number of executors.
      private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
      private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
        Integer.MAX_VALUE)
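
    The properties listed in the class comment are ordinary Spark settings, so they can be placed on a SparkConf before the SparkContext is created. The following is a configuration sketch only: the application name and all numeric values are illustrative assumptions, not recommendations. The spark.shuffle.service.enabled line reflects the documented requirement of an external shuffle service for dynamic allocation in Spark versions of this period.

```scala
import org.apache.spark.SparkConf

object DynamicAllocationConf {
  // Every value below is an illustrative assumption; tune for your own cluster.
  val conf: SparkConf = new SparkConf()
    .setAppName("dynamic-allocation-sketch") // hypothetical application name
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true") // external shuffle service
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20")
    .set("spark.dynamicAllocation.initialExecutors", "4")
    .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // spark.executor.instances is deliberately left unset: a non-zero value
  // would make isDynamicAllocationEnabled return false.
}
```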

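    ExecutorAllocationManager dynamically controls the number of executors. It watches the executors and the running stages, and raises or lowers the executor count accordingly. For example, if an executor has not run a single task for 60 seconds, it is removed. The driver holds references to all executors launched for the current application, which is what makes this bookkeeping possible.

    Because the manager is driven by a timer, it loops continuously, adding and removing executors as it goes.

    The timer is what makes the allocation dynamic: the state is re-examined at a fixed interval. To remove an executor, a kill message is sent; to add one, a request is sent so that a worker launches an additional executor.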
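
    The exponential ramp-up described in the ExecutorAllocationManager class comment can be illustrated with a small simulation. This is a simplified sketch under stated assumptions: the increment starts at 1 and doubles every round, and the target is capped by both the configured maximum and the number of executors currently needed. The name RampUpSketch and its parameters are hypothetical; the real class also resets the increment when the backlog clears, which is omitted here.

```scala
object RampUpSketch {
  /** Executor targets after each backlog round, doubling the increment every round. */
  def targets(initial: Int, needed: Int, maxExecutors: Int, rounds: Int): List[Int] = {
    val cap = math.min(needed, maxExecutors).toLong
    // State is (current target, increment to apply in the next round).
    Iterator.iterate((initial, 1)) { case (target, step) =>
      val next = math.min(target.toLong + step, cap).toInt
      // Stop doubling before the increment could overflow an Int.
      val nextStep = if (step > Int.MaxValue / 2) step else step * 2
      (next, nextStep)
    }.drop(1).take(rounds).map(_._1).toList
  }
}
```

    With targets(0, 100, 20, 6) the sequence grows slowly at first and then reaches the cap of 20 within a few rounds, which is the trade-off the class comment argues for.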

    Let us look at the Master's schedule method:

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) { return }
      // Drivers take strict precedence over executors
      val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
      for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
        for (driver <- waitingDrivers) {
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
          }
        }
      }
      startExecutorsOnWorkers()
    }

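    Dynamic resource scheduling needs the help of a timer; the default allocation path is the Master's schedule method.

    When dynamic allocation is enabled through configuration, the schedule method of the ExecutorAllocationManager class is called: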

    /**
     * This is called at a fixed interval to regulate the number of pending executor requests
     * and number of executors running.
     *
     * First, adjust our requested executors based on the add time and our current needs.
     * Then, if the remove time for an existing executor has expired, kill the executor.
     *
     * This is factored out into its own method for testing.
     */
    private def schedule(): Unit = synchronized {
      val now = clock.getTimeMillis

      updateAndSyncNumExecutorsTarget(now)

      removeTimes.retain { case (executorId, expireTime) =>
        val expired = now >= expireTime
        if (expired) {
          initializing = false
          removeExecutor(executorId)
        }
        !expired
      }
    }
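
    The expiry step inside schedule() can be modelled on its own. The following is a minimal sketch, assuming an immutable Map from executor ID to expiry timestamp in place of the mutable removeTimes map; IdleExpirySketch and expire are hypothetical names, and the actual removal call is left out.

```scala
object IdleExpirySketch {
  /** Splits tracked executors into (expired ids, still-tracked entries) at time `now`. */
  def expire(removeTimes: Map[String, Long], now: Long): (Set[String], Map[String, Long]) = {
    // Same condition as in schedule(): an entry expires once now >= expireTime.
    val (expired, alive) = removeTimes.partition { case (_, expireTime) => now >= expireTime }
    (expired.keySet, alive)
  }
}
```

    Returning the surviving entries plays the role of the `!expired` result in the original `retain` call: only executors whose idle deadline has not passed stay tracked.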

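    This schedule method is triggered periodically by an internal timer.

    The manager keeps track of executor IDs as executors register. The periodic task itself is set up in start: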

    /**
     * Register for scheduler callbacks to decide when to add and remove executors, and start
     * the scheduling task.
     */
    def start(): Unit = {
      listenerBus.addListener(listener)

      val scheduleTask = new Runnable() {
        override def run(): Unit = {
          try {
            schedule()
          } catch {
            case ct: ControlThrowable =>
              throw ct
            case t: Throwable =>
              logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
          }
        }
      }
      executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    }
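
    The timer pattern used by start() relies only on the JDK's ScheduledExecutorService, so it can be reproduced in isolation. The following is a sketch under stated assumptions: FixedRateSketch and runPeriodically are hypothetical names, and the CountDownLatch exists only so that the example terminates.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.util.control.NonFatal

object FixedRateSketch {
  /** Runs `task` every `intervalMillis` until it has run `times` times; true on success. */
  def runPeriodically(times: Int, intervalMillis: Long)(task: () => Unit): Boolean = {
    val latch = new CountDownLatch(times)
    val executor = Executors.newSingleThreadScheduledExecutor()
    val scheduleTask = new Runnable {
      override def run(): Unit = {
        // scheduleAtFixedRate suppresses all later runs if run() throws,
        // so non-fatal errors are caught here, as in the Spark code.
        try task() catch { case NonFatal(_) => () }
        latch.countDown()
      }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    try latch.await(times * intervalMillis + 5000, TimeUnit.MILLISECONDS)
    finally executor.shutdownNow()
  }
}
```

    Catching the exception inside run() is the important design choice: without it, a single failure in schedule() would silently stop all further adjustments.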

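    Seen from the adjustment interval, that is, from the batchDuration (say 10 seconds), deciding whether to add or remove executors requires an estimate of the data volume, of the resources needed, and of how much of the existing allocation is idle. Take the decision whether more resources are required: the data that arrives within one batchDuration is split into partitions, and processing each partition needs cores; if there are not enough cores, more executors have to be requested.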
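
    The arithmetic behind "not enough cores, so request more executors" can be written down as a ceiling division. This is a rough sketch, assuming one task per partition and the usual meaning of spark.executor.cores and spark.task.cpus; ExecutorEstimate and executorsNeeded are hypothetical names and this is not the exact Spark implementation.

```scala
object ExecutorEstimate {
  /** Executors needed to run `tasks` concurrently, one task per partition. */
  def executorsNeeded(tasks: Int, coresPerExecutor: Int, coresPerTask: Int = 1): Int = {
    require(coresPerTask > 0 && coresPerExecutor >= coresPerTask, "invalid core settings")
    val tasksPerExecutor = coresPerExecutor / coresPerTask
    (tasks + tasksPerExecutor - 1) / tasksPerExecutor // ceiling division
  }
}
```

    For example, a batch that produces 10 partitions on executors with 4 cores each needs 3 executors to process all partitions at once.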

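    Spark Streaming provides an elastic mechanism: it compares the rate at which data flows in with the rate at which it is processed, to see whether processing keeps up. If it does not, the inflow rate is throttled dynamically. The property that controls this is spark.streaming.backpressure.enabled.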
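
    Backpressure is switched on through configuration. The following is a configuration sketch only: the application name, the 10-second batch interval and the rate limits are illustrative assumptions. The two maxRate properties are optional upper bounds that apply to receiver-based streams and to the Kafka direct stream respectively.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BackpressureConf {
  // All values are illustrative assumptions, not recommendations.
  val conf: SparkConf = new SparkConf()
    .setAppName("backpressure-sketch") // hypothetical application name
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.receiver.maxRate", "10000")         // records/s per receiver
    .set("spark.streaming.kafka.maxRatePerPartition", "1000") // records/s per Kafka partition

  // The master URL is expected to come from spark-submit.
  def createContext(): StreamingContext = new StreamingContext(conf, Seconds(10))
}
```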

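    Spark Streaming itself does this through a RateController, which adjusts the inflow rate at run time. When batches are delayed, the rate is lowered so that data flows in more slowly; what has to be tuned is the ratio between the amount of data admitted and the time it takes to process it.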
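
    How a delay turns into a lower rate can be illustrated with one step of a PID-style controller. This is a simplified sketch modelled loosely on the idea of a PID rate estimator, not a copy of Spark's code: PidRateSketch, its parameter names, the gains (kp, ki, kd) and the minimum rate are all assumptions of this example.

```scala
object PidRateSketch {
  /** One update step of a PID-style rate estimator; rates are in records per second. */
  def newRate(
      latestRate: Double,
      latestError: Double,
      numElements: Long,
      processingDelayMs: Long,
      schedulingDelayMs: Long,
      batchIntervalMs: Long,
      secondsSinceUpdate: Double,
      kp: Double = 1.0,
      ki: Double = 0.2,
      kd: Double = 0.0,
      minRate: Double = 100.0): Double = {
    require(processingDelayMs > 0 && batchIntervalMs > 0 && secondsSinceUpdate > 0)
    // Rate at which the last batch was actually processed.
    val processingRate = numElements.toDouble / processingDelayMs * 1000
    // Proportional term: how far the admitted rate exceeds the processing rate.
    val error = latestRate - processingRate
    // Integral term: backlog implied by the scheduling delay, expressed as a rate.
    val historicalError = schedulingDelayMs.toDouble * processingRate / batchIntervalMs
    // Derivative term: change of the error since the previous update.
    val dError = (error - latestError) / secondsSinceUpdate
    math.max(latestRate - kp * error - ki * historicalError - kd * dError, minRate)
  }
}
```

    With an admitted rate of 1000 records/s, a batch of 5000 records processed in 10 seconds and a 2-second scheduling delay on a 10-second interval, the sketch lowers the rate to 480 records/s: the processing rate of 500 minus a correction for the accumulated backlog.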

    Thanks to Wang Jialin for sharing this material.

    Spark Streaming release-edition notes, No. 17

    Sina Weibo: http://weibo.com/ilovepains

    WeChat official account: DT_Spark

    Blog: http://blog.sina.com.cn/ilovepains


  • Original article: https://www.cnblogs.com/sparkbigdata/p/5544609.html