zoukankan      html  css  js  c++  java
  • Spark学习进度11-Spark Streaming&Structured Streaming

    Spark Streaming

    Spark Streaming 介绍

    批量计算

     流计算

    Spark Streaming 入门

     Netcat 的使用

     项目实例

    目标:使用 Spark Streaming 程序和 Socket server 进行交互, 从 Server 处获取实时传输过来的字符串, 拆开单词并统计单词数量, 最后打印出来每一个小批次的单词数量

     步骤: 

    package cn.itcast.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingWordCount {
    
      def main(args: Array[String]): Unit = {
        //1.初始化
        val sparkConf=new SparkConf().setAppName("streaming").setMaster("local[2]")
        val ssc=new StreamingContext(sparkConf,Seconds(5))
        ssc.sparkContext.setLogLevel("WARN")
    
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
          hostname = "192.168.31.101",
          port = 9999,
          storageLevel = StorageLevel.MEMORY_AND_DISK_SER
        )
        //2.数据处理
        //2.1把句子拆单词
        val words: DStream[String] =lines.flatMap(_.split(" "))
        val tuples: DStream[(String, Int)] =words.map((_,1))
        val counts: DStream[(String, Int)] =tuples.reduceByKey(_+_)
    
        //3.展示
        counts.print()
    
        ssc.start()
    
        ssc.awaitTermination()
    
    
      }
    
    }

    开始进行交互:

    注意:

    Spark Streaming 并不是真正的来一条数据处理一条

    Spark Streaming 的处理机制叫做小批量, 英文叫做 mini-batch, 是收集了一定时间的数据后生成 RDD, 后针对 RDD 进行各种转换操作, 这个原理提现在如下两个地方

    • 控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
    • 多长时间生成一个 RDD 去统计呢? 由 new StreamingContext(sparkConf, Seconds(1)) 这段代码中的第二个参数指定批次生成的时间

    Spark Streaming 中至少要有两个线程

    在使用 spark-submit 启动程序的时候, 不能指定一个线程

    • 主线程被阻塞了, 等待程序运行
    • 需要开启后台线程获取数据

    各种算子

     

    • 这些算子类似 RDD, 也会生成新的 DStream

    • 这些算子操作最终会落到每一个 DStream 生成的 RDD 中

    算子释义

    flatMap

    lines.flatMap(_.split(" "))

    将一个数据一对多的转换为另外的形式, 规则通过传入函数指定

    map

    words.map(x => (x, 1))

    一对一的转换数据

    reduceByKey

    words.reduceByKey(_ + _)

    这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据

    Structured Streaming

    Spark 编程模型的进化过程

    编程模型解释

    RDD

    rdd.flatMap(_.split(" "))
       .map((_, 1))
       .reduceByKey(_ + _)
       .collect
    • 针对自定义数据对象进行处理, 可以处理任意类型的对象, 比较符合面向对象

    • RDD 无法感知到数据的结构, 无法针对数据结构进行编程

    DataFrame

    spark.read
         .csv("...")
         .where($"name" =!= "")
         .groupBy($"name")
         .show()
    • DataFrame 保留有数据的元信息, API 针对数据的结构进行处理, 例如说可以根据数据的某一列进行排序或者分组

    • DataFrame 在执行的时候会经过 Catalyst 进行优化, 并且序列化更加高效, 性能会更好

    • DataFrame 只能处理结构化的数据, 无法处理非结构化的数据, 因为 DataFrame 的内部使用 Row 对象保存数据

    • Spark 为 DataFrame 设计了新的数据读写框架, 更加强大, 支持的数据源众多

    Dataset

    spark.read
         .csv("...")
         .as[Person]
         .where(_.name != "")
         .groupByKey(_.name)
         .count()
         .show()
    • Dataset 结合了 RDD 和 DataFrame 的特点, 从 API 上即可以处理结构化数据, 也可以处理非结构化数据

    • Dataset 和 DataFrame 其实是一个东西, 所以 DataFrame 的性能优势, 在 Dataset 上也有

    Spark Streaming 和 Structured Streaming

    Spark Streaming 时代

    20190628010204
    • Spark Streaming 其实就是 RDD 的 API 的流式工具, 其本质还是 RDD, 存储和执行过程依然类似 RDD

    Structured Streaming 时代

    20190628010542
    • Structured Streaming 其实就是 Dataset 的 API 的流式工具, API 和 Dataset 保持高度一致

    Spark Streaming 和 Structured Streaming

    • Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步

    • 另外还有一点, Structured Streaming 已经支持了连续流模型, 也就是类似于 Flink 那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式

    在 2.2.0 以后 Structured Streaming 被标注为稳定版本, 意味着以后的 Spark 流式开发不应该在采用 Spark Streaming 了

    Structured Streaming 入门案例

    需求

    20190628144128
    • 编写一个流式计算的应用, 不断的接收外部系统的消息

    • 对消息中的单词进行词频统计

    • 统计全局的结果

    步骤:

    package cn.itcast.structured
    
    
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object SocketWordCount {
    
      def main(args: Array[String]): Unit = {
    
        //1.创建SparkSession
        val spark=SparkSession.builder().master("local[5]")
          .appName("structured")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._
    
        //2.数据集的生成,数据读取
        val source: DataFrame =spark.readStream
          .format("socket")
          .option("host","192.168.31.101")
          .option("port",9999)
          .load()
    
        val sourceDS: Dataset[String] = source.as[String]
    
        //3.数据的处理
        val words=sourceDS.flatMap(_.split(" "))
          .map((_,1))
          .groupByKey(_._1)
          .count()
        //4.结果集的生成和输出
        words.writeStream
          .outputMode(OutputMode.Complete())
          .format("console")
          .start()
          .awaitTermination()
    
    
      }
    
    }

    交互结果:

     

    从结果集中可以观察到以下内容

    • Structured Streaming 依然是小批量的流处理

    • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的

    • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样

    从 HDFS 中读取数据

    使用 Structured Streaming 整合 HDFS, 从其中读取数据的能力

    步骤

    1. 案例结构

    2. 产生小文件并推送到 HDFS

    3. 流式计算统计 HDFS 上的小文件

    4. 运行和总结

    实验步骤:

    Step1:利用py产生文件源源不断向hdfs上传文件

    Step2:编写 Structured Streaming 程序处理数据

    py代码:

    import os
    
    for index in range(100):
    
        content = """
        {"name": "Michael"}
        {"name": "Andy", "age": 30}
        {"name": "Justin", "age": 19}
        """
    
    
        file_name = "/export/dataset/text{0}.json".format(index)
    
    
        with open(file_name, "w") as file:
            file.write(content)
    
    
        os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -mkdir -p /dataset/dataset/")
        os.system("/export/servers/hadoop-2.7.5/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))

    spark处理流式文件

    package cn.itcast.structured
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.{StructField, StructType}
    
    object HDFSSource {
    
      def main(args: Array[String]): Unit = {
    
        System.setProperty("hadoop.home.dir","C:\winutil")
    
        //1.创建SparkSession
        val spark=SparkSession.builder()
          .appName("hdfs_source")
          .master("local[6]")
          .getOrCreate()
    
        //2.数据读取
        val schema=new StructType()
            .add("name","string")
            .add("age","integer")
        val source=spark.readStream
            .schema(schema)
          .json("hdfs://hadoop101:8020/dataset/dataset")
    
        //3.输出结果
        source.writeStream
          .outputMode(OutputMode.Append())
          .format("console")
          .start()
          .awaitTermination()
    
      }
    
    }

    总结

    20190715111534
    1. Python 生成文件到 HDFS, 这一步在真实环境下, 可能是由 Flume 和 Sqoop 收集并上传至 HDFS

    2. Structured Streaming 从 HDFS 中读取数据并处理

    3. Structured Streaming 讲结果表展示在控制台

  • 相关阅读:
    数据仓库
    HiveSQL 数据定义语言(DDL)
    HiveSQL 数据操控、查询语言(DML、DQL)
    【ASP.NET Core】Blazor+MiniAPI完成文件下载
    MySQL的WAL(WriteAhead Logging)机制
    MySQL系列 | 索引数据结构大全
    眼见为实,看看MySQL中的隐藏列
    mysql的默认隔离级别:可重复读(Repeatable Read)
    缓存淘汰算法LRU算法
    Android设计模式系列(12)SDK源码之生成器模式(建造者模式)
  • 原文地址:https://www.cnblogs.com/xiaofengzai/p/14284502.html
Copyright © 2011-2022 走看看