  • Spark Streaming Performance Optimization Series: How to Obtain and Keep Using Sufficient Cluster Computing Resources?

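    I. The heavy impact of data peaks
    1. The data volume really is unstable; for example, access traffic is especially heavy in the evening.
    2. During processing, pauses such as GC cost time and introduce delay.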

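    II. Backpressure: the data back-pressure mechanism
    Basic idea: use statistics from the previously computed job to decide how fast data should be received for the next job.
    How can the rate at which Spark receives data be limited?
    When receiving data, Spark Streaming must finish receiving the current record before it can receive the next one, so delaying the acceptance of each record caps the overall ingestion rate.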
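
For context, here is a minimal configuration sketch, assuming Spark 1.5 or later with Spark on the classpath. The two property names are Spark's documented settings; the application name and the cap value are illustrative assumptions that do not come from the article.

```scala
import org.apache.spark.SparkConf

object BackpressureConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("backpressure-sketch") // hypothetical application name
      // Let the rate controller adjust the receiving rate from batch statistics.
      .set("spark.streaming.backpressure.enabled", "true")
      // Hard upper bound per receiver, in records per second (illustrative value).
      .set("spark.streaming.receiver.maxRate", "10000")

    // The conf would then be passed to a StreamingContext, for example
    //   new StreamingContext(conf, Seconds(5))
    println(conf.get("spark.streaming.backpressure.enabled"))
    println(conf.get("spark.streaming.receiver.maxRate"))
  }
}
```

Backpressure is off by default, so the RateController walked through in the rest of this article only takes effect once the first property is set to true.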

    Source code walkthrough
    RateController:
    1. RateController is a listener; it extends StreamingListener.

    /**
     * A StreamingListener that receives batch completion updates, and maintains
     * an estimate of the speed at which this stream should ingest messages,
     * given an estimate computation from a `RateEstimator`
     */
    private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
        extends StreamingListener with Serializable {
    

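    So the question is: when is RateController invoked?
    Backpressure uses information from the previously computed job to estimate how fast data should be received for the next job.

    It must therefore be wired up in JobScheduler.
    1. In JobScheduler's start method, each rateController is obtained from the input streams.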

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)
    
    2. The listener is then added to the listenerBus.
    
    /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
      * receiving system events related to streaming.
      */
    def addStreamingListener(streamingListener: StreamingListener) {
      scheduler.listenerBus.addListener(streamingListener)
    }
    

    3. The StreamingListenerBus source code is as follows:

    /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    private[spark] class StreamingListenerBus
      extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
      with Logging {
    
      private val logDroppedEvent = new AtomicBoolean(false)
    
      override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
        event match {
          case receiverStarted: StreamingListenerReceiverStarted =>
            listener.onReceiverStarted(receiverStarted)
          case receiverError: StreamingListenerReceiverError =>
            listener.onReceiverError(receiverError)
          case receiverStopped: StreamingListenerReceiverStopped =>
            listener.onReceiverStopped(receiverStopped)
          case batchSubmitted: StreamingListenerBatchSubmitted =>
            listener.onBatchSubmitted(batchSubmitted)
          case batchStarted: StreamingListenerBatchStarted =>
            listener.onBatchStarted(batchStarted)
          case batchCompleted: StreamingListenerBatchCompleted =>
            listener.onBatchCompleted(batchCompleted)
          // ... remaining cases and members are omitted in this excerpt
        }
      }
    }
    
    4. RateController implements onBatchCompleted.
    

    5. The concrete implementation of onBatchCompleted in RateController is as follows:

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
      val elements = batchCompleted.batchInfo.streamIdToInputInfo
    
      for {
        processingEnd <- batchCompleted.batchInfo.processingEndTime
        workDelay <- batchCompleted.batchInfo.processingDelay
        waitDelay <- batchCompleted.batchInfo.schedulingDelay
        elems <- elements.get(streamUID).map(_.numRecords)
      } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
    }
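
The for-comprehension above iterates over Option values, so computeAndPublish runs only when every piece of batch information is present. The following self-contained sketch illustrates that behavior; BatchInfoSketch and extract are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for the BatchInfo fields read by onBatchCompleted.
final case class BatchInfoSketch(
    processingEndTime: Option[Long],
    processingDelay: Option[Long],
    schedulingDelay: Option[Long],
    streamIdToNumRecords: Map[Int, Long])

object OnBatchCompletedSketch {
  /** Returns the arguments that would be handed to computeAndPublish, if all are defined. */
  def extract(streamUID: Int, info: BatchInfoSketch): Option[(Long, Long, Long, Long)] =
    for {
      processingEnd <- info.processingEndTime
      workDelay <- info.processingDelay
      waitDelay <- info.schedulingDelay
      elems <- info.streamIdToNumRecords.get(streamUID)
    } yield (processingEnd, elems, workDelay, waitDelay)

  def main(args: Array[String]): Unit = {
    val complete = BatchInfoSketch(Some(1000L), Some(200L), Some(50L), Map(0 -> 500L))
    val unfinished = complete.copy(processingEndTime = None)

    println(extract(0, complete))   // all four values present
    println(extract(0, unfinished)) // a missing timestamp short-circuits to None
    println(extract(7, complete))   // an unknown stream id also yields None
  }
}
```

If any one of the four values is missing, the whole comprehension is skipped and no rate update is attempted for that batch.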
    
    6. The source of computeAndPublish in RateController is as follows:
    
    /**
     * Compute the new rate limit and publish it asynchronously.
     */
    private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
      Future[Unit] {
        // Estimate a new, more suitable rate.
        val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
        newRate.foreach { s =>
          rateLimit.set(s.toLong)
          publish(getLatestRate())
        }
      }
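
To make the compute-then-publish step concrete, here is a simplified sketch. It assumes a naive proportional estimator (records processed per second of processing time), which is a stand-in for illustration and not Spark's built-in PID-based estimator. RateEstimatorSketch, ProportionalEstimator, and RateControllerSketch are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical trait with the same shape as the compute call shown above.
trait RateEstimatorSketch {
  def compute(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Option[Double]
}

// Assumption: the new rate is simply the observed processing throughput.
object ProportionalEstimator extends RateEstimatorSketch {
  def compute(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Option[Double] =
    if (elems > 0 && workDelay > 0) Some(elems.toDouble / workDelay * 1000)
    else None // not enough information to estimate
}

class RateControllerSketch(estimator: RateEstimatorSketch, publish: Long => Unit)(
    implicit ec: ExecutionContext) {
  // -1 means that no estimate has been produced yet.
  private val rateLimit = new AtomicLong(-1L)

  def getLatestRate(): Long = rateLimit.get()

  /** Computes the new rate off the caller's thread and publishes it if one was produced. */
  def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Future[Unit] =
    Future {
      estimator.compute(time, elems, workDelay, waitDelay).foreach { r =>
        rateLimit.set(r.toLong)
        publish(getLatestRate())
      }
    }
}

object ComputeAndPublishSketch {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val published = new AtomicLong(0L)
    val controller = new RateControllerSketch(ProportionalEstimator, r => published.set(r))
    // 1000 records processed in 2000 ms of processing time.
    Await.result(controller.computeAndPublish(0L, 1000L, 2000L, 0L), 5.seconds)
    println(published.get())
  }
}
```

Running the estimation inside a Future keeps the listener callback from blocking the listener bus thread while the new rate is computed and sent.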

    7. publish is implemented in ReceiverRateController.
    

    8. publish sends the new rate to ReceiverTracker.

    /**
     * A RateController that sends the new rate to receivers, via the receiver tracker.
     */
    private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
        extends RateController(id, estimator) {
      override def publish(rate: Long): Unit =
        // There may be many RateControllers, so each one carries its own stream id.
        ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
    }
    
    9. The source of sendRateUpdate in ReceiverTracker is as follows.
    Here the endpoint is the ReceiverTrackerEndpoint.
    
    /** Update a receiver's maximum ingestion rate */
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
      if (isTrackerStarted) {
        endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
      }
    }
    
    10. The receive method of ReceiverTrackerEndpoint then receives the message.
    
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      // Look up the receiver's info in receiverTrackingInfos, then get its communication handle from endpoint.
      // Here endpoint is the RPC endpoint of the ReceiverSupervisor.
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
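
The routing step can be sketched without any RPC machinery. In the example below the message classes, EndpointSketch, ReceiverInfoSketch, and TrackerSketch are all local, hypothetical stand-ins that only mimic the lookup-then-forward pattern; they are not Spark's classes.

```scala
// Local stand-ins for the two messages involved in a rate update.
sealed trait Message
final case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long) extends Message
final case class UpdateRateLimit(eps: Long) extends Message

// Hypothetical endpoint reference that simply records what it is sent.
class EndpointSketch {
  var received: Vector[Message] = Vector.empty
  def send(m: Message): Unit = received = received :+ m
}

// A receiver may not have registered its endpoint yet, hence the Option.
final case class ReceiverInfoSketch(endpoint: Option[EndpointSketch])

class TrackerSketch(receiverTrackingInfos: Map[Int, ReceiverInfoSketch]) {
  def receive: PartialFunction[Message, Unit] = {
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      // Forward only if the stream is known and its endpoint is available.
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
  }
}

object TrackerRoutingSketch {
  def main(args: Array[String]): Unit = {
    val ep = new EndpointSketch
    val tracker = new TrackerSketch(
      Map(0 -> ReceiverInfoSketch(Some(ep)), 1 -> ReceiverInfoSketch(None)))

    tracker.receive(UpdateReceiverRateLimit(0, 500L)) // delivered to ep
    tracker.receive(UpdateReceiverRateLimit(1, 500L)) // no endpoint yet: silently dropped
    tracker.receive(UpdateReceiverRateLimit(9, 500L)) // unknown stream: silently dropped
    println(ep.received)
  }
}
```

Because both lookups return Option, an update for an unknown or not-yet-registered receiver is dropped rather than raising an error.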

    11. ReceiverSupervisorImpl therefore receives the message sent by ReceiverTracker.

    /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
    private val endpoint = env.rpcEnv.setupEndpoint(
      "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
        override val rpcEnv: RpcEnv = env.rpcEnv
    
        override def receive: PartialFunction[Any, Unit] = {
          case StopReceiver =>
            logInfo("Received stop signal")
            ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
          case CleanupOldBlocks(threshTime) =>
            logDebug("Received delete old batch signal")
            cleanupOldBlocks(threshTime)
          case UpdateRateLimit(eps) =>
            logInfo(s"Received a new rate limit: $eps.")
            registeredBlockGenerators.foreach { bg =>
              bg.updateRate(eps)
            }
        }
      })
    
    12. The source of updateRate in RateLimiter is as follows:
    
    // There is an upper bound here because the cluster's processing capacity is finite.
    // Spark Streaming may be running on YARN; when several compute frameworks share the
    // cluster, resources are even more limited.
    /**
     * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
     * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
     *
     * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
     */
    private[receiver] def updateRate(newRate: Long): Unit =
      if (newRate > 0) {
        if (maxRateLimit > 0) {
          rateLimiter.setRate(newRate.min(maxRateLimit))
        } else {
          rateLimiter.setRate(newRate)
        }
      }
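
The clamping rule in updateRate can be exercised in isolation. The sketch below is a hypothetical stand-in (RateLimiterSketch is not a Spark class) that stores the limit in an AtomicLong instead of a real rate limiter; the initial value is an assumption made for the example.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the receiver-side limiter; maxRateLimit <= 0 means "no cap".
class RateLimiterSketch(maxRateLimit: Long) {
  // Assumption: start at the configured cap, or effectively unlimited if there is none.
  private val current = new AtomicLong(if (maxRateLimit > 0) maxRateLimit else Long.MaxValue)

  def getCurrentLimit: Long = current.get()

  /** Same branching as updateRate above: ignore non-positive rates, clamp to the cap. */
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) current.set(newRate.min(maxRateLimit))
      else current.set(newRate)
    }
}

object UpdateRateSketch {
  def main(args: Array[String]): Unit = {
    val capped = new RateLimiterSketch(1000L)
    capped.updateRate(5000L) // above the cap: clamped
    println(capped.getCurrentLimit)
    capped.updateRate(300L)  // below the cap: accepted as is
    println(capped.getCurrentLimit)
    capped.updateRate(0L)    // non-positive: ignored
    println(capped.getCurrentLimit)

    val uncapped = new RateLimiterSketch(-1L)
    uncapped.updateRate(5000L) // no cap configured: accepted as is
    println(uncapped.getCurrentLimit)
  }
}
```

The cap acts as a safety net: whatever the estimator proposes, a receiver never ingests faster than the configured maximum.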
    

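    Overall flow, as traced in the steps above: JobScheduler.start registers each RateController on the listenerBus -> StreamingListenerBus dispatches onBatchCompleted -> RateController.computeAndPublish -> ReceiverRateController.publish -> ReceiverTracker.sendRateUpdate -> ReceiverTrackerEndpoint -> ReceiverSupervisorImpl -> updateRate.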

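    Summary:
    After the job for each batch duration finishes, completion information (JobCompleted and the like) is reported back. A new rate is computed from that information and sent over RPC to the Executor, which then resets its own rate limit accordingly.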

  • Original article: https://www.cnblogs.com/llguanli/p/8579334.html