zoukankan      html  css  js  c++  java
  • spark streaming 6: BlockGenerator、RateLimiter

    BlockGenerator和RateLimiter其实很简单,但是它们包含了几个很重要的属性配置的处理,所以记录一下。
    /**
    * Generates batches of objects received by a
    * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
    * named blocks at regular intervals. This class starts two threads,
    * one to periodically start a new batch and prepare the previous batch as a block,
    * the other to push the blocks into the block manager.
    */
    private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener
    ,
    receiverId: Int,
    conf: SparkConf
    )
    extends RateLimiter(conf) with Logging {

    private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

    private val clock = new SystemClock()
    private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
    private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
    private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
    private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
    private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

    @volatile private var currentBuffer = new ArrayBuffer[Any]
    @volatile private var stopped = false

    /** Provides waitToPush() method to limit the rate at which receivers consume data.
    *
    * waitToPush method will block the thread if too many messages have been pushed too quickly,
    * and only return when a new message has been pushed. It assumes that only one message is
    * pushed at a time.
    *
    * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
    * per second that each receiver will accept.
    *
    * @param conf spark configuration
    */
    private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

    private var lastSyncTime = System.nanoTime
    private var messagesWrittenSinceSync = 0L
    private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
    private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
















  • 相关阅读:
    实例教学在MySQL中若何导出整个数据库
    在Linux下Turbomail简易快捷的装置方式
    Fedora下编译mitscheme
    Fedora 9可以不敌RedHat 9光辉
    实用伎俩:Ubuntu Linux 8.04设置与优化
    Linux下给WordPress建设伪静态
    红旗桌面版本最新使用要领和题目问题解答100例5
    知识管理系统红旗Linux/KM.Center
    python 虚拟环境的安装
    python 闭包
  • 原文地址:https://www.cnblogs.com/zwCHAN/p/4275464.html
Copyright © 2011-2022 走看看