zoukankan      html  css  js  c++  java
  • Spark Streaming初探

     1.  介绍

    Spark Streaming是Spark生态系统中一个重要的框架,建立在Spark Core之上,与Spark SQL、GraphX、MLib相并列。

    Spark Streaming是Spark Core的扩展应用,具有可扩展性、高吞吐量、可容错性等特点。

    可以监控来自Kafka、Flume、HDFS、Twitter、Socket套接字等数据,通过复杂算法及一系列的计算分析数据,且可将分析结果存入HDFS、数据库或前端页面。

     

    2. 工作原理

    Spark的核心是RDD(或DataFrame)、对于Spark Streaming来说,它的核心是DStream。DStream是一系列RDD的集合,DStream可以按照秒数将数据流进行批量划分。

    首先从接收到流数据之后,将其划分为多个批次,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS、数据库或前端页面等。

    详细可以参考下图:

    图1

    图2

    当启动Spark Streaming应用的时候,首先会在一个节点的Executor上启动一个Receiver接收者,然后当从数据源写入数据的时候会被Recevier接收,接收到数据之后Receiver会将数据Split成多个Block,然后被分到各个节点(Replicate Blocks容灾恢复),然后Receiver向Streaming Context进行块报告,说明数据在哪几个节点的Executor上,接着在一定间隔时间内StreamingContext会将数据处理为RDD,并交给SparkContext划分到各个节点进行并行计算。

    StreamingContext中内部定义了SparkContext,可以通过StreamingContext.sparkContetxt进行访问。

    3. Spark Streaming Demo

    官方给出的例子,是从Socket源端收集数据运行wordcount的案例。具体代码如下:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object QuickStart {
      def main(args: Array[String]): Unit = {
        // 创建StreamingContext,包含2个线程,且批处理间隔为1秒
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
    
        // 创建DStream,数据源为TCP源
        val lines = ssc.socketTextStream("localhost", 9999)
    
        // 将每行文本且分为单词
        val words = lines.flatMap(_.split(" "))
    
        // 计算单词个数
        val pairs = words.map(word => (word, 1))
        val wordCount = pairs.reduceByKey(_ + _)
        wordCount.print()
    
        // 执行计算
        ssc.start()
        ssc.awaitTermination()
      }
    } 

    从Spark Streaming初始化的源码中可看到,其初始化有两种方式:

    (1) 通过SparkConf来创建:如上例所示

    (2) 通过SparkContext创建,可在Spark-Shell命令行中运行

    例:在Spark-Shell中运行Spark Streaming,监控HDFS中的某个文件夹的数据传入,并按指定时间间隔统计词频

    a.  编写SparkStreamingDemo.scala,并放置在Spark的某驱动节点上。

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    val ssc = new StreamingContext(sc, Seconds(10))
    
    // read data
    val lines = ssc.textFileStream("/music_logs/tmp/albumRecSta")
    
    // process
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    wordCounts.print()
    
    ssc.start()  // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate 

    b. 运行Spark-shell,以本地模式运行:spark-shell --master local[2]

    c. 在Spark-Shell中加载代码,执行如下命令:

    :load /home/hadoop/test/SparkStreamingDemo.scala 

    d. 然后上传任意文件到给定的HDFS目录下,通过观察Spark-Shell中的执行结果,可以实时查看Spark Streaming的处理。

    4. 流程

    通过上述代码,可以看出Spark Streaming的编程步骤:

    (1) 初始化StreamingContext

    (2) 定义输入源

    (3) 准备流计算指令

    (4) 利用streamingContext.start()启动接口和处理数据

    (5) 处理过程一直持续,知道streamingContext.stop()被调用。

    注意:

    a. 一旦一个context已经启动,不能有新的流算子建立或加入到上下文中;一旦一个context已经停止,就不能再重新启动

    b. 在JVM中,同一时间只能有一个StreamingContext处于活跃状态

    c. streamingContext.stop()执行后,其sparkContext对象也会关闭。当stop设置参数为false时,只会关闭streamingContext。

    d. 一个SparkContext可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面的StreamingContext创建之前关闭,且不关闭SparkContext。

    5. 模块

    (1) DStream

    Spark Streaming提供的基本抽象,表示一个连续的工作流。可来自数据源获取、或者输入流通过转换算子生成处理。DStream由一系列连续的RDD组成,每个RDD都包含确定时间间隔内的数据。

    上述代码中, flatMap操作应用于lines这个DStream的每个RDD,生成words这个DStream的过程如下:

    (2) 输入DStream和Receiver

    每个输入流DStream和一个Receiver对象关联,Receiver从源中获取数据,并将数据存入内存中用于处理。

    Spark Streaming包含两类数据源:

    a. 基本源:可在StreamingContext的API中直接引入,例如:文件系统(textFileStream)、套接字链接(socketTextStream)、Akka的actor等

    b. 高级源:包括Kafka、Flume、Kinesis、Twitter等,需要额外的类来使用。如spark-streaming-kafka_2.10、spark-streaming-flume_2.10、spark-streaming-kinesis-asl_2.10、spark-streaming-twitter_2.10、spark-streaming-zeromq_2.10、spark-streaming-mqtt_2.10等

    流应用中科创建多个输入DStream来处理多个数据流。将创建多个Receiver同时接收多个数据流。但是Receiver作为长期运行的任务运行在Spark的woker或executor中。因此占用一个核,所以需要考虑为Spark Streaming应用程序分配足够的核(如果本地运行,则为线程)。

    注意:

    a. 如果分配给应用程序的核数少于或等于输入DStreams或Receivers的数量,系统只能够接收数据而不能处理他们

    b. 运行在本地时,只有一个核运行任务。

     1) 基本源

    a. 文件流:从任何与HDFS API兼容的文件系统中读取数据,创建方式:smc.fileStream[keyClass, valueClass, imputFormatClass](dataDirectory)

    Spark Streaming将监控dataDirectory目录,并处理目录下生成的文件(嵌套目录不支持)。要求目录下的文件必须具有相同的数据格式;所有文件必须在dataDirectory目录下创建,文件时自动移动和重命名到目录下;一旦移动,文件必须修改。若文件被持续追加数据,新的数据不会被读取。

    文件流不需要运行一个receiver,所以不需要分配核。

    b. 自定义actor流:调用smc.actorStream(actorProps, actorName)方法从Akka actors获取数据流

    c. RDD队列作为数据流:可调用smc.queueStream(queueOfRDDs)方法基于RDD队列创建DStreams。

    代码示例:

    a. 自定义Receiver

    import java.io.PrintWriter
    import java.net.ServerSocket
    
    import scala.io.Source
    
    /**
      * 创建外部socket端,数据流模式器
      */
    object StreamingSimulation {
    
      def index(n: Int) = scala.util.Random.nextInt(n)
    
      def main(args: Array[String]): Unit = {
        // 调用该模拟器需要三个参数,文件路径、端口号、时间间隔
        if(3 != args.length){
          System.err.println("Usage: <fileName> <port> <millisecond>")
          System.exit(1)
        }
    
        // 获取指定文件总行数
        val fileName = args(0)
        val lines = Source.fromFile(fileName).getLines.toList
        val fileRow = lines.size
    
        // 指定监听某端口,但外部程序请求时建立连接
        val listener = new ServerSocket(args(1).toInt)
    
        while (true){
          val socket = listener.accept()
          new Thread(){
            override def run(): Unit = {
              println("Got client connected from: " + socket.getInetAddress)
              val out = new PrintWriter(socket.getOutputStream, true)
              while(true){
                Thread.sleep(args(2).toLong)
                // 当该端口接收请求时,随机获取某行数据发送给对方
                val content = lines(index(fileRow))
                println("-------------------------------------------")
                println(s"Time: ${System.currentTimeMillis()}")
                println("-------------------------------------------")
                println(content)
                out.write(content + "
    ")
                out.flush()
              }
              socket.close()
            }
          }
        }
      }
    }
    View Code
    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets
    
    import org.apache.spark.{Logging, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * 自定义Receiver
      */
    object CustomReceiver {
      def main(args: Array[String]): Unit = {
        if(2 > args.length){
          System.err.println("Usage: CustomReceiver <hostName> <port>")
          System.exit(1)
        }
    
        // Create the context with a 1 second batch size
        val conf = new SparkConf().setAppName("CustomReceiver").setMaster("local[4]")
        val smc = new StreamingContext(conf, Seconds(10))
    
        // Create an input stream with the custom receiver on target ip:port and count the
        // words in input stream of 
     delimited text (eg. generated by 'nc')
        val lines = smc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        smc.start()
        smc.awaitTermination()
      }
    }
    
    class CustomReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) with Logging {
      override def onStart(): Unit = {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver"){
          override def run(): Unit = {
    
          }
        }
      }
    
      override def onStop(): Unit = {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself isStopped() returns false
      }
    
    
      /** Create a socket connection and receive data until receiver is stopped */
      private def receive() {
        var socket: Socket = null
        var userInput: String = null
        try {
          logInfo("Connecting to " + host + ":" + port)
          socket = new Socket(host, port)
          logInfo("Connected to " + host + ":" + port)
          val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
          userInput = reader.readLine()
          while(!isStopped && userInput != null) {
            store(userInput)
            userInput = reader.readLine()
          }
          reader.close()
          socket.close()
          logInfo("Stopped receiving")
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            restart("Error receiving data", t)
        }
      }
    }
    View Code

       b.  文件流

    import java.util.logging.{Level, Logger}
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * HDFS文件流
      */
    object FileStreaming {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
        Logger.getLogger("org.eclipse.jetty.Server").setLevel(Level.OFF)
    
        val conf = new SparkConf().setAppName("fileStreamData").setMaster("local[2]")
        val sc =new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(2))
    
        //fileStream 用法
        //val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs:///examples/").map{ case (x, y) => (x.toString, y.toString) }
        //lines.print()
        val lines = ssc.textFileStream("/root/application/dataDir/")
        val wordCount = lines.flatMap(_.split(" ")).map(x => (x,1)).reduceByKey(_+_)
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    View Code

       c. 网络数据源

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * 网络数据源:rawSocketStream
      * Receives text from multiple rawNetworkStreams and counts how many '
    ' delimited
      * lines have the word 'the' in them. This is useful for benchmarking purposes. This
      * will only work with spark.streaming.util.RawTextSender running on all worker nodes
      * and with Spark using Kryo serialization (set Java property "spark.serializer" to
      * "org.apache.spark.serializer.KryoSerializer").
      * Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>
      *   <numStream> is the number rawNetworkStreams, which should be same as number
      *               of work nodes in the cluster
      *   <host> is "localhost".
      *   <port> is the port on which RawTextSender is running in the worker nodes.
      *   <batchMillise> is the Spark Streaming batch duration in milliseconds.
      */
    
    object RawNetWorkGrep {
      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>")
          System.exit(1)
        }
    
        val sparkConf = new SparkConf().setAppName("RawNetworkGrep")
        val ssc = new StreamingContext(sparkConf, Seconds(args(3).toLong))
    
        val rawStreams = (1 to 100).map(_ =>
          ssc.rawSocketStream[String](args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER_2)).toArray
        val union = ssc.union(rawStreams)
    
        union.filter(_.contains("the")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString))
        ssc.start()
        ssc.awaitTermination()
      }
    }
    View Code

       d.  TCP协议数据源

    /**
      * TCP协议的数据源
      */
    object TcpOnStreaming {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TCPOnStreaming example").setMaster("local[4]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc,Seconds(2))
    
        // set the checkpoint directory
        ssc.checkpoint("/Res")
    
        // get the socket streaming data
        val socketStreaming = ssc.socketTextStream("master", 9999)
        val data= socketStreaming.map(x => (x, 1))
        data.print()
    
        val socketData = ssc.socketStream[String]("master", 9999, myDeserialize, StorageLevel.MEMORY_AND_DISK_SER)
        socketData.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      def myDeserialize(data: InputStream): Iterator[String] = {
        data.read().toString.map(x => x.hashCode().toString).iterator
      }
    }
    View Code

       e.  RDD队列

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    /**
      * RDD队列
      */
    class QueueStream {
      val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
      // Create the context
      val ssc = new StreamingContext(sparkConf, Seconds(1))
    
      // Create the queue through which RDDs can be pushed to a QueueInputDStream
      val rddQueue = new mutable.Queue[RDD[Int]]()
    
      // Create the QueueInputDStream and use it do some processing
      val inputStream = ssc.queueStream(rddQueue)
      val mappedStream = inputStream.map(x => (x % 10, 1))
      val reducedStream = mappedStream.reduceByKey(_ + _)
      reducedStream.print()
      ssc.start()
    
      // Create and push some RDDs into rddQueue
      for (i <- 1 to 30) {
        rddQueue.synchronized {
          rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
        }
        Thread.sleep(1000)
      }
      ssc.stop()
    }
    View Code

     2) 高级源

    a. kafka:【可参考https://blog.csdn.net/weixin_41615494/article/details/79521737, https://www.cnblogs.com/xlturing/p/6246538.html

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.immutable
    
    /**
      *  Kafka与Spark Streaming结合示例
      *
      * @author songwang4
      */
    object KafkaStreaming {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("SparkStreamingKafka_Receiver")
          .setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        val ssc = new StreamingContext(sc, Seconds(5))
    
        createDStream(sc, ssc)
        createDirectDStream(sc, ssc)
    
    
        // 开启计算
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * 直接采用KafkaUtils.createStream创建
        * @param sc
        */
      def createDStream(sc: SparkContext, ssc: StreamingContext): Unit ={
        //开启wal预写日志,保存数据源的可靠性
        sc.getConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
        // 设置checkpoint
        ssc.checkpoint("./Kafka_Receiver")
        // 定义ZK地址
        val zkQuorum = "node-1:2181,node-2:2181,node-3:2181"
    
        // 定义消费组
        val groupId = "spark_receiver"
        // 定义topic相关信息(这里的value并不是topic分区数,它表示的topic中每一个分区被N个线程消费)
        val topics = Map("kafka_spark" -> 2)
        // 对接Kafka(这个时候相当于同时开启3个receiver接受数据)
        val receiverDsStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
          val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
          stream
        })
        val unionDStream: DStream[(String, String)] = ssc.union(receiverDsStream)
        // 获取topic中的数据
        val result: DStream[(String, Int)] = unionDStream.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
        result.print()
      }
    
      /**
        * 采用KafkaUtils.createStream创建
        * @param sc
        */
      def createDirectDStream(sc: SparkContext, ssc: StreamingContext): Unit ={
        // 配置Kafka相关参数
        val kafkaParams = Map("metadata.broker.list" -> "node-1:9092,node-2:9092,node-3:9092", "group.id"->"Kafka_Direct")
        // 定义topic
        val topics = Set("kafka_spark")
        // 采用是kafka低级api偏移量不受zk管理
        val dStream: InputDStream[(String, String)] =
          KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, topics)
        val result: DStream[(String, Int)] = dStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        result.print()
      }
    }
    View Code

    b. flume:【可参考https://blog.csdn.net/qq_20641565/article/details/76685697, https://blog.csdn.net/weixin_41615494/article/details/79521120

     6. DStream转换

      DStream与RDD类似,允许将输入的DStream进行修改转换,常用的算子包括map(func), flatMap(func), filter(func), repartition(numPartitions), union(otherStream), count(), reduce(func), countByValue(), reduceByKey(func, [numTasks]), join(otherStream, [numTasks]), cogroup(otherStream, [numTasks]) , transform(func), updateStateByKey(func)。

    其中:

    (1) cogroup

    当应用于两个DStream,一个包含(K, V),一个包含(K, W),返回一个包含(K, Seq[V], Seq[W])的元组

    (2) updateStateByKey

    UpdateStateByKey在Spark Streaming中可以每一个key通过checkpoint维护一份state状态,通过更新函数对该key的状态不断更新;对每一个新批次的数据而言,Spark Streaming通过使用upadteStateByKey为已经存在的key进行state状态更新(对每个新出现的key,会同样执行state的更新函数操作);但如果通过更新函数对state更新返回为none的话,此时key对应的state状态将被删除。

    UpdateStateByKey中的state可以是任意类型的数据结构。

    如果要不断的更新每个key的state,就一定会涉及到状态的保存和容错,这个时候就需要开启checkpoint机制和功能,需要说明的是checkpoint的数据可以保存一些存储在文件系统上的内容

    关于流式处理对历史状态进行保存和更新具有重大实用意义,例如进行广告(投放广告和运营广告效果评估的价值意义,热点随时追踪、热力图)

    例:向保持一个文本数据流中每个单词的运行次数

    (3) transform

    transform允许在DStream运行任何RDD-To-RDD函数,主要用于DStream API中未提供的RDD操作。例如join数据流中每个批次好另一个数据集的功能,未在DStream中提供,可简单使用transform实现。

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      *  基于transform实现黑名单过滤
      */
    object StreamingTransformTest {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("BlackListFilter").setMaster("local[*]"))
        val ssc = new StreamingContext(sc, Seconds(5))
    
        // 设置黑名单
        val blackListRdd = ssc.sparkContext.parallelize(Array(("jack", true), ("ws", false)), 3)
    
        // 使用socketStreaming监听端口
        val socketStream = ssc.socketTextStream("127.0.0.1", 8080)
    
        val userStream = socketStream.map(line => (line.split(" ")(1), line))
    
        // 基于leftOuterJoin进行过滤
        val validStream = userStream.transform(rdd => {
          val jRdd = rdd.leftOuterJoin(blackListRdd)
          jRdd.filter(_._2._2.getOrElse(false))
        }).map(_._2._1)
    
    
        validStream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    View Code

     7. Window操作

    窗口操作允许在一个滑动窗口数据上应用transformation算子。滑动窗口即窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。下图是在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。

    window操作参数包括:(1) 窗口长度,即窗口持续时间; (2) 滑动的时间间隔,即窗口执行的时间间隔,注意,两个参数必须是源DStream的批时间间隔的倍数。

    例:热点搜索词滑动统计,每个10秒,统计最近60秒的搜索词的搜索频次,并打印出排名靠前的3个搜索词及出现次数【参考:https://www.cnblogs.com/duanxz/p/4408789.html

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingWindowTest {
      val sc = new SparkContext(new SparkConf().setAppName("WindowHotWordS").setMaster("local[*]"))
      val ssc = new StreamingContext(sc, Seconds(5))
    
      val searchLogsDStream = ssc.socketTextStream("spark1", 9000)
      val searchWordPairDStream = searchLogsDStream.map(f => (f.split(" ")(1), 1))
    
      /**
        * 第二个参数是窗口长度;第三个参数是滑动间隔,即每隔10秒,将最近60秒的数据,作为一个窗口,进行内部的RDD聚合,然后统一对
        * 一个RDD进行后续计算,然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,
        * 所以之前60秒,就有12个RDD,给聚合起来,然后统一执行reduceByKey操作
        *  所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
        * 每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
        */
      val searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow((a: Int,b: Int) => a + b, Seconds(60), Seconds(10))
    
      val finalDStream = searchWordCountsDStream.transform(searchWordCountsRDD  => {
        val countSearchWordRDD = searchWordCountsRDD .map(f => (f._2, f._1))
        val sortedCountSearchWordsRDD = countSearchWordRDD.sortByKey(false)
        val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
        val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
        for (tuple <- top3SearchWordCounts) {
          println("result : " + tuple)
        }
        searchWordCountsRDD
      })
    
      finalDStream.print()
    
      ssc.start()
      ssc.awaitTermination()
    }
    View Code

    常用的窗口操作如下:

    a. window(windowLength, slideInterval):基于源Dstream产生的窗口化的批数据计算一个新的DStream

    b. countByWindow(windowLength, sildeInterval):对每个滑动窗口的数据执行count操作

    c. reduceByWindow(func, windowLength, sildeInterval):对每个滑动窗口的数据执行reduce操作

    d. reduceByKeyAndWindow(func, windowLength, slideInterval, [num Tasks]): 对每个滑动窗口的数据执行reduceByKey操作

    e. countByValueAndWindow(windowLength, slideInterval, [num Tasks]):应用到一个(k, v)对组成的DStream中,返回一个由(k, v)对组成的新的DStream,每个key的值都是他们在滑动窗口中出现的频率

    8. DStreams的输出操作

    DStream的输出操作有如下几种:

    a . print(): DStream的每个批数据中打印前10条元素。在开发和测试中常用。

    b. saveAsObjectFiles(prefix, [suffix]):保存DStream的内容为一个序列化的文件,每一个批间隔文件的文件名基于prefix和suffox生成(prefix-TIME_IN_MS[.suffix]).

    c. saveAsTextFiles(prefix, [suffix]):保存DStream的内容为一个文本文件。

    d. saveAsHadoopFiles(prefix, [suffix]): 保存DStream的内容为一个hadoop文件。

    e. foreachRDD(func):在从流生成的每个RDD上应用函数func的最通用的输出操作。函数可以将每个RDD中的数据推送到外部系统,如写到文件或数据库中

    foreachRDD中常见的一般错误

    a. 写数据到外部系统需要创建一个连接对象,可能不经意间在Spark的驱动创建一个连接对象,但在Spark worker中尝试调用这个连接对象保存记录到RDD中。由于需要先序列化连接对象,然后将它从driver发送到worker中,这样的连接对象在机器间不能传送,可能发生序列化错误或初始化错误。

    dstream.foreachRDD(rdd => {
        // 将在驱动服务器上执行
        val connection = createNewConnection()
        rdd.foreach(record => {
            // 在worker节点上执行
            connection.send(record)
        })
    })
    View Code

     b.正确的做法是在worker中创建连接对象。 还有一种常见的错误,即为每一条记录创建一个连接对象。创建一个连接对象有资源和时间的开支,将明显减少系统的整体吞吐量。

    dstream.foreachRDD(rdd =>{
        rdd.foreach(record => {
            val connection = createNewConnection()
            connection.send(record)
            connection.close()
        })
    })
    View Code

    c. 更好的办法是利用rdd.foreachPartition方法,为RDD的partition创建一个连接对象,对该连接对象发送partition的所有记录。

    dstream.foreachRDD(rdd =>{
        rdd.foreachPartition(partitionRecords => {
            val connection = createNewConnection()
            partitionRecords .foreach(record => connection.send(record))
            connection.close()
        })
    })
    View Code

    d. 最后通过 在多个RDD或批数据间重用连接对象,可以做更进一步的优化。可以保存一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统。

    dstream.foreachRDD(rdd => {
        rdd.foreachPartition(partitionRecords => {
            // ConnectionPool是静态懒加载的连接池
            val connection = ConnectionPool.getConnection()
            partitiionRecords.foreach(record => connection.send(record))
            // 返回连接池
            ConnectionPool.returnConnection(connection)
         })
    })
    View Code

    注意:如果应用程序没有任何输出或者,存在输出操作dstream.foreachRDD(),但是没有任何的RDD的action操作存在dstream.foreachRDD中,name系统仅仅会接受输入,然后丢弃,因为DStreams的输出操作是懒执行的方式。

    9. 缓存或持久化

    DStream允许开发者持久化流数据到内存中。在DStream上使用persist方法。对于reduceByWindow、reduceByKeyAndWindow、updateStateKey操作,持久化是默认的,不需要调用persist方法。

    10. Checkpointing

    Spark Streaming应用程序如果不手动停止,则将一直运行下去,在实际中应用程序一般是24小时*7天不间断运行的,因此Streaming必须对诸如系统错误,JVM出错等与程序逻辑无关的错误(failures)具体很强的弹性,具备一定的非应用程序出错的容错性。Spark Streaming的Checkpoint机制便是为此设计的,它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复。

    存在两种checkpoint:

    1)  Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中,用于恢复应用程序中运行worker节点的故障。元信息包括:

    a. Configuration: 创建Spark Streaming应用程序的配置信息

    b. DStream operations: 定义Streaming应用程序的操作集合

    c. Incomplete batches: 操作存在队列中未完成的批

    2) Data checkpointing: 保存生成的RDD到可靠的存储系统中。

    在有状态转换中是必须的。在这样的转换中,生成的RDD依赖于之前的批的RDD。随着时间推移,依赖链的长度会持续增长,在恢复的过程中,为了避免无限增长,有状态的转换的中间RDD将会定时地存储到可靠存储系统中。

    应用程序在两种状态下必须开启checkpoint。

    a. 使用有状态的转换。如用updateStateByKey、reduceByKeyAndWindow,checkpoint目录必须提供用以定期checkpoint RDD.

    b. 从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息

    在存储系统中设置一个目录用于保存checkpoint信息,可以通过streammingContext.checkpoint(checkpointDirectory)方法,该方法用于有状态的转换。

    此外,如果想从driver故障中恢复,可以按照如下方式:

    a. 当应用程序第一次启动,新建一个StreamingContext,启动所有Stream,然后调用start方法

    b. 当应用程序因故障重启后,它将会从checkpoint目录重新创建StreamingContext。

    // 创建病启动一个新的streamingcontext
    def functionCreateContext(): StreamingContext = {
        val ssc = new StreamingContext(...)
        val lines = ssc.socketStreaming(...)
        ...
        ssc.checkpoint(checkpointDirectory) // 设置checkpoint目录
        ssc
    }
    
    
    // 从checkpoint数据获取StreamingContext或创建一个新的
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context, ...
    
    context.start()
    context.awaitTermination()
    View Code

    若checkpointDirectory存在,上下文将会利用checkpoint数据重新创建,如果不存在,将会调用fuctionCreateContext函数创建一个新的上下文,建立DStream。

    RDD的checkpoint有存储成本,需要认真设置批处理的时间间隔,典型地,设置checkpoint的间隔是DStream的滑动间隔的5-10倍大小。

    Ps:

    为了更好的容错保证,spark 1.2后引入了新的特性-预写日志(write ahead log)。使用该特性,从receiver获取的所有数据将预写日志到checkpoint目录。可以防止driver故障丢失数据,从而保证零数据丢失。

    该功能可以通过设置参数spark.streaming.receiver.writeAheadLogs.enable为true时开启。

    11. 性能优化

    (1) 减少批数据的执行时间

    1)数据接收的并行水平

    创建多个输入DStream并配置他们可以从源中接收不同分区的数据流,从而实现多数据流接收。多个DStream被合并生成单个DStream,运用在单个输入DStream的转换操作可以运用在合并的DStream中。

    val numSteams = 5
    val kafakStreams = (1 to numStreams).map(i => KafkaUtils.createStream(...))
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()
    View Code

    另一个需考虑的参数是receiver的阻塞时间,可由配置参数spark.streaming.blockInterval决定,默认值200毫秒

    2) 数据处理的并行水平

    默认的并发任务数通过配置属性spark.default.parallelism确定。

    3) 数据序列化

    (2) 任务启动开支

    1) 任务序列化,运行kyro,序列化任何可以减小任务的大小,从而减少任务发送到slave的时间

    2) 执行模式:在Standalone模式或粗粒度Mesos模式下运行Spark可比细粒度的Mesos模式下运行Spark获得更短的任务启动时间

    (3) 设置正确的批容量

    找出正确的批容量的好方法,用一个保守的批间隔时间(5-10秒)和低数据速率来测试应用程序。

    (4) 内存调优

    减少Spark streaming应用程序垃圾回收的相关暂停,以获得更稳定的批处理时间:

    a. DStream的默认持久化级别是存储到内存中

    b. 默认情况下,通过Spark的LUR内置策略,SPakr Streaming生成的持久化RDD会将从内存中清理掉。然而,可以设置配置选项spark.streaming.unpersist为true来更智能地去持久化(unpersist)RDD。

    c. 使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。

     

     

     

     

  • 相关阅读:
    vue生命周期
    vue input 循环渲染问题
    Node express post 大小设置
    webpack 好文章
    知识点的总结
    jsplumb 使用总结
    理解es6 中 arrow function的this
    分块编码(Transfer-Encoding: chunked)
    CGI的工作原理
    JS数组循环的性能和效率分析(for、while、forEach、map、for of)
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10461354.html
Copyright © 2011-2022 走看看