zoukankan      html  css  js  c++  java
  • Spark学习--Structured Streaming

    Structured Streaming

    Structured StreamingSpark Streaming 的进化版

    Spark 编程模型的进化过程

    总结

    RDD 的优点

    1. 面向对象的操作方式

    2. 可以处理任何类型的数据

    RDD 的缺点

    1. 运行速度比较慢, 执行过程没有优化

    2. API 比较僵硬, 对结构化数据的访问和操作没有优化

    DataFrame 的优点

    1. 针对结构化数据高度优化, 可以通过列名访问和转换数据

    2. 增加 Catalyst 优化器, 执行过程是优化的, 避免了因为开发者的原因影响效率

    DataFrame 的缺点

    1. 只能操作结构化数据

    2. 只有无类型的 API, 也就是只能针对列和 SQL 操作数据, API 依然僵硬

    Dataset 的优点

    1. 结合了 RDDDataFrameAPI, 既可以操作结构化数据, 也可以操作非结构化数据

    2. 既有有类型的 API 也有无类型的 API, 灵活选择

    Spark 的 序列化 的进化过程

    总结

    1. 当需要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫做序列化和反序列化

    2. Spark 中有很多场景需要存储对象, 或者在网络中传输对象

      1. Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行

      2. 缓存 RDD 的时候, 需要保存 RDD 中的数据

      3. 广播变量的时候, 需要将变量序列化, 在集群中广播

      4. RDDShuffle 过程中 MapReducer 之间需要交换数据

      5. 算子中如果引入了外部的变量, 这个外部的变量也需要被序列化

    3. RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器

    4. DatasetDataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

    Spark Streaming和Structured Streaming区别

    • Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步

    • Structured Streaming 已经支持了连续流模型, 也就是类似于 Flink 那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式

    Structured Streaming 案例

    需求

    • 编写一个流式计算的应用, 不断的接收外部系统的消息

    • 对消息中的单词进行词频统计

    • 统计全局的结果

    具体实现

    1、开启Socket server并运行:nc -lk 9999 然后输入数据 

    2、运行代码

    package sparkStreaming
    
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object SocketWordCount {
    
      def main(args: Array[String]): Unit = {
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("socket_structured")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._
    
        // 2. 数据集的生成, 数据读取
        val source: DataFrame = spark.readStream
          .format("socket")
          .option("host", "192.168.47.100")
          .option("port", 9999)
          .load()
    
        val sourceDS: Dataset[String] = source.as[String]
    
        // 3. 数据的处理
        val words = sourceDS.flatMap(_.split(" "))
          .map((_, 1))
          .groupByKey(_._1)
          .count()
    
        // 4. 结果集的生成和输出
        words.writeStream
          .outputMode(OutputMode.Complete())
          .format("console")
          .start()
          .awaitTermination()
      }
    }

    3、结果集

    从结果集中可以观察到以下内容

    • Structured Streaming 依然是小批量的流处理

    • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的

    • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样

     总结

    • Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地

    • Structured Streaming 中的编程模型依然是 DataFrameDataset

    • Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStreamwriteStream

    • Structured StreamingSparkSQL 几乎没有区别, 唯一的区别是, readStream 读出来的是流, writeStream 是将流输出, 而 SparkSQL 中的批处理使用 readwrite

    Dataset和流式计算

    可以理解为 Spark 中的 Dataset 有两种, 一种是处理静态批量数据的 Dataset, 一种是处理动态实时流的 Dataset, 这两种 Dataset 之间的区别如下

    • 流式的 Dataset 使用 readStream 读取外部数据源创建, 使用 writeStream 写入外部存储

    • 批式的 Dataset 使用 read 读取外部数据源创建, 使用 write 写入外部存储

    从 HDFS 中读取数据

    案例流程

    1. 利用py产生文件源源不断向hdfs上传文件
    2. 编写 Structured Streaming 程序处理数据

    python代码以及spark处理流式代码

    import os
    
    for index in range(100):
        content = """
        {"name": "Michael"}
        {"name": "Andy", "age": 30}
        {"name": "Justin", "age": 19}
        """
    
        file_name = "/data/spark/test/text{0}.json".format(index)
    
        with open(file_name, "w") as file:
            file.write(content)
    
        os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -mkdir -p /spark/dataset/")
        os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -put {0} /spark/dataset/".format(file_name))
    package sparkStreaming
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType
    
    object HDFSSource {
    
      def main(args: Array[String]): Unit = {
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .appName("hdfs_source")
          .master("local[6]")
          .getOrCreate()
    
        // 2. 数据读取, 目录只能是文件夹, 不能是某一个文件
        val schema = new StructType()
          .add("name", "string")
          .add("age", "integer")
    
        val source = spark.readStream
          .schema(schema)
          .json("hdfs://node01:8020/spark/dataset")
    
        // 3. 输出结果
        source.writeStream
          .outputMode(OutputMode.Append())
          .format("console")
          .start()
          .awaitTermination()
      }
    }

    总结

    1. Python 生成文件到 HDFS, 这一步在真实环境下, 可能是由 FlumeSqoop 收集并上传至 HDFS

    2. Structured StreamingHDFS 中读取数据并处理

    3. Structured Streaming 讲结果表展示在控制台

  • 相关阅读:
    Task 4.5 求二维数组中的最大连通子数组之和
    Task 6.4 冲刺Two之站立会议4
    《程序员开发心理学》阅读笔记二
    第二次站立会议9
    第二次站立会议8
    第二次站立会议7
    第二次站立会议6
    第二次站立会议5
    第二次站立会议4
    第二次站立会议3
  • 原文地址:https://www.cnblogs.com/MoooJL/p/14290130.html
Copyright © 2011-2022 走看看