zoukankan      html  css  js  c++  java
  • [Spark]-Streaming-输入

    1.Input DStreams 和 Receivers

      Spark Streaming的输入由两个部分组成 Input DStream 和 Receiver

      Input DStream  代表的是从数据源接收到由输入数据组成的数据流

      Receiver 是从数据源获取数据并写入Spark内存的实际执行者.每一个Input DStream都会与一个Receiver 关联.

      如果一个Spark Streaming 应用需要并行的接收多个数据流.可以创建多个Input DStream(这将同时创建多个Receiver)

        需要注意的是,在Spark Streaming应用中,executor运行的将是一个长期任务.因为每一个executor都将长期占用某一个核(文件流某些情况除外)

        .因此,Spark Streaming应用必须保证有足够多的核来运行.

          这里的足够多核是指: 可用核必须大于Receiver数.因为计算本身也需要一个executor.否则系统只能正确接收数据,但无法实际处理数据.

          比如说:本地模式 "local/local[1]",唯一的核被交给Receiver 而没有执行计算,所以是无法得到计算结果的.

    2.内置数据源

      2.1 File Streams

         从文件系统读取数据转化为DStream.(这里的文件系统包括本地文件系统,或是HDFS,S3等任何Hadoop支持的分布式文件系统)

         File Stream 有以下要点:

          i).不支持嵌套目录.换句话说,只有直接处于目标目录下的文件才能被监控读取.

          ii).目录支持通配符.在这种情况下,Spark Streaming将对符合条件的目录列表进行监控

          iii).目录下的所有文件必须有统一的格式

          iv).目录下的某一个文件,视为某个时间范围数据的一部分.这个时间范围的判定标准是文件修改时间而不是创建时间

          v).文件在窗口一次执行中仅读取一次.在这种情况,文件的后续写入无效.(2.3新特性,历史是文件任何情况下都不允许修改)

          vi).目录下的文件越长,扫描一次修改时间所花费的时间就越多.哪怕文件其实并没有任何修改

          vii).只有修改时间落在某个窗口范围内,才会被某个窗口拉取.(依次,可以使用 FileSystem.setTimes() 设置文件修改时间来让文件落入到之后的某个窗口中)

                  viii).注意不同系统下的文件修改时间策略

            比如HDFS,文件修改时间是在写入设置的.这样当某个窗口读取时,可能文件还没有写入完毕而继续保持打开状态,从而导致读取失败

            或者S3系统,文件修改时间是在写入完毕之后拷贝写入.这样可能导致数据本来应该落入某个窗口而无法落入.

            总之,用户需要仔细理清目标环境下的文件修改时间策略,让Spark能以用户期望的行为来运行.

       文件流读取时,如果是一个简单文件读取(streamingContext.textFileStream(dataDirectory),将不会单独占用一个核<=开发环境中,生产很难有如此简单文件读取的情况

      2.2 Socket 

        ssc.socketTextStream("127.0.0.1", 9999)

        一般开发环境中使用

      2.3 高级数据源

        Spark Streaming 已经内置一些诸如 Kafka, Kinesis 和 Flume 等高级数据源,这些后面专章介绍.

    3.自定义数据源

      3.1 数据源的类型

        Spark Streaming,依据收到数据后,是否需要回发数据源ack消息(确认收到数据),将数据源分为两大类 不可靠数据源和可靠数据源

      3.2 不可靠数据源

        继承实现 Receiver[T],并需要指定数据存储级别,和实现 onStart(): Unit 和 onStop(): Unit 两个方法.并在其中内置了store和 restart两个方法

        一个不可靠数据源Demo如下   

                /**
                * 自定义Socket数据源
                * 存储级别为: StorageLevel.MEMORY_AND_DISK_2
                *   MEMORY_AND_DISK 表示保存直内存,但允许溢出时到磁盘  _2 表示存储的副本数
                *   
                * 使用方法: ssc.receiverStream(new CusReceiver("127.0.0.1", 9999))
                */
                class CusReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
                /**
                    * 接收器启动时调用onStart方法,用于初始化接收数据所需要的所有资源
                    * 注意:
                    *   onStart必须保持非阻塞的方式执行,因为接收器运行是在不同线程中(线程切换)
                    */
                override def onStart(): Unit = {
                    new Thread("Socket Receiver") {
                    override def run() {
                        receive()
                    }
                    }.start()
                }
                
                /**
                    * 当接收器关闭时调用 onStop(),用于清理资源等
                    */
                override def onStop(): Unit = {}
                
                private def receive(): Unit = {
                    var socket: Socket = null
                    var userInput: String = null
                    try {
                    socket = new Socket(host, port);
                    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
                    userInput = reader.readLine()
                
                    while (!isStopped && userInput != null) {
                        /**
                        * 将读取的数据保存至内存中
                        *   这里将应用 Receiver构造设置的存储级别来进行保存
                        */
                        store(userInput)
                        userInput = reader.readLine()
                    }
                    reader.close()
                    socket.close()
                
                    /**
                        * Spark Streaming的接收器重启(这是一个Future模式的异步操作)
                        *   这将立即关闭接收器并调用onStop,并在之后的某个时间启动接收器(调用onStart)
                        *
                        * 从源码可以看出,receiver是长期占用某个核.哪怕重启也是以线程休眠的形式,并不会释放核
                        * 具体过程如下:
                        *   1.记录重启原因日志
                        *   2.停止当前接收器(调用onStop)
                        *   3.线程休眠(Spark .stream . receiverrestartdelay)毫秒
                        *   4.启动当前接收器(调用onStart)
                        */
                    restart("Trying to connect again")
                    } catch {
                    case e: java.net.ConnectException =>
                        restart("Error connecting to " + host + ":" + port, e)
                    case t: Throwable =>
                        restart("Error receiving data", t)
                    }
                }
                }

        

      3.3 可靠数据源

        一个不可靠数据源只是单纯的拉取数据,然后调用store保存而无需考虑其它的逻辑.但如果是可靠数据源,则必须要考虑以下:

          强容错的保证,保证零数据丢失

          接收放实现控制块生成和处理接收速率.

          实现目标数据源的Ack机制

        未完,demo后补

  • 相关阅读:
    【问题记录】ajax dataType属性
    【问题记录】springMVC @Valid使用不生效问题
    Initialization of bean failed; nested exception is java.lang.NoClassDefFoundError: javax/jms/JMSContext
    mysql优化:explain 和 profile
    【问题记录】mysql TIMEDIFF 和 TIMESTAMPDIFF的使用
    初次搭建spring boot 项目(实验楼-学习笔记)
    JqGrid自定义toolbar
    MS SQL SERVER 2008 R2 实例服务启动出现10048错误解决办法
    C#快速导入海量XML数据至SQL Server数据库
    SQL2012之FileTable与C#的联合应用
  • 原文地址:https://www.cnblogs.com/NightPxy/p/9292063.html
Copyright © 2011-2022 走看看