  • Spark Streaming 6: BlockGenerator and RateLimiter

    BlockGenerator and RateLimiter are actually quite simple, but they handle several important configuration properties, so I am recording them here.
    /**
     * Generates batches of objects received by a
     * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
     * named blocks at regular intervals. This class starts two threads,
     * one to periodically start a new batch and prepare the previous batch as a block,
     * the other to push the blocks into the block manager.
     */
    private[streaming] class BlockGenerator(
        listener: BlockGeneratorListener,
        receiverId: Int,
        conf: SparkConf
      ) extends RateLimiter(conf) with Logging {

      private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

      private val clock = new SystemClock()
      private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
      private val blockIntervalTimer =
        new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
      private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
      private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
      private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

      @volatile private var currentBuffer = new ArrayBuffer[Any]
      @volatile private var stopped = false
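
    To make the two-thread design easier to picture, here is a minimal standalone sketch of my own. It is not Spark's implementation: MiniBlockGenerator, MiniBlockGeneratorDemo and the onBlock callback are hypothetical names, and a ScheduledExecutorService stands in for RecurringTimer. It only mirrors the roles of spark.streaming.blockInterval (how often the current buffer is cut into a block) and spark.streaming.blockQueueSize (how many finished blocks may wait before the timer thread has to block).

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for BlockGenerator (assumption, not Spark code).
class MiniBlockGenerator(blockIntervalMs: Long, blockQueueSize: Int)(onBlock: Seq[Any] => Unit) {
  // Bounded queue between the timer thread and the pushing thread.
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](blockQueueSize)
  private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  private val timer = Executors.newSingleThreadScheduledExecutor()

  // Thread 2: hands finished blocks to the callback until stopped and drained.
  private val pusher = new Thread("mini-block-pusher") {
    override def run(): Unit = {
      while (!stopped || !blocksForPushing.isEmpty) {
        val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
        if (block != null) onBlock(block)
      }
    }
  }

  def start(): Unit = {
    // Thread 1: every blockIntervalMs, cut the current buffer into a block.
    val task = new Runnable { def run(): Unit = updateCurrentBuffer() }
    timer.scheduleAtFixedRate(task, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
    pusher.start()
  }

  // Called by the receiver for every incoming record.
  def +=(data: Any): Unit = synchronized { currentBuffer += data }

  private def updateCurrentBuffer(): Unit = {
    val full = synchronized {
      val b = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      b
    }
    // put() blocks while the queue is full, which slows block creation down.
    if (full.nonEmpty) blocksForPushing.put(full.toList)
  }

  def stop(): Unit = {
    timer.shutdown()
    timer.awaitTermination(1, TimeUnit.SECONDS)
    updateCurrentBuffer() // flush whatever is still buffered
    stopped = true
    pusher.join()
  }
}

object MiniBlockGeneratorDemo {
  // Feeds 1..n into the generator and returns the blocks in arrival order.
  def run(n: Int): Seq[Seq[Any]] = {
    val blocks = new ArrayBuffer[Seq[Any]]
    val gen = new MiniBlockGenerator(50, 10)(b => blocks.synchronized { blocks += b })
    gen.start()
    (1 to n).foreach(i => gen += i)
    gen.stop()
    blocks.synchronized { blocks.toList }
  }
}
```

    Calling MiniBlockGeneratorDemo.run(100) returns the records 1 to 100 split into one or more blocks, in order. How many blocks you get depends on timing, which is the point: a shorter block interval means more, smaller blocks for the same input.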

    /** Provides waitToPush() method to limit the rate at which receivers consume data.
     *
     * waitToPush method will block the thread if too many messages have been pushed too quickly,
     * and only return when a new message has been pushed. It assumes that only one message is
     * pushed at a time.
     *
     * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
     * per second that each receiver will accept.
     *
     * @param conf spark configuration
     */
    private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

      private var lastSyncTime = System.nanoTime
      private var messagesWrittenSinceSync = 0L
      private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
      private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
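
    The excerpt stops before waitToPush() itself, so here is a rough standalone sketch of how such a limiter can work with the four fields shown. This is my own simplified version under stated assumptions, not the verbatim Spark source: SimpleRateLimiter and RateLimiterSketch.timePushes are hypothetical names, maxRate plays the role of spark.streaming.receiver.maxRate, and a value of 0 or less is taken to mean "no limit", matching the default of 0 in the excerpt.

```scala
import java.util.concurrent.TimeUnit.{NANOSECONDS, SECONDS}

// Hypothetical standalone limiter (assumption, not Spark code).
// Like the original, it assumes a single caller pushing one message at a time.
class SimpleRateLimiter(maxRate: Int) {
  private val syncInterval = NANOSECONDS.convert(10, SECONDS)
  private var lastSyncTime = System.nanoTime
  private var messagesSinceSync = 0L

  @annotation.tailrec
  final def waitToPush(): Unit = {
    if (maxRate > 0) { // maxRate <= 0 means unlimited: return immediately
      val now = System.nanoTime
      val elapsedNanos = math.max(now - lastSyncTime, 1L)
      // Messages per second observed since the last sync point.
      val rate = messagesSinceSync.toDouble * 1e9 / elapsedNanos
      if (rate < maxRate) {
        // Under the limit: count this message and let the caller continue.
        messagesSinceSync += 1
        if (now > lastSyncTime + syncInterval) {
          // Start a fresh measurement window every 10 seconds.
          lastSyncTime = now
          messagesSinceSync = 1
        }
      } else {
        // Over the limit: sleep until the observed rate drops, then re-check.
        val targetMillis = messagesSinceSync * 1000 / maxRate
        val elapsedMillis = elapsedNanos / 1000000
        val sleepMillis = targetMillis - elapsedMillis
        if (sleepMillis > 0) Thread.sleep(sleepMillis)
        waitToPush()
      }
    }
  }
}

object RateLimiterSketch {
  // Pushes n messages through a fresh limiter and returns the elapsed milliseconds.
  def timePushes(maxRate: Int, n: Int): Long = {
    val start = System.nanoTime
    val limiter = new SimpleRateLimiter(maxRate)
    (1 to n).foreach(_ => limiter.waitToPush())
    (System.nanoTime - start) / 1000000
  }
}
```

    With maxRate = 100, the 30th message is only admitted once 29 earlier messages fit under 100 per second, so RateLimiterSketch.timePushes(100, 30) should take at least roughly 290 ms, while timePushes(0, 1000) returns almost immediately because the limiter is disabled.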

  • Original article: https://www.cnblogs.com/zwCHAN/p/4275464.html