zoukankan      html  css  js  c++  java
  • spark学习进度29(structuredStreaming)

    编程模型解释

    RDD

    rdd.flatMap(_.split(" "))
       .map((_, 1))
       .reduceByKey(_ + _)
       .collect
    • 针对自定义数据对象进行处理, 可以处理任意类型的对象, 比较符合面向对象

    • RDD 无法感知到数据的结构, 无法针对数据结构进行编程

    DataFrame

    spark.read
         .csv("...")
         .where($"name" =!= "")
         .groupBy($"name")
         .show()
    • DataFrame 保留有数据的元信息, API 针对数据的结构进行处理, 例如说可以根据数据的某一列进行排序或者分组

    • DataFrame 在执行的时候会经过 Catalyst 进行优化, 并且序列化更加高效, 性能会更好

    • DataFrame 只能处理结构化的数据, 无法处理非结构化的数据, 因为 DataFrame 的内部使用 Row 对象保存数据

    • Spark 为 DataFrame 设计了新的数据读写框架, 更加强大, 支持的数据源众多

    Dataset

    spark.read
         .csv("...")
         .as[Person]
         .where(_.name != "")
         .groupByKey(_.name)
         .count()
         .show()
    • Dataset 结合了 RDD 和 DataFrame 的特点, 从 API 上即可以处理结构化数据, 也可以处理非结构化数据

    • Dataset 和 DataFrame 其实是一个东西, 所以 DataFrame 的性能优势, 在 Dataset 上也有

    RDD 的优点

    1. 面向对象的操作方式

    2. 可以处理任何类型的数据

    RDD 的缺点

    1. 运行速度比较慢, 执行过程没有优化

    2. API 比较僵硬, 对结构化数据的访问和操作没有优化

    DataFrame 的优点

    1. 针对结构化数据高度优化, 可以通过列名访问和转换数据

    2. 增加 Catalyst 优化器, 执行过程是优化的, 避免了因为开发者的原因影响效率

    DataFrame 的缺点

    1. 只能操作结构化数据

    2. 只有无类型的 API, 也就是只能针对列和 SQL 操作数据, API 依然僵硬

    Dataset 的优点

    1. 结合了 RDD 和 DataFrame 的 API, 既可以操作结构化数据, 也可以操作非结构化数据

    2. 既有有类型的 API 也有无类型的 API, 灵活选择

     整体结构

    20190628131804
    1. Socket Server 等待 Structured Streaming 程序连接

    2. Structured Streaming 程序启动, 连接 Socket Server, 等待 Socket Server 发送数据

    3. Socket Server 发送数据, Structured Streaming 程序接收数据

    4. Structured Streaming 程序接收到数据后处理数据

    5. 数据处理后, 生成对应的结果集, 在控制台打印

     

     

     

     HDFS读数据:

     目标

    • 在数据处理的时候, 经常会遇到这样的场景

      20190630160310
    • 有时候也会遇到这样的场景

      20190630160448
    • 以上两种场景有两个共同的特点

      • 会产生大量小文件在 HDFS 上

      • 数据需要处理

    • 通过本章节的学习, 便能够更深刻的理解这种结构, 具有使用 Structured Streaming 整合 HDFS, 从其中读取数据的能力

    案例流程

    20190715111534
    1. 编写 Python 小程序, 在某个目录生成大量小文件

      • Python 是解释型语言, 其程序可以直接使用命令运行无需编译, 所以适合编写快速使用的程序, 很多时候也使用 Python 代替 Shell

      • 使用 Python 程序创建新的文件, 并且固定的生成一段 JSON 文本写入文件

      • 在真实的环境中, 数据也是一样的不断产生并且被放入 HDFS 中, 但是在真实场景下, 可能是 Flume 把小文件不断上传到 HDFS 中, 也可能是 Sqoop 增量更新不断在某个目录中上传小文件

    2. 使用 Structured Streaming 汇总数据

      • HDFS 中的数据是不断的产生的, 所以也是流式的数据

      • 数据集是 JSON 格式, 要有解析 JSON 的能力

      • 因为数据是重复的, 要对全局的流数据进行汇总和去重, 其实真实场景下的数据清洗大部分情况下也是要去重的

    3. 使用控制台展示数据

      • 最终的数据结果以表的形式呈现

      • 使用控制台展示数据意味着不需要在修改展示数据的代码, 将 Sink 部分的内容放在下一个大章节去说明

      • 真实的工作中, 可能数据是要落地到 MySQLHBaseHDFS 这样的存储系统中

    难点和易错点

    1. 在读取 HDFS 的文件时, Source 不仅对接数据源, 也负责反序列化数据源中传过来的数据

      • Source 可以从不同的数据源中读取数据, 如 KafkaHDFS

      • 数据源可能会传过来不同的数据格式, 如 JSONParquet

    2. 读取 HDFS 文件的这个 Source 叫做 FileStreamSource

      从命名就可以看出来这个 Source 不仅支持 HDFS, 还支持本地文件读取, 亚马逊云, 阿里云 等文件系统的读取, 例如: file://s3://oss://

    3. 基于流的 Dataset 操作和基于静态数据集的 Dataset 操作是一致的

    import os
    
    for index in range(100):
    
        content = """
        {"name": "Michael"}
        {"name": "Andy", "age": 30}
        {"name": "Justin", "age": 19}
        """
        file_name = "/export/dataset/text{0}.json".format(index)
    
        with open(file_name, "w") as file:
            file.write(content)
    
        os.system("/export/servers/hadoop/bin/hdfs dfs -mkdir -p /dataset/dataset/")
        os.system("/export/servers/hadoop/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))

    执行python文件

    删除hdfs目录

    package cn.itcast.structured
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType
    
    object HDFSSource {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir", "C:\winutil")
    
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .appName("hdfs_source")
          .master("local[6]")
          .getOrCreate()
    
        // 2. 数据读取, 目录只能是文件夹, 不能是某一个文件
        val schema = new StructType()
          .add("name", "string")
          .add("age", "integer")
    
        val source = spark.readStream
          .schema(schema)
          .json("hdfs://node01:8020/dataset/dataset")
    
        // 3. 输出结果
        source.writeStream
          .outputMode(OutputMode.Append())
          .format("console")
          .start()
          .awaitTermination()
      }
    }

    运行 Python 程序

    1. 上传 Python 源码文件到服务器中

    2. 运行 Python 脚本

      # 进入 Python 文件被上传的位置
      cd ~
      
      # 创建放置生成文件的目录
      mkdir -p /export/dataset
      
      # 运行程序
      python gen_files.py

    kafka:

    Kafka 的特点

    Kafka 有一个非常重要的应用场景就是对接业务系统和数据系统, 作为一个数据管道, 其需要流通的数据量惊人, 所以 Kafka 如果要满足这种场景的话, 就一定具有以下两个特点

    • 高吞吐量

    • 高可靠性

    Topic 和 Partitions

    • 消息和事件经常是不同类型的, 例如用户注册是一种消息, 订单创建也是一种消息

      20190717110142
    • Kafka 中使用 Topic 来组织不同类型的消息

      20190717110431
    • Kafka 中的 Topic 要承受非常大的吞吐量, 所以 Topic 应该是可以分片的, 应该是分布式的

      20190717122114

    总结

    • Kafka 的应用场景

      • 一般的系统中, 业务系统会不止一个, 数据系统也会比较复杂

      • 为了减少业务系统和数据系统之间的耦合, 要将其分开, 使用一个中间件来流转数据

      • Kafka 因为其吞吐量超高, 所以适用于这种场景

    • Kafka 如何保证高吞吐量

      • 因为消息会有很多种类, Kafka 中可以创建多个队列, 每一个队列就是一个 Topic, 可以理解为是一个主题, 存放相关的消息

      • 因为 Topic 直接存放消息, 所以 Topic 必须要能够承受非常大的通量, 所以 Topic 是分布式的, 是可以分片的, 使用分布式的并行处理能力来解决高通量的问题

    创建 Topic 并输入数据到 Topic

    1. 使用命令创建 Topic

      bin/kafka-topics.sh --create --topic streaming-test --replication-factor 1 --partitions 3 --zookeeper node01:2181
    2. 开启 Producer

      bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streaming-test
    3. 把 JSON 转为单行输入

      {"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}

    使用 Spark 读取 Kafka 的 Topic

    1. 编写 Spark 代码读取 Kafka Topic

      val source = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "node01:9092,node01:9092,node03:9092")
        .option("subscribe", "streaming_test")
        .option("startingOffsets", "earliest")
        .load()
      • 三个参数

        • kafka.bootstrap.servers : 指定 Kafka 的 Server 地址

        • subscribe : 要监听的 Topic, 可以传入多个, 传入多个 Topic 则监听多个 Topic, 也可以使用 topic-* 这样的通配符写法

        • startingOffsets : 从什么位置开始获取数据, 可选值有 earliestassignlatest

      • format 设置为 Kafka 指定使用 KafkaSource 读取数

    sink

    从 Kafka 中获取数据, 简单处理, 再次放入 Kafka

    package cn.itcast.structured
    
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object HDFSSink {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir", "C:\winutil")
    
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .appName("hdfs_sink")
          .master("local[6]")
          .getOrCreate()
    
        import spark.implicits._
    
        // 2. 读取 Kafka 数据
        val source: Dataset[String] = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
          .option("subscribe", "streaming_test_2")
          .option("startingOffsets", "earliest")
          .load()
          .selectExpr("CAST(value AS STRING) as value")
          .as[String]
    
        // 1::Toy Story (1995)::Animation|Children's|Comedy
    
        // 3. 处理 CSV, Dataset(String), Dataset(id, name, category)
        val result = source.map(item => {
          val arr = item.split("::")
          (arr(0).toInt, arr(1).toString, arr(2).toString)
        }).as[(Int, String, String)].toDF("id", "name", "category")
    
        // 4. 落地到 HDFS 中
        result.writeStream
          .format("parquet")
          .option("path", "dataset/streaming/moives/")
          .option("checkpointLocation", "checkpoint")
          .start()
          .awaitTermination()
      }
    }

    掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink, 同时能够将数据落地到 MySQL

    package cn.itcast.structured
    
    import java.sql.{Connection, DriverManager, Statement}
    
    import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
    
    object ForeachSink {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir", "C:\winutil")
    
        // 1. 创建 SparkSession
        val spark = SparkSession.builder()
          .appName("hdfs_sink")
          .master("local[6]")
          .getOrCreate()
    
        import spark.implicits._
    
        // 2. 读取 Kafka 数据
        val source: Dataset[String] = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
          .option("subscribe", "streaming_test_2")
          .option("startingOffsets", "earliest")
          .load()
          .selectExpr("CAST(value AS STRING) as value")
          .as[String]
    
        // 1::Toy Story (1995)::Animation|Children's|Comedy
    
        // 3. 处理 CSV, Dataset(String), Dataset(id, name, category)
        val result = source.map(item => {
          val arr = item.split("::")
          (arr(0).toInt, arr(1).toString, arr(2).toString)
        }).as[(Int, String, String)].toDF("id", "name", "category")
    
        // 4. 落地到 MySQL
        class MySQLWriter extends ForeachWriter[Row] {
          private val driver = "com.mysql.jdbc.Driver"
          private var connection: Connection = _
          private val url = "jdbc::mysql://node01:3306/streaming-movies-result"
          private var statement: Statement = _
    
          override def open(partitionId: Long, version: Long): Boolean = {
            Class.forName(driver)
            connection = DriverManager.getConnection(url)
            statement = connection.createStatement()
            true
          }
    
          override def process(value: Row): Unit = {
            statement.executeUpdate(s"insert into movies values(${value.get(0)}, ${value.get(1)}, ${value.get(2)})")
          }
    
          override def close(errorOrNull: Throwable): Unit = {
            connection.close()
          }
        }
    
        result.writeStream
          .foreach(new MySQLWriter)
          .start()
          .awaitTermination()
      }
    }

    triggers

    package cn.itcast.structured
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    
    object Triggers {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir", "C:\winutil")
        // 创建数据源
        val spark = SparkSession.builder()
          .appName("triggers")
          .master("local[6]")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
    
        // timestamp, value
        val source = spark.readStream
          .format("rate")
          .load()
    
        // 简单处理
        //
        val result = source
    
        // 落地
        source.writeStream
          .format("console")
          .outputMode(OutputMode.Append())
          .trigger(Trigger.Once())
          .start()
          .awaitTermination()
      }
    }

     需要的外部支持

    如果要做到 exactly-once, 只是 Structured Streaming 能做到还不行, 还需要 Source 和 Sink 系统的支持

    • Source 需要支持数据重放

      当有必要的时候, Structured Streaming 需要根据 start 和 end offset 从 Source 系统中再次获取数据, 这叫做重放

    • Sink 需要支持幂等写入

      如果需要重做整个批次的时候, Sink 要支持给定的 ID 写入数据, 这叫幂等写入, 一个 ID 对应一条数据进行写入, 如果前面已经写入, 则替换或者丢弃, 不能重复

    所以 Structured Streaming 想要做到 exactly-once, 则也需要外部系统的支持, 如下

    Source

    Sources

    是否可重放

    原生内置支持

    注解

    HDFS

    可以

    已支持

    包括但不限于 TextJSONCSVParquetORC

    Kafka

    可以

    已支持

    Kafka 0.10.0+

    RateStream

    可以

    已支持

    以一定速率产生数据

    RDBMS

    可以

    待支持

    预计后续很快会支持

    Socket

    不可以

    已支持

    主要用途是在技术会议和讲座上做 Demo

    Sink

    Sinks

    是否幂等写入

    原生内置支持

    注解

    HDFS

    可以

    支持

    包括但不限于 TextJSONCSVParquetORC

    ForeachSink

    可以

    支持

    可定制度非常高的 Sink, 是否可以幂等取决于具体的实现

    RDBMS

    可以

    待支持

    预计后续很快会支持

    Kafka

    不可以

    支持

    Kafka 目前不支持幂等写入, 所以可能会有重复写入

  • 相关阅读:
    img图片下多余空白的BUG解决方案
    移动前端头部标签(HTML5 head meta)
    带你深入剖析inline-block属性值的前世今生
    SQL SERVER 2008的错误日志太大的解决办法
    主要是ftp的主动和被动模式(思路要了解ftp的工作模式,是解决问题根本的要点。)
    centos的软件安装方法rpm和yum
    iis7.5做反向代理配置方法实例图文教程
    linux vsftpd配置
    Linux文件的类型
    理解JS的闭包以及作用域链!!!
  • 原文地址:https://www.cnblogs.com/dazhi151/p/14290352.html
Copyright © 2011-2022 走看看