  • Spark Streaming Performance Optimization Series: How to Obtain, and Keep Using, Enough Cluster Computing Resources?

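    I. The Large Impact of Data Peaks
    1. Incoming data is genuinely unstable; for example, traffic can be especially heavy at night.
    2. Time lost during processing, for example to garbage collection (GC) pauses, introduces delay, and later batches have to wait for it.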
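    To see why a peak or a GC pause matters beyond the batch it hits, here is a small simulation, assuming batches are submitted once per batch interval and processed strictly one after another. The function name `schedulingDelays` and the timings are illustrative only; they do not come from Spark:

```scala
// Illustrative simulation (not Spark code): batches are submitted every
// `batchIntervalMs` and processed one at a time. Returns, for each batch,
// its scheduling delay, i.e. how long it waited before processing started.
def schedulingDelays(batchIntervalMs: Long, processingMs: Seq[Long]): Seq[Long] = {
  var freeAt = 0L // time at which the previous batch finishes processing
  processingMs.zipWithIndex.map { case (p, i) =>
    val submitted = i * batchIntervalMs
    val start = math.max(submitted, freeAt)
    freeAt = start + p
    start - submitted
  }
}

// 1-second batches; a load peak (or a long GC pause) makes the second and third batches take 1.5 s
println(schedulingDelays(1000L, Seq(800L, 1500L, 1500L, 800L, 800L, 800L)))
// List(0, 0, 500, 1000, 800, 600)
```

    Once processing falls behind, the delay carries over into later batches and drains only slowly, which is the situation backpressure is meant to prevent.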

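    II. Backpressure: The Data Back-Pressure Mechanism
    Basic idea: use statistics from the previously completed Job to decide how fast data should be received for the next Job.
    How can the speed at which Spark receives data be limited?
    When Spark Streaming receives data, it must finish receiving the current record before it can receive the next one. So delaying each record before it is accepted is enough to cap the overall ingestion rate.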
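    Because records are accepted one at a time, spacing out their admission is enough to cap throughput. The sketch below models this with a simulated clock instead of real sleeping; `SimulatedRateLimiter` is a hypothetical class, much simpler than Spark's RateLimiter (it has no burst allowance, for example):

```scala
// Hypothetical, simplified model (not Spark's RateLimiter): records are admitted
// one at a time, each no earlier than 1000 / rate milliseconds after the previous one.
final class SimulatedRateLimiter(recordsPerSecond: Double) {
  require(recordsPerSecond > 0, "rate must be positive")
  private val intervalMs = 1000.0 / recordsPerSecond
  private var nextFreeMs = 0.0

  /** Returns the simulated time at which a record arriving at `arrivalMs` is admitted. */
  def admit(arrivalMs: Double): Double = {
    val admittedAt = math.max(arrivalMs, nextFreeMs)
    nextFreeMs = admittedAt + intervalMs
    admittedAt
  }
}

// A burst of 5 records, all arriving at t = 0 ms, with a limit of 10 records/second
val limiter = new SimulatedRateLimiter(10)
val admitted = (1 to 5).map(_ => limiter.admit(0.0))
println(admitted.mkString(", ")) // 0.0, 100.0, 200.0, 300.0, 400.0
```

    With a limit of 10 records per second, five records that arrive together are admitted 100 ms apart.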

    Source Code Analysis
    RateController:
    1. RateController is a listener: it extends StreamingListener.

    /**
     * A StreamingListener that receives batch completion updates, and maintains
     * an estimate of the speed at which this stream should ingest messages,
     * given an estimate computation from a `RateEstimator`
     */
    private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
        extends StreamingListener with Serializable {
      // ... class body omitted
    }
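    The declaration combines two roles: RateController is a listener, and it keeps a rate estimate that it publishes. A minimal sketch of that shape, using hypothetical stand-ins (`BatchListener`, `SimpleRateController`, and a plain function in place of `RateEstimator`) rather than Spark's real types; the `rateLimit`, `getLatestRate`, and `publish` names mirror those visible in step 6:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for StreamingListener: every callback defaults to a no-op,
// so a subclass overrides only the events it cares about.
trait BatchListener {
  def onBatchStarted(batchTime: Long): Unit = ()
  def onBatchCompleted(batchTime: Long, numRecords: Long, processingMs: Long): Unit = ()
}

// Hypothetical stand-in for RateController: stores the latest estimate and leaves
// delivery to subclasses. `estimate` plays the role of a RateEstimator.
abstract class SimpleRateController(
    val streamUID: Int,
    estimate: (Long, Long) => Option[Double]) extends BatchListener {

  private val rateLimit = new AtomicLong(-1L) // -1 means "no estimate yet"

  def getLatestRate(): Long = rateLimit.get()

  protected def publish(rate: Long): Unit

  override def onBatchCompleted(batchTime: Long, numRecords: Long, processingMs: Long): Unit =
    estimate(numRecords, processingMs).foreach { r =>
      rateLimit.set(r.toLong)
      publish(getLatestRate())
    }
}

// Usage: estimate the rate as records processed per second of processing time
val published = ArrayBuffer.empty[Long]
val controller = new SimpleRateController(0, (n, ms) => if (ms > 0) Some(n * 1000.0 / ms) else None) {
  protected def publish(rate: Long): Unit = published += rate
}
controller.onBatchStarted(1000L)                // not overridden, so ignored
controller.onBatchCompleted(1000L, 5000L, 2000L)
println(published)                              // ArrayBuffer(2500)
```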
    

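    This raises a question: when is RateController invoked?
    Backpressure estimates the receiving rate for the next Job from information about the previous Job.

    So it must be hooked in through the JobScheduler.
    1. In the start method of JobScheduler, each rate controller is taken from its input stream (inputDStream.rateController) and registered as a streaming listener: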

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)
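    The `for` comprehension above iterates over a collection (`getInputStreams`) and then over an `Option` (`rateController`), so any input stream whose controller is `None` is skipped. In Spark an input stream's `rateController` is `None` unless the stream type supports rate control and backpressure is enabled (`spark.streaming.backpressure.enabled`). A minimal sketch of the same shape with hypothetical stub types:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins: an input stream may or may not carry a rate controller
final case class ControllerStub(streamId: Int)
final case class InputStreamStub(id: Int, rateController: Option[ControllerStub])

val inputStreams = Seq(
  InputStreamStub(0, Some(ControllerStub(0))),
  InputStreamStub(1, None), // e.g. a stream type without rate control
  InputStreamStub(2, Some(ControllerStub(2))))

val registered = ArrayBuffer.empty[ControllerStub]

// Same shape as the JobScheduler code: the inner Option generator
// silently skips every stream whose rateController is None.
for {
  stream <- inputStreams
  controller <- stream.rateController
} registered += controller

println(registered.map(_.streamId)) // ArrayBuffer(0, 2)
```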
    
    2. addStreamingListener then adds the listener to the listenerBus:
    
    /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
      * receiving system events related to streaming.
      */
    def addStreamingListener(streamingListener: StreamingListener) {
      scheduler.listenerBus.addListener(streamingListener)
    }
    

    3. The StreamingListenerBus source is as follows; onPostEvent dispatches each event to the matching listener callback:

    /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    private[spark] class StreamingListenerBus
      extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
      with Logging {
    
      private val logDroppedEvent = new AtomicBoolean(false)
    
      override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
        event match {
          case receiverStarted: StreamingListenerReceiverStarted =>
            listener.onReceiverStarted(receiverStarted)
          case receiverError: StreamingListenerReceiverError =>
            listener.onReceiverError(receiverError)
          case receiverStopped: StreamingListenerReceiverStopped =>
            listener.onReceiverStopped(receiverStopped)
          case batchSubmitted: StreamingListenerBatchSubmitted =>
            listener.onBatchSubmitted(batchSubmitted)
          case batchStarted: StreamingListenerBatchStarted =>
            listener.onBatchStarted(batchStarted)
          case batchCompleted: StreamingListenerBatchCompleted =>
            listener.onBatchCompleted(batchCompleted)
          // ... remaining cases omitted
        }
      }
      // ... rest of the class omitted
    }
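    StreamingListenerBus is asynchronous (it extends AsynchronousListenerBus): posting an event only enqueues it, and a separate thread later pattern-matches each event onto the corresponding listener callback. A minimal sketch of that design with a `LinkedBlockingQueue` and one daemon thread; `MiniListenerBus`, the event classes, and `CountingListener` are hypothetical and much simpler than Spark's implementation (no stop logic, no dropped-event handling):

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical event hierarchy standing in for StreamingListenerEvent
sealed trait Event
final case class BatchStarted(time: Long) extends Event
final case class BatchCompleted(time: Long, numRecords: Long) extends Event

// Hypothetical listener: callbacks default to no-ops
trait Listener {
  def onBatchStarted(e: BatchStarted): Unit = ()
  def onBatchCompleted(e: BatchCompleted): Unit = ()
}

final class MiniListenerBus {
  private val queue = new LinkedBlockingQueue[Event]()
  @volatile private var listeners: List[Listener] = Nil

  def addListener(l: Listener): Unit = synchronized { listeners = listeners :+ l }

  // Returns immediately; delivery happens later on the worker thread
  def post(e: Event): Unit = queue.put(e)

  // Same idea as onPostEvent: route each event to the matching callback
  private def dispatch(l: Listener, e: Event): Unit = e match {
    case s: BatchStarted   => l.onBatchStarted(s)
    case c: BatchCompleted => l.onBatchCompleted(c)
  }

  private val worker = new Thread(new Runnable {
    def run(): Unit = while (true) {
      val e = queue.take() // blocks until an event is available
      listeners.foreach(l => dispatch(l, e))
    }
  })
  worker.setDaemon(true)
  worker.start()
}

// Listener that records the last completed batch and signals the waiting thread
final class CountingListener extends Listener {
  val completed = new CountDownLatch(1)
  @volatile var lastRecords: Long = -1L
  override def onBatchCompleted(e: BatchCompleted): Unit = {
    lastRecords = e.numRecords
    completed.countDown()
  }
}

val bus = new MiniListenerBus
val listener = new CountingListener
bus.addListener(listener)
bus.post(BatchStarted(1000L))        // not overridden, so ignored
bus.post(BatchCompleted(1000L, 42L))
val delivered = listener.completed.await(5, TimeUnit.SECONDS)
println(s"delivered=$delivered, records=${listener.lastRecords}") // delivered=true, records=42
```

    Decoupling posting from delivery keeps the scheduler from blocking on slow listeners, at the cost that callbacks such as onBatchCompleted run a little after the batch actually finishes.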
    
    4. RateController overrides onBatchCompleted.
    

    5. The implementation of onBatchCompleted in RateController is as follows:

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
      val elements = batchCompleted.batchInfo.streamIdToInputInfo
    
      for {
        processingEnd <- batchCompleted.batchInfo.processingEndTime
        workDelay <- batchCompleted.batchInfo.processingDelay
        waitDelay <- batchCompleted.batchInfo.schedulingDelay
        elems <- elements.get(streamUID).map(_.numRecords)
      } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
    }
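    Each value drawn in this `for` comprehension is an `Option`: timing fields may be missing, and a stream may have no input info for the batch, so computeAndPublish runs only when all four values are present. A minimal sketch of the same pattern, using a hypothetical `BatchStats` case class in place of `BatchInfo`:

```scala
// Hypothetical, simplified stand-in for BatchInfo: timing fields are optional
final case class BatchStats(
    processingEndTime: Option[Long],
    processingDelay: Option[Long],
    schedulingDelay: Option[Long],
    recordsPerStream: Map[Int, Long])

// Mirrors the shape of onBatchCompleted: yields the inputs for the rate
// computation only when every piece is available for this stream.
def rateInputs(stats: BatchStats, streamUID: Int): Option[(Long, Long, Long, Long)] =
  for {
    processingEnd <- stats.processingEndTime
    workDelay     <- stats.processingDelay
    waitDelay     <- stats.schedulingDelay
    elems         <- stats.recordsPerStream.get(streamUID)
  } yield (processingEnd, elems, workDelay, waitDelay)

val complete = BatchStats(Some(2000L), Some(800L), Some(50L), Map(0 -> 4000L))
println(rateInputs(complete, 0))                              // Some((2000,4000,800,50))
println(rateInputs(complete, 1))                              // None: no input info for stream 1
println(rateInputs(complete.copy(processingDelay = None), 0)) // None
```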
    
    6. The source of computeAndPublish in RateController is as follows:
    
    /**
     * Compute the new rate limit and publish it asynchronously.
     */
    private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
      Future[Unit] {
        // Estimate a new, more suitable rate from the last batch's statistics
        val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
        newRate.foreach { s =>
          rateLimit.set(s.toLong)
          publish(getLatestRate())
        }
      }
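    `rateEstimator.compute` is where the new rate comes from; Spark's default estimator is a PID controller (PIDRateEstimator). Below is a simplified, standalone sketch of a PID-style estimator in that spirit. The class name `PidRateSketch`, the gains (1.0, 0.2, 0.0), and the 100 records/s floor are assumptions chosen for illustration, not a statement of Spark's exact implementation:

```scala
// Simplified PID-style rate estimator (illustrative sketch, assumed constants).
// Times are in milliseconds; rates are in records per second.
final class PidRateSketch(
    batchIntervalMs: Long,
    kp: Double = 1.0,        // proportional gain (assumed)
    ki: Double = 0.2,        // gain on the backlog term (assumed)
    kd: Double = 0.0,        // derivative gain (assumed)
    minRate: Double = 100.0) // lower bound on the published rate (assumed)
{
  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  /** Returns a new rate, or None on the first batch and for unusable input. */
  def compute(time: Long, elems: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] =
    if (time > latestTime && elems > 0 && processingDelay > 0) {
      val secondsSinceUpdate = (time - latestTime).toDouble / 1000
      // Rate at which the last batch was actually processed
      val processingRate = elems.toDouble / processingDelay * 1000
      // Positive when data was admitted faster than it could be processed
      val error = latestRate - processingRate
      // Backlog implied by the scheduling delay, spread over one batch interval
      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMs
      val dError = (error - latestError) / secondsSinceUpdate
      val newRate = (latestRate - kp * error - ki * historicalError - kd * dError).max(minRate)

      latestTime = time
      if (firstRun) {
        // Nothing to correct yet: seed the controller with the observed rate
        latestRate = processingRate
        latestError = 0.0
        firstRun = false
        None
      } else {
        latestRate = newRate
        latestError = error
        Some(newRate)
      }
    } else None
}

val est = new PidRateSketch(batchIntervalMs = 1000)
println(est.compute(1000, 1000, 500, 0))   // None: the first batch only seeds the estimator
println(est.compute(2000, 1000, 1000, 0))  // Some(1000.0): processing slowed down, so the rate drops
println(est.compute(3000, 1000, 1000, 500)) // Some(900.0): a scheduling backlog lowers it further
```

    The proportional term pulls the rate toward what the cluster actually processed, and the backlog term pushes it lower while batches are still queuing, so the receiver can drain the delay instead of adding to it.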

    7. publish itself is implemented in ReceiverRateController.
    

    8. ReceiverRateController publishes the new rate to the ReceiverTracker:

    /**
     * A RateController that sends the new rate to receivers, via the receiver tracker.
     */
    private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
        extends RateController(id, estimator) {
      // There can be many RateControllers (one per input stream), so each update carries the stream id
      override def publish(rate: Long): Unit =
        ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
    }
    
    9. The source of sendRateUpdate in ReceiverTracker is as follows.
    Here, endpoint is the ReceiverTrackerEndpoint.
    
    /** Update a receiver's maximum ingestion rate */
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
      if (isTrackerStarted) {
        endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
      }
    }
    
    10. The message is received in the receive method of ReceiverTrackerEndpoint:
    
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      // Look up the stream's info in receiverTrackingInfos, then take the endpoint reference from it.
      // This endpoint is the RPC endpoint of the ReceiverSupervisor.
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
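    Steps 8 to 10 form a two-hop path: the controller for a stream reports a new rate to the driver-side ReceiverTracker, which looks up that stream's receiver endpoint and forwards the update. A minimal in-process sketch of the routing logic, with hypothetical `TrackingInfo` and `Tracker` types and plain functions standing in for RPC endpoint references (in Spark both hops are RPC messages):

```scala
import scala.collection.mutable

// Hypothetical messages mirroring UpdateReceiverRateLimit / UpdateRateLimit
final case class UpdateReceiverRateLimitMsg(streamUID: Int, newRate: Long)
final case class UpdateRateLimitMsg(newRate: Long)

// Hypothetical tracking info: the endpoint is absent until the receiver registers
final case class TrackingInfo(streamUID: Int, endpoint: Option[UpdateRateLimitMsg => Unit])

final class Tracker {
  @volatile var started = false
  val trackingInfos = mutable.Map.empty[Int, TrackingInfo]

  // Like sendRateUpdate: drop updates until the tracker is running
  def sendRateUpdate(streamUID: Int, newRate: Long): Unit =
    if (started) receive(UpdateReceiverRateLimitMsg(streamUID, newRate))

  // Like ReceiverTrackerEndpoint.receive: forward only if both the info and its endpoint exist
  private def receive(msg: UpdateReceiverRateLimitMsg): Unit =
    for (info <- trackingInfos.get(msg.streamUID); ep <- info.endpoint) {
      ep(UpdateRateLimitMsg(msg.newRate))
    }
}

val delivered = mutable.ArrayBuffer.empty[(Int, Long)]
val stream0Endpoint: UpdateRateLimitMsg => Unit = m => delivered += ((0, m.newRate))

val tracker = new Tracker
tracker.trackingInfos(0) = TrackingInfo(0, Some(stream0Endpoint))
tracker.trackingInfos(1) = TrackingInfo(1, None) // receiver not registered yet

tracker.sendRateUpdate(0, 500L) // dropped: tracker not started
tracker.started = true
tracker.sendRateUpdate(0, 800L) // forwarded to stream 0's receiver
tracker.sendRateUpdate(1, 900L) // no endpoint: nothing sent
tracker.sendRateUpdate(7, 100L) // unknown stream: nothing sent
println(delivered)              // ArrayBuffer((0,800))
```

    Either lookup returning `None` silently skips the send, just as in the `for` comprehension of step 10.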

    11. ReceiverSupervisorImpl therefore receives the message sent by the ReceiverTracker:

    /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
    private val endpoint = env.rpcEnv.setupEndpoint(
      "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
        override val rpcEnv: RpcEnv = env.rpcEnv
    
        override def receive: PartialFunction[Any, Unit] = {
          case StopReceiver =>
            logInfo("Received stop signal")
            ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
          case CleanupOldBlocks(threshTime) =>
            logDebug("Received delete old batch signal")
            cleanupOldBlocks(threshTime)
          case UpdateRateLimit(eps) =>
            logInfo(s"Received a new rate limit: $eps.")
            registeredBlockGenerators.foreach { bg =>
              bg.updateRate(eps)
            }
        }
      })
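    On the executor side, the endpoint's receive method pattern-matches on the message type, and a rate update is applied to every registered block generator. A minimal sketch of that dispatch, with hypothetical `SupervisorMessage`, `Generator`, and `MiniSupervisor` types in place of Spark's classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical messages standing in for StopReceiver / CleanupOldBlocks / UpdateRateLimit
sealed trait SupervisorMessage
case object Stop extends SupervisorMessage
final case class Cleanup(thresholdTime: Long) extends SupervisorMessage
final case class RateUpdate(eps: Long) extends SupervisorMessage

// Hypothetical block generator that only remembers its current rate
final class Generator(val name: String) {
  @volatile var rate: Long = -1L
  def updateRate(newRate: Long): Unit = { rate = newRate }
}

final class MiniSupervisor {
  val generators = ArrayBuffer.empty[Generator]
  @volatile var stopped = false

  // Pattern match on the message type, as in the endpoint's receive method
  def receive(msg: SupervisorMessage): Unit = msg match {
    case Stop               => stopped = true
    case Cleanup(threshold) => println(s"would delete blocks older than $threshold")
    case RateUpdate(eps)    => generators.foreach(_.updateRate(eps)) // fan out to every generator
  }
}

val sup = new MiniSupervisor
sup.generators ++= Seq(new Generator("g1"), new Generator("g2"))
sup.receive(RateUpdate(1500L))
println(sup.generators.map(g => s"${g.name}=${g.rate}").mkString(", ")) // g1=1500, g2=1500
```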
    
    12. BlockGenerator extends RateLimiter, so bg.updateRate is RateLimiter's updateRate, whose source is as follows:
    
    // There is an upper bound because the cluster's processing capacity is limited.
    // Spark Streaming may run on YARN; when several computing frameworks share the cluster, resources are even scarcer.
    /**
     * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
     * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
     *
     * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
     */
    private[receiver] def updateRate(newRate: Long): Unit =
      if (newRate > 0) {
        if (maxRateLimit > 0) {
          rateLimiter.setRate(newRate.min(maxRateLimit))
        } else {
          rateLimiter.setRate(newRate)
        }
      }
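    The rules in updateRate are easy to check in isolation: non-positive rates are ignored, and when a positive maximum is configured (via spark.streaming.receiver.maxRate), the estimate is capped by it. A minimal sketch of just that decision as a pure function; `effectiveRate` is a hypothetical helper, with `None` meaning "leave the current rate unchanged":

```scala
// Pure version of the updateRate decision (hypothetical helper, not Spark API).
// maxRateLimit <= 0 means no maximum is applied.
def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
  if (newRate <= 0) None                                     // ignored: no effect
  else if (maxRateLimit > 0) Some(newRate.min(maxRateLimit)) // capped by the configured maximum
  else Some(newRate)                                         // no maximum applied

println(effectiveRate(5000L, 2000L)) // Some(2000)
println(effectiveRate(1500L, 2000L)) // Some(1500)
println(effectiveRate(5000L, 0L))    // Some(5000)
println(effectiveRate(0L, 2000L))    // None
```

    However aggressive the estimator is, the receiver never exceeds the configured maximum, which is what keeps a shared cluster from being overrun.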
    

    Overall flow chart:
    [Figure: overall flow of the backpressure mechanism]

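    Summary:
    Each time the Job of the previous batch duration finishes, a batch-completed event (StreamingListenerBatchCompleted) is posted with that batch's statistics. A new rate is computed from them and sent over RPC to the Executor, where the receiver resets its rate limit accordingly.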

  • Original article: https://www.cnblogs.com/llguanli/p/8579334.html