zoukankan      html  css  js  c++  java
  • Spark10-SparkStreaming

    一、SparkStreaming 介绍

    1、批量计算 和 流计算

    首先我们要知道什么是 SparkStreaming。先了解两个概念,批量计算流计算

    批量计算

     流计算

    两者区别:

    1.批量计算也叫做离线计算,数据是由边界的,无论多大,总之是有大小的

    2.流计算的数据是不断产生的

    3.批量计算往往计算全量数据

    4.流计算要求快速的处理,所以处理的是 增量数据

    2、流 和 批 的架构组合

    流和批都是有意义的,有自己的应用场景,那么如何结合流和批呢?如何在同一个系统中使用这两种不同的解决方案呢? (这块掐掉)

    3、SparkStreaming 的特点

    4、案例实现 

    我们通过在 socket 客户端 手动的持续输入数据,来验证流计算的运行特点。

    首先使用 nc 命令,使用 netcat 开启。

     目标:使用 Spark Streaming 程序和 Socket server 进行交互, 从 Server 处获取实时传输过来的字符串, 拆开单词并统计单词数量, 最后打印出来每一个小批次的单词数量

    package com.thorine.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingWordCount {
      def main(args: Array[String]): Unit = {
        // 初始化环境
        // SparkConf 是在Spark core 中的内容,创建 SparkContext 的时候使用
        // 在创建 Streaming Context 的时候也要用到 conf, 说明 Spark Steaming 是基于 Spark core 的.
        // 接下来讲到的 (Structured Streaming),是基于 Spark SQL(DataFrame) 的
        // 在指定master的时候,不能指定一个线程,因为在 streaming 运行的时候,需要新开一个线程去一直监听数据的获取
        val conf = new SparkConf().setAppName("streaming word count").setMaster("local[2]")
    
        // StreamingContext 其实就是 Spark Streaming 的入口
        // 相当于 SparkContext 是 Spark Core 的入口一样
        val sc = new StreamingContext(conf, Seconds(1)) // 一秒钟一批
        sc.sparkContext.setLogLevel("WARN") // 设置日志等级
    
        // socketTextStream 这个方法用于去创建一个 DStream,用于监听 socket 输入,当作文本来处理
        // SparkContext 中, 有一个方法 textFile() 用来创建一个 RDD,而我们这里使用 socketTextStream 创建一个DStream
        // 这两个是类似的, 都是创建对应的数据集
        // RDD 来自于 Spark Core, DStream 来自于 Spark Streaming
        // DStream 可以理解为一个流式的 RDD
        val lines: ReceiverInputDStream[String] = sc.socketTextStream(
          hostname = "192.168.2.128",
          port = 9999,
          storageLevel = StorageLevel.MEMORY_AND_DISK_SER
        )
    
        val words = lines.flatMap( _.split(" "))
        val tuples = words.map((_,1))
        val counts = tuples.reduceByKey( _ + _ )
    
        counts.print()
    
        sc.start()
    
        // main 方法执行完后整个程序就会退出,所以需要阻塞主线程
        sc.awaitTermination()
      }
    }

    在 虚拟机的 socket server 中持续输入代词,可以看出 SparkStreaming得到的分词结果。

    注意:

    Spark Streaming 并不是真正的来一条数据处理一条

    Spark Streaming 的处理机制叫做小批量, 英文叫做 mini-batch, 是收集了一定时间的数据后生成 RDD, 后针对 RDD 进行各种转换操作, 这个原理提现在如下两个地方

    • 控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
    • 多长时间生成一个 RDD 去统计呢? 由 new StreamingContext(sparkConf, Seconds(1)) 这段代码中的第二个参数指定批次生成的时间

    Spark Streaming 中至少要有两个线程

    在使用 spark-submit 启动程序的时候, 不能指定一个线程

    • 主线程被阻塞了, 等待程序运行
    • 需要开启后台线程获取数据

     Streaming 里的算子使用

    这些算子类似 RDD, 也会生成新的 DStream

    这些算子操作最终会落到每一个 DStream 生成的 RDD 中

    算子释义

    flatMap

    lines.flatMap(_.split(" "))

    将一个数据一对多的转换为另外的形式, 规则通过传入函数指定

    map

    words.map(x => (x, 1))

    一对一的转换数据

    reduceByKey

    words.reduceByKey(_ + _)

    这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据

    对于DStream,离散的,其中存放的并不是数据,并不是数据在流动而是 RDD在流动

    因为在 dstream 上调用map, flatmap, reduceByKey这些算子的时候其实是在每一个RDD中调用的所以只能统计当前 RDD所代表的批次的结果

    扩展:

    1、现在已经有更好的解决方法来完成 SparkStreaming 的工作,这就是 Structured Streaming,感兴趣可以选择了解。

    2、RDD,DataFrame,Dataset的进化史。

    编程模型解释

    RDD

    rdd.flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .collect
    • 针对自定义数据对象进行处理, 可以处理任意类型的对象, 比较符合面向对象

    • RDD 无法感知到数据的结构, 无法针对数据结构进行编程

    DataFrame

    spark.read .csv("...") .where($"name" =!= "") .groupBy($"name") .show()
    • DataFrame 保留有数据的元信息, API 针对数据的结构进行处理, 例如说可以根据数据的某一列进行排序或者分组

    • DataFrame 在执行的时候会经过 Catalyst 进行优化, 并且序列化更加高效, 性能会更好

    • DataFrame 只能处理结构化的数据, 无法处理非结构化的数据, 因为 DataFrame 的内部使用 Row 对象保存数据

    • Spark 为 DataFrame 设计了新的数据读写框架, 更加强大, 支持的数据源众多

    Dataset

    spark.read .csv("...") .as[Person] .where(_.name != "") .groupByKey(_.name) .count() .show()
    • Dataset 结合了 RDD 和 DataFrame 的特点, 从 API 上即可以处理结构化数据, 也可以处理非结构化数据

    • Dataset 和 DataFrame 其实是一个东西, 所以 DataFrame 的性能优势, 在 Dataset 上也有

  • 相关阅读:
    CF833B The Bakery (线段树+DP)
    NOIP 2017 时间复杂度 (模拟)
    NOI 2018 屠龙勇士 (拓展中国剩余定理excrt+拓展欧几里得exgcd)
    中国剩余定理(excrt) 模板
    后缀自动机 模板
    luogu P4248 [AHOI2013]差异
    luogu P3975 [TJOI2015]弦论
    luogu P4770 [NOI2018]你的名字
    luogu P3726 [AH2017/HNOI2017]抛硬币
    luogu P3722 [AH2017/HNOI2017]影魔
  • 原文地址:https://www.cnblogs.com/dongao/p/14408587.html
Copyright © 2011-2022 走看看