  • Spark Streaming 5: InputDStream

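    This post looks at the InputDStream inheritance hierarchy. All input streams are operated on through the interface of the abstract class InputDStream. Pay special attention to ReceiverInputDStream: most of the time it is the base class we extend, because it makes it possible (or at least much easier) to spread the work of receiving data across the workers, which fits the distributed-computing model better.
    Input streams periodically save incoming data to Spark memory in the form of blocks. Unlike Spark Core, however, Spark Streaming by default serializes objects before storing them in memory.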
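    The serialize-before-storing default shows up in the storage level that a receiver-based stream uses. The following is a minimal sketch, assuming Spark Streaming is on the classpath; the object name `SerializedReceiverExample` and the host and port passed to `build` are hypothetical. `StorageLevel.MEMORY_AND_DISK_SER_2` is the default for `socketTextStream`, so passing it explicitly only makes the default visible.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SerializedReceiverExample {
  // Builds (but does not start) a streaming job that counts lines per batch.
  // The host and port are assumptions; a text server must be listening there
  // before the returned context is started.
  def build(host: String, port: Int): StreamingContext = {
    // local[2]: one thread for the receiver, at least one for processing.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SerializedReceiverExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // socketTextStream returns a ReceiverInputDStream[String]. Received blocks
    // are kept in serialized form (the "_SER" part) and replicated twice ("_2").
    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.count().print()
    ssc
  }
}
```

    Calling `start()` and `awaitTermination()` on the returned context runs the job. Swapping in `StorageLevel.MEMORY_ONLY` would keep deserialized objects in memory instead, at the cost of a larger footprint.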


    /**
    * This is the abstract base class for all input streams. This class provides methods
    * start() and stop() which are called by the Spark Streaming system to start and stop
    * receiving data. Input streams that can generate RDDs from new data by running a
    * service/thread only on the driver node (that is, without running a receiver on worker
    * nodes) can be implemented by directly inheriting this InputDStream. For example,
    * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory from the driver for
    * new files and generates RDDs with the new files. For implementing input streams
    * that require running a receiver on the worker nodes, use
    * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
    *
    * @param ssc_ Streaming context that will execute this input stream
    */
    abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
    extends DStream[T](ssc_) {

    private[streaming] var lastValidTime: Time = null

    ssc.graph.addInputStream(this)
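
    To make the driver-only case concrete, here is a minimal sketch of a stream that inherits InputDStream directly and never starts a receiver. `CounterInputDStream` and `CounterInputDStreamExample` are hypothetical names used only for illustration; the sketch assumes a Spark version in which `InputDStream` can be subclassed from user code with `start()`, `stop()` and `compute()` as the members to implement.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical driver-only input stream: every batch is a run of consecutive
// numbers generated on the driver, so no receiver runs on any worker.
class CounterInputDStream(ssc_ : StreamingContext, batchSize: Int)
    extends InputDStream[Long](ssc_) {

  // Next number to emit; lives on the driver only.
  private var next = 0L

  // There is no background service to manage, so both hooks are no-ops.
  override def start(): Unit = {}
  override def stop(): Unit = {}

  // Called by the scheduler once per batch interval.
  override def compute(validTime: Time): Option[RDD[Long]] = {
    val batch = next until (next + batchSize)
    next += batchSize
    Some(context.sparkContext.parallelize(batch))
  }
}

object CounterInputDStreamExample {
  // Builds (but does not start) a job that prints each generated batch.
  def build(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("CounterInputDStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    new CounterInputDStream(ssc, batchSize = 3).print()
    ssc
  }
}
```

    Constructing the stream is enough to register it as an input of the context, which is what the `ssc.graph.addInputStream(this)` call in the excerpt above does.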

    /**
    * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
    * that has to start a receiver on worker nodes to receive external data.
    * Specific implementations of ReceiverInputDStream must
    * define the `getReceiver()` function that gets the receiver object of type
    * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
    * to the workers to receive data.
    * @param ssc_ Streaming context that will execute this input stream
    * @tparam T Class type of the object of this stream
    */
    abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
    extends InputDStream[T](ssc_) {

    /** Keeps all received blocks information */
    private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

    /** This is a unique identifier for the receiver input stream. */
    val id = ssc.getNewReceiverStreamId()

    /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to be defined by any specific implementation
    * of a ReceiverInputDStream.
    */
    def getReceiver(): Receiver[T]
    // Note: in either branch below, the result is ultimately returned as a BlockRDD.
    /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
    override def compute(validTime: Time): Option[RDD[T]] = {
    // If this is called for any time before the start time of the context,
    // then this returns an empty RDD. This may happen when recovering from a
    // master failure
    if (validTime >= graph.startTime) {
    val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    receivedBlockInfo(validTime) = blockInfo
    val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
    Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
    Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
    }
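
    The `getReceiver()` contract above is what ships a Receiver to a worker. From user code, the usual route is to write a Receiver and hand it to `StreamingContext.receiverStream`, which wraps it in a ReceiverInputDStream. The following is a minimal sketch under that assumption; `LineReceiver` and `LineReceiverExample` are hypothetical names, and the host and port are placeholders for whatever text source is being read.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that reads newline-delimited text from a socket.
// It runs on a worker; each stored line ends up in a block that compute()
// later exposes through an RDD.
class LineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  // onStart must return quickly, so the blocking read loop gets its own thread.
  override def onStart(): Unit = {
    val worker = new Thread("line-receiver") {
      override def run(): Unit = receive()
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to clean up here: the read loop checks isStopped() and exits.
  override def onStop(): Unit = {}

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand one record to Spark
        line = reader.readLine()
      }
      restart("Connection closed, trying to reconnect")
    } catch {
      case e: IOException => restart(s"Error receiving from $host:$port", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

object LineReceiverExample {
  // Wraps the receiver in a ReceiverInputDStream via the public API.
  def lines(ssc: StreamingContext, host: String, port: Int): ReceiverInputDStream[String] =
    ssc.receiverStream(new LineReceiver(host, port))
}
```

    The storage level passed to the Receiver constructor decides how the received blocks are kept; the serialized, replicated level used here matches the serialized-by-default behavior described at the top of the post.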
  • Original article: https://www.cnblogs.com/zwCHAN/p/4275348.html