zoukankan      html  css  js  c++  java
  • Spark Streaming

    Spark Streaming

    1、介绍

    Spark Streaming是Spark core API的扩展,针对实时数据流计算,具有可伸缩性、高吞吐量、自动容错机制的特点。数据源可以来自于多种方式,例如kafka、flume等等。使用类似于RDD的高级算子进行复杂计算,像map、reduce、join和window等等。最后,处理的数据推送到数据库、文件系统或者仪表盘等。也可以对流计算应用机器学习和图计算。

    spark_025

    在内部,spark streaming接收实时数据流,然后切割成一个个批次,然后通过spark引擎生成result的数据流。

    spark_026

    Spark Streaming提供了称为离散流(DStream-discretized stream)的高级抽象,代表了连续的数据流。离散流通过kafka、flume等源创建,也可以通过高级操作像map、filter等变换得到,类似于RDD的行为。内部,离散流表现为连续的RDD。

    2、体验Spark Streaming编程

    1. 创建模块,添加spark-streaming的maven依赖

      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
      </dependency>
      

    2. 编写word count的scala程序

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      /**
        * Spark Streaming程序
        */
      object WordCountStreamingScala {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf()
          conf.setAppName("streaming")
          conf.setMaster("local[*]")
      
          //创建SparkStreamingContext
          val sc = new StreamingContext(conf , Seconds(1)) ;
      
          //行流,对接套接字文本流
          val lines = sc.socketTextStream("s101" , 8888)
      
          //单词流
          val words = lines.flatMap(_.split(" "))
      
          //对流
          val pairs = words.map((_, 1))
      	
          //计算结果
          val result = pairs.reduceByKey(_+_)
      
          //打印结果
          result.print()
      
          //启动上下文
          sc.start()
          
          //等待停止
          sc.awaitTermination()
        }
      }
      
      
    3. 导入log4j属性配置文件,修改日志级别,否则输出过多信息,不利用观察

      #
      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      #
      
      # Set everything to be logged to the console
      # 修改这里的INFO为ERROR
      # log4j.rootCategory=INFO, console
      log4j.rootCategory=ERROR, console
      log4j.appender.console=org.apache.log4j.ConsoleAppender
      log4j.appender.console.target=System.err
      log4j.appender.console.layout=org.apache.log4j.PatternLayout
      log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
      
      # Set the default spark-shell log level to WARN. When running the spark-shell, the
      # log level for this class is used to overwrite the root logger's log level, so that
      # the user can have different defaults for the shell and regular Spark apps.
      log4j.logger.org.apache.spark.repl.Main=WARN
      
      # Settings to quiet third party logs that are too verbose
      log4j.logger.org.spark_project.jetty=WARN
      log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
      log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
      log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
      log4j.logger.org.apache.parquet=ERROR
      log4j.logger.parquet=ERROR
      
      # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
      log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
      log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
      
      

    4. 启动nc服务器

      $>nc -lk 8888
      

    5. 启动streaming程序

      spark_027

    6. 在nc服务器程序命令输入单词

      spark_028

    7. 在Streaming控制台查看结果输出

      spark_029

    8. 编写java版流应用程序

      package com.oldboy.bigdata.spark.java;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.Seconds;
      import org.apache.spark.streaming.api.java.JavaDStream;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import scala.Tuple2;
      
      import java.util.Arrays;
      import java.util.Iterator;
      
      /**
       * spark streaming java版
       */
      public class WordCountStreamingJava {
        public static void main(String[] args) throws Exception {
          SparkConf conf = new SparkConf() ;
          conf.setAppName("streaming java") ;
          conf.setMaster("local[*]") ;
      
          //创建上下文
          JavaStreamingContext sc = new JavaStreamingContext(conf , 
                                                             Durations.seconds(1)) ;
          //行
          JavaDStream<String> lines = sc.socketTextStream("s101", 8888);
          //单词
          JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
              public Iterator<String> call(String s) throws Exception {
                String[] arr = s.split(" ") ;
                return Arrays.asList(arr).iterator();
              }
          }) ;
          //对流
          JavaPairDStream<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
              }
          }) ;
      
          //结果
          JavaPairDStream<String, Integer> res= pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
              public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2 ;
              }
          }) ;
      
          //输出结果
          res.print();
      
          //启动
          sc.start();
          sc.awaitTermination();
        }
      }
      
      

    3、基本概念

    3.1 StreamingContext初始化

    appName是应用程序名称,master是Spark,、Mesos或YARN,也可以是local,local是本地模式运行。实际应用不需要指定master值,通过spark-submit提交命令中获取该参数,定义完上下文后,必须要完成如下工作:

    • 通过创建离散流定义数据源
    • 为流定义变换等计算工作
    • streamingContext.start()开始接受数据并进行处理
    • 使用streamingContext.awaitTermination()函数停止应用
    • 手动调用streamingContext.stop()方法停止应用

    切记:

    • 上下文启动后,不能设置新的计算方法
    • 上下文停止后,不能重启
    • 流上下文停止后,还会停止SparkContext,如果不希望停止SparkContext,可以通过stop(false)。
    • SparkContext可以重用来创建多个流上下文,新的流上下文创建前需要停止上一个流上下文。

    3.2 离散流(DStream)

    离散流是Spark流应用的抽象,表示的是连续的数据流,数据流要么从数据源而来,或者通过变换生成。在内部,离散流表示连续的RDD。

    spark_030

    对离散流的任何应用,都会转换为操纵底层的RDD:

    spark_031

    底层的RDD变换工作,Spark引擎进行计算。

    3.3 Input DStream和Receiver

    InputStream Dstream也是一种DStream,从数据源接受的数据流,每个Input DStream都和一个Receiver关联,Receiver是接受数据并存储在Spark内存中。

    Spark内部提供了两种类型的源:

    • 基本源

      Spark API直接能够使用,比如FileSystem或Socket连接。

    • 高级源

      像kafka、flume等源需要借助于第三方工具类进行连接。

    Spark可以在一个流上下文中创建多个InputStream,就可以进行并行计算,这些创建的多个Input DStream具有相同时间切片,不可能给不同的Input DStream分别设置时间切片,因为时间切片设置在StreamContext中完成,同时也会创建多个Receiver。接受器单独占用一个CPU内核,即在一个单独的线程中死循环方式读取数据,需要分配足够的cpu内核来处理数据。保证CPU内核数据大于Receiver个数。

    注意事项:

    • 本地模式下,不能使用local或者local[1],Receiver占据唯一的线程,没有线程执行计算工作。
    • 扩展到集群,内核数大于Receiver个数。

    4、Receiver

    4.1 内部结构

    Receiver内部维护了队列,放置的是Block对象,Block包含blockId的ArrayBuffer两个属性。每个Block对应一个分区,默认每200ms(可通过spark.streaming.blockInterval修改)生成一个Block对象并推送到队列中,在StreamingContext中指定的时间片就是一个RDD的时长,因此每个RDD含有多少分区,只要计算一下是200ms的多少倍,然后就可以确定RDD内含有多少个分区了,但如果没有产生数据,就就不会生成分区,因此分区数不会超过这个倍数。内部结果如图所示:

    spark_036

    4.2 分区数控制

    修改块生成间隔即可改变分区数,代码如下:

    //块间隔设置,org.apache.spark.streaming.receiver.BlockGenerator#103
    conf.set("spark.streaming.blockInterval", "500ms")
    

    4.3 限速处理

    • 每秒接收记录数

      Spark Streaming可以控制每个Receiver每秒接收消息条数的上限,默认没有设置,就没有上限。该种方式缺点可能对集群处理能力估计不足,导致计算资源浪费。

      //每秒最多接受20条记录
      conf.set("spark.streaming.receiver.maxRate" , "20")
      

    • 压后(backpress)处理

      可以上spark Streaming基于当前batch的调度延迟与处理时间来控制接收速率,以备让系统只接受系统能够处理的速率。可以通过spark.streaming.backpressure.enabled属性开启,默认该属性是禁用的。对于Receiver的第一个批次的速率限制通过spark.streaming.backpressure.initialRate进行设置。启用压后处理属性后,在Spark Streaming内部,会动态设置Receiver接收速率的最大值。如果设置了spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerP-artition属性,则速率不会超过这一设置。具体配置方式如下:

      //启用压后控制
      conf.set("spark.streaming.backpressure.enabled" , "true")
      
      //设置第一个batch的接收速率
      conf.set("spark.streaming.backpressure.initialRate" , "10000")
      

    5、window操作

    5.1 介绍

    窗口是若干RDD的集合,窗口的长度必须是批次的整倍数,窗口的滑动间隔也必须是批次整倍数。比如每分钟查询最近一小时内的百度热词,一分钟就是窗口的滑动间隔,一小时就是窗口长度。

    spark_033

    spark_032

    5.2 编程实现

    val result = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a + b, Seconds(10) , Seconds(3))
    

    6、updateStateByKey

    按key更新状态是spark streaming对k-v类型的DStream提供的操作,是对每个K关联一个状态对象,可以是任何对象,该状态对象会随着DStream的流动,从上一个的RDD流向到下一个RDD,工作流程如下图所示:

    spark_034

    该函数需要传递一个高阶函数,方法签名如下:

    def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
      updateStateByKey(updateFunc, defaultPartitioner())
    }
    

    函数有两个参数,Seq[V]是本次RDD中K下的V值列表,Option[S]就跟K关联的状态对象,该函数返回状态对象Option[S],使用Option作为状态类型,意味着状态对象可能不存在。

    例如,从应用启动开始,每个单词出现的次数,则使用如下代码实现:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * 
      */
    object WordCountStreamingUpdateByKeyScala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("streaming")
        conf.setMaster("local[8]")
        //创建SparkStreamingContext
        val sc = new StreamingContext(conf , Seconds(2)) ;
        
        /******************************************************
         *                                                    *
         *           注意:此处需要设置检查点目录,存放rdd数据       *
         *                                                    *
         ******************************************************/
        sc.checkpoint("file:///H:\spark\streaming")
    
        //行流
        val lines = sc.socketTextStream("s101" , 8888)
    
        //单词流
        val words = lines.flatMap(_.split(" "))
    
        //对流
        val pairs = words.map((_, 1))
    
        //窗口化操作.
        val result = pairs.reduceByKey((a:Int,b:Int)=>a + b)
    
        //
        def updateFunc(a:Seq[Int] , b:Option[Int]) = {
          if(b.isEmpty){
            if(a.isEmpty){
              Some(0)
            }
            else{
              Some(a.sum)
            }
          }
          else{
            val old = b.get
            if(a.isEmpty){
              Some(old)
            }
            else{
              Some(old + a.sum)
            }
          }
        }
        val ds = result.updateStateByKey(updateFunc)
        ds.print()
        sc.start()
        sc.awaitTermination()
      }
    }
    
    

    该带码执行的结果如下:

    spark_035

    7、避免大量小文件

    spark Streaming提供的saveAsTextFile方法是将每个RDD的每个分区输出到一个文件中,由于时间片通常是几秒,因此导致产生大量的小文件,进而影响Namenode的资源以及计算时导致大量的task出现。解决办法就是使用DStream的foreachRDD手动遍历每个分区,按照自定义法则将多个分区数据写入一个文件中,以下就是将多个RDD中相同分区索引的数据写入一个文件中,文件以主机名-精确化分的时间串-分区索引格式进行命令,代码实现如下:

    result.foreachRDD(rdd=>{
      rdd.mapPartitionsWithIndex((idx,it)=>{
        //当前时间
        val now = new java.util.Date
        //格式化
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm")
        //提取时间串
        val minStr = sdf.format(now)
        
        //主机名
        val host = InetAddress.getLocalHost.getHostName
        
        //
        val file = new File("H:\spark\streaming", 
                            host + "-" + minStr + "-" + idx+ ".txt")
        
        //文件输出流
        val out = new FileOutputStream(file, true)
        
        //写入整个分区数据
        for (e <- it) {
          out.write(e.toString().getBytes)
          out.flush()
        }
        out.close()
        it
      }).count()
    })
    

    8、Spark Streaming同Spark SQL集成

    spark streaming中使用SQL,只需要创建Spark Session对象,将RDD转换成DataFrame即可,然后注册DataFrame成为临时视图后,就可以使用Spark SQL了,具体代码如下:

    import java.io.{File, FileOutputStream}
    import java.net.InetAddress
    import java.text.SimpleDateFormat
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      *
      */
    object WordCountStreamingSQLScala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("streaming")
        conf.setMaster("local[8]")
        //创建SparkStreamingContext
        val sc = new StreamingContext(conf , Seconds(2)) ;
        sc.checkpoint("file:///H:\spark\streaming")
    
        //行流
        val lines = sc.socketTextStream("127.0.0.1" , 8888)
        
        lines.foreachRDD(rdd=>{
          val rdd1 = rdd.flatMap(_.split(" "))
          val rdd2 = rdd1.map((_,1))
          
          //创建SparkSession对象,使用RDD的配置即可
          val sess = SparkSession.builder()
            .config(rdd.sparkContext.getConf)
            .getOrCreate()
          
          //导入Spark Session隐式转换
          import sess.implicits._
          
          //转换RDD成为DataFrame,并注册成临时表
          rdd2.toDF("word" ,"cnt").createOrReplaceTempView("_wc")
          
          //执行sql操作
          sess.sql("select word , count(cnt) from _wc group by word").show(1000,false)
        })
    
        sc.start()
        sc.awaitTermination()
      }
    }
    

    注意:

    sc.socketTextStream("127.0.0.1" , 8888)方法如果是本地的socket,需要使用127.0.0.1地址,使用localhost不好使,bug!!!!!

    9、程序的部署

    运行spark Streaming应用不能在spark-shell下执行,需要使用spark-submit命令提交执行。需要如下内容:

    • cluster manager

      通过--master指定master URL地址。

      $>spark-submit --master spark://s101:7077 ...
      
    • 导出jar包

      如果应用包含第三方组件,比如kafka,需要将所有的第三方类库导出到jar包中,spark自身的和Spark Streaming的包则不必。

    • 给executor配置足够的内存

      由于接收到的数据必须存在内存中,executor必须提供足够的内存来存储他们。如果又启用了window操作,最少要配置window长度容纳的数量。

      $>spark-submit --executor-memory 2g ...
      

    • 配置检查点

      如果应用中用到了检查点,必须使用hadoop兼容的具有容错存储能力的检查点目录, 比如hdfs或S3。流计算应用会在里面写入检查点信息供故障恢复。

      val sc = new StreamingContext(conf , Seconds(2)) ;
      //配置检查点目录
      sc.checkpoint("file:///H:\spark\streaming")
      

    • 配置Driver的自动重启

      若需要Driver故障时自动恢复,那么用来运行流计算应用的部署命令必须能够监控driver进程并在他故障时重启。不同的集群管理有着不同的工具可以实现这一点:

      • spark standalone

        standalone cluster模式支持应用程序非零退出后的自动重启,若要使用这一特性,可以在spark-submit命中增加--supervise参数来获得。如下所示:

        $>spark-submit --supervise --master spark://s101:7077 --class MyApp myapp.jar
        

        如果要杀死落入重复失败状况下的应用,可以执行以下命令:

        $>spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
        
      • YARN

    • 配置写前日志

      从spark1.2开始,引入了写前日志来获得强容错性保证。如果启用的话,接收到的所有数据都会写入到检查点设置的写前日志中。这可以防止driver恢复时数据丢失,由此可以确保流数据零流失。可用通过spark.streaming.receiver.writeAhea-dLog.enable设置为true来启用写前日志,但会导致每个Receiver接受吞吐量的降低,可以通过增加Receiver进行补偿。

      此外,如果开启了写前日志,可以禁用spark对接受的数据副本化存储,因为写前日志已经存储在了副本模式的文件系统中了。可以通过设置InputStream的持久化模式为StorageLevel.MEMORY_AND_DISK_SER来完成。代码如下:

      //启用写前日志
      conf.set("spark.streaming.receiver.writeAheadLog.enable" , "true")
      ...
      val lines = sc.socketTextStream("s101" , 8888)
      //只有一个副本
      lines.persist(StorageLevel.MEMORY_AND_DISK) ;
      
    • 设置最大接受速率

      资源不足时可以进行限速处理,Receiver类型可以通过spark.streaming.receiver.maxRate设置,kafka方式可以通过 spark.streaming.kafka.maxRatePerPartition设置。Spark1.5之后引入了压后机制,不再需要限速设置,Spark Steaming自动找出速率限制并进行动态调增。压后控制可以通过spark.streaming.backpressure.enabled=true开启。

      val conf = new SparkConf
      conf.set("spark.streaming.backpressure.enabled" , "true")
      

    10、RDD的缓存管理

    rdd执行结果可以进行缓存起来,以备后面使用rdd时不需要重复计算,直接提取计算结果即可。设置rdd缓存之后,必须unpersist之后才能继续再重新设置缓存级别。rdd可以缓存结果到内存中或磁盘中,如果是磁盘级别,保存数据到临时目录下,临时目录可以通过spark.local.dir进行修改。

    // 缓存RDD
    rdd.cache()
    rdd.persist()
    // 内存中缓存
    rdd.persist(StorageLevel.MEMORY_ONLY)
    

    RDD的缓存级别:

    // 存储级别
    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
    
    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    

    11、RDD的检查点

    调用rdd.checkpoint()方法时,会将rdd结果保存到检查点目录,检查点目录通过sc.setCheckPointDir()设置。

    // 
    sc.setCheckpointDir("file:///H:/chk")
    

    12、修改spark的本地临时目录

    conf.set("spark.local.dir" ,"file:///H:/tmp" );
    
  • 相关阅读:
    问题
    cas restful接口实现SSO
    变量,数据类型
    注释
    下载,配置环境变量,第一个demo
    数据挖掘概念与技术(韩家伟)阅读笔记1
    pattern与matcherr
    Array.sort排序
    linux下C语言中的flock函数用法 【转】
    我为什么要进国企----HP大中华区总裁孙振耀退休感言
  • 原文地址:https://www.cnblogs.com/xupccc/p/9544621.html
Copyright © 2011-2022 走看看