zoukankan      html  css  js  c++  java
  • Spark Streaming基础

    一、Spark Streaming概述:
     
    是基于Spark core的API,不需要单独安装,一盏式解决
     
    可扩展、高吞吐量、容错性、能够运行在多节点、结合了批处理、机器学习、图计算等
     
    将不同的数据源的数据经过Spark Streaming处理后输出到外部文件系统
     
     
    1. 应用场景:
     
    实时交易防欺诈检测、传感器异常实时反应
     
    整理Spark发展史问题(缺少)
     
     
    2. Spark Streaming工作原理:
     
    粗粒度:
    把实时数据流,以秒数拆分成批次的小数据块,通过Spark当成RDD来处理
     
    细粒度:
     
     
     
    3. 核心概念:
     
    编程入口:StreamingContext
     
    常用构造方法源码:
     
    def this(sparkContext: SparkContext, batchDuration: Duration) = {
      this(sparkContext, null, batchDuration)
    }
     
    def this(conf: SparkConf, batchDuration: Duration) = {
      this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
    }
     
    batchDuration 是必须填的,根据应用程序的延迟需求和资源可用情况来设置
     
    定义好streamingContext后,再定义DStream、transformation等,通过start()开始,stop()结束
     
    注意:
    一个context启动后,不能再运行新的streaming(一个JVM只能有一个streamingContext)
     
    一旦停止后,就没办法再重新开始
     
    Stop方法默认把sparkContext和streamingContext同时关掉,要不想关掉sc,必须定义stopSparkContext参数为false
     
    一个SparkContext能够创建多个StreamingContext
     
     
     最基础的抽象:Discretized Stream  (DStream)
     
    一系列的RDD代表一个DStream,是不可变的、分布式的dataset
     
     
    每一个RDD代表一个时间段(批次)的数据
     
    对DStream进行操作算子(flatMap)时,在底层上看就是对每一个RDD做相同的操作,交由Spark core运行
     
     
     数据输入:Input DStreams and Receivers
     
    每一个Input DStream 关联着一个Receiver(但从文件系统接收不需要receiver),receiver 接收数据并存在内存中
     
    receiver需要占用一个线程,所以不能定义local[1],线程的数量n必须大于receivers的数量
     
     
    转换:Transformations on DStreams
     
    与RDD类似:map、flatMap、filter、repartition、count...
     
     
     数据输出:Output Operations and DStreams:
     
    输出到数据库或者文件系统:
     
    API:print、save、foreach
     
     
     
    二、Spark Streaming实战部分:
     
    1. Spark Streaming处理socket数据:
     
    接收到的数据进行WordCount操作:
     
    在IDEA中:
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
    /*
    * Spark Streaming 处理Socket数据
    * */
    object NetWorkWordCount {
      def main(args: Array[String]): Unit = {
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
     
        //创建streamingContext的两个参数sparkConf和seconds
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //生成Input DStream
        val lines = ssc.socketTextStream("localhost", 6789)
     
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
     
        result.print()
     
        ssc.start()
        ssc.awaitTermination()
     
     
      }
    }
     
    在控制台中:
     
    nc -lk 6789,创建一个Socket
     
    在这上面输入数据,就可以在IDEA中count出来了
     
     
    注意:
    在执行过程中会报错,必须在Maven projects中找出报错提示中所缺少的包,并且在dependency上加入。
    当projects中还没有的包,在http://mvnrepository.com 上搜索相应的dependency,然后让Maven帮我们自动下载。
     
     
     
     
    1. Spark Streaming处理HDFS中的数据:
     
    ssc.textFileStream("file_path")
     
    同样是像上面一样,只是改了stream的source
     
    但是测试时,必须要是生成新的文件(官网称为moving进去的文件),才会被统计;而往旧的文件里再添加数据,也不会被统计了
     
     
     
    1. Spark Streaming进阶实战:
     
    带状态的算子UpdateStateByKey、保存到MySQL、window函数
     
     
    UpdateStateByKey实现实时更新:
     
    允许把新旧状态结合,连续地更新
     
    准备工作:
    1. 定义一个状态
    2. 定义状态更新的方法
     
    注意:
     
    1. updateFunction需要隐式转换
    2. 报错:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
                意思就是要进行checkpoint记录
     
     
    实现代码:
     
    把reduceByKey删除,并且把map之后的RDD定义为一个state,配合这个state写状态更新方法
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
    /*
    * Spark Streaming有状态的统计
    * */
    object StatefulWordCount {
      def main(args: Array[String]): Unit = {
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //使用状态算子必须要设置checkpoint
        //一般要保存记录在HDFS中
        ssc.checkpoint(".")
     
        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1))  //不能用reduceByKey
     
        //连续更新状态
        val state = result.updateStateByKey(updateFunction _)  //需要隐式转换
        state.print()
     
        ssc.start()
        ssc.awaitTermination()
      }
     
      /*
      * 状态更新方法更新已有的数据,放在updateStateByKey中
      * */
      def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = {
     
        val curr = currData.sum  //算出当前的总次数
        val prev = prevData.getOrElse(0)  //读取已有的
     
        //返回已有和当前的和
        Some(curr + prev)
      }
    }
     
     
    1. 统计结果写到MySQL中:
     
    前提准备:
    需要在IDEA中增加mysql的connector依赖
    在mysql数据库中先创建一张表
    写jdbc创建连接到Mysql
     
     
    使用foreachRDD,有很多种错误的写法:(没有序列化,创建太多mysql连接等)
     
    报错没有序列化:
     
    dstream.foreachRDD {rdd =>
    val connection = createNewConnection() // executed at the driver
    rdd.foreach {record =>connection.send(record) // executed at the worker
    }
    }
     
    花太多开销在连接和断开数据库上
     
    dstream.foreachRDD {rdd =>
    rdd.foreach {record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
    }
    }
     
     
    官方正确写法:
     
    使用foreachPartition进行优化连接:
     
    dstream.foreachRDD {rdd =>
    rdd.foreachPartition {partitionOfRecords =>
    val connection = createNewConnection()  //创建mysql连接
    partitionOfRecords.foreach(record =>
    connection.send(record))
    connection.close()
    }
    }
     
    用连接池进行进一步优化:
     
    Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
     
    dstream.foreachRDD {rdd =>
    rdd.foreachPartition {partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record =>connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
    }
    }
     
    在写入MySQL数据时,应该作一个是否存在的判断:
    若存在则使用update语句,不存在则使用insert语句
     
     
     
    1. Window的使用:
     
     
    两个参数:
    window length:窗口长度
    sliding interval:窗口间隔
     
    也就是每隔sliding interval统计前window length的值
     
    API:countByWindow、reduceByWindow…
     
     
     
    1. 实战:黑名单过滤
     
    transform算子的使用+Spark Streaming整合RDD操作
     
    元组默认从1开始数
     
    假设输入数据为id, name 这种形式
     
    实现过程:
    1. 建立黑名单元组 => (name, true)
    2. 把输入数据流编程元组 => (name, (id, name))
    3. transform,把每个DStream变成一个个RDD操作
    4. 数据流的RDD与黑名单RDD进行leftjoin,获得新的元组
    5. filter判断过滤
    6. 整合输出
     
    实现代码:
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /*
    * 黑名单过滤demo
    * */
    object TransformApp {
      def main(args: Array[String]): Unit = {
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //构建黑名单列表, 实际应用中可在外面读取列表, 并转成RDD, 用true标记为是黑名单元组(name, true)
        val blacks = List("zs", "ls")
        val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
     
        //获取每行
        val lines = ssc.socketTextStream("localhost", 6789)
        //把id, name => 元组(name, (id, name))
        //transform 的使用,对stream的每个RDD操作
        val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
          //与黑名单进行leftjoin => (name, ((id, name), true)), 并过滤出是true的项
          rdd.leftOuterJoin(blacksRDD)
            .filter(x => x._2._2.getOrElse(false) != true)   //过滤出不等于true的
            .map(x => x._2._1)
        })
        filterResult.print()
     
        ssc.start()
        ssc.awaitTermination()
      }
    }
     
     
    1. Spark Streaming整合Spark SQL
     
    整合完成词频统计操作
     
    官网代码:
     
     
    就是foreachRDD把streaming转成RDD,然后toDF就可以进行DataFrame或者是sql的操作了
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    第五周
    第四周
    第三周作业
    第二周编程总结
    编程总结(3)
    编程总结(2)
    编程总结(1)
    第七周作业
    第六周作业
    第五周作业
  • 原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8544314.html
Copyright © 2011-2022 走看看