zoukankan      html  css  js  c++  java
  • 一篇文章学会sparkstreaming

    版权申明:转载请注明出处。
    文章来源:bigdataer.net

    1.什么是spark-streaming?

    实际生产中会有许多应用到实时处理的场景,比如:实时监测页面点击,实时监测系统异常,实时监测来自于外部的攻击。针对这些场景,twitter研发了实时数据处理工具storm,并在后来开源。spark针对这些场景设计了spark-streaming实时计算模型,它允许用户使用一系列批处理的API去处理实时数据,能做到代码逻辑的重复使用。
    和spark中的rdd非常相似,spark-streaming中使用离散化流(discretized stream)作为抽象的表示,叫做DStream。它是随时间推移而收集数据的序列,每个时间段收集到的数据在DStream内部以一个RDD的形式存在。DStream支持从kafka,flume,hdfs,s3等获取输入。DStream也支持两种操作,即转化操作和输出操作(区别于RDD中的行动操作)。转化操作又分为无状态的转化操作和有状态的转化操作,无状态的转化操作有map,filter,flatmap,repartition等,是针对单个时间区间内的操作。而有状态的转化操作可以针对不同的时间区间,后面详述。

    2.两个简单的例子

    2.1 监听socket获取数据,代码如下:
    这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

    scala    17行

    object SocketStream {
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
        //接收消息
        val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    2.2 从kafka读取数据,比较常用

    scala    31行

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    3.再来谈架构

    通过上面两个例子,你可能对spark-streaming有了初步的了解,我们再来看一下它的架构。
    Spark-streaming使用"微批次"的架构,把流式计算当做一系列微型的批处理操作来对待,每个时间段都产生一个RDD。如图:
    wpc
    作用于一个DStream上的无状态转化操作会对它其中的每个RDD生效,如针对一个输入为语句的DStream做flatMap操作的示意图如下:
    shiyitu

    4.转化操作

    4.1 无状态的转化操作。
    无状态转化操作就是简单的将转化作用于DStream的每个RDD上面。下面列举了一些常见的转化操作,其中最后一个transform表示可以试用自定义的转化函数,尽管它前面已经提供了很多现成的API。
    wzt
    4.2有状态的转化操作。
    有状态的转化操作是跨时间段的数据操作,一些先前的批次也被用来在新的批次中做计算。主要有滑动窗口和updateStateByKey。前者以一个时间段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化。有状态的转化操作需要打开检查点机制来保证容错性。即:给ssc.checkpoint()设置一个检查点目录。
    (1)基于窗口的转化操作会在一个比ssc设置的更长的时间段内,通过整合多个批次的,计算出整个大的时间窗口的结果。基于窗口的操作需要两个参数,一个是窗口时长,一个是滑动步长。这两个参数是ssc设置的时长的整数倍。下面的图表示了一个时间窗口为3,滑动步长为2的窗口转化操作。
    window
    前面提到的监测关键字error的例子,现在需要每隔20s就对前面30s有error的日志记录做计数,代码如下:

    scala    34行

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
          .map(_._2)
        //每隔20s对前30s出现error的日志做计数
        val errors = dstream.window(Seconds(30),Seconds(20))
            .filter(_.contains("error"))
            .count()
        errors.foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    (2)updateStateByKey
    updateStateByKey能对键值对的数据进行不同批次间的数据计算,使用updateStateByKey,需要传入一个update函数,这个函数接收某个key最新批次对应的values,以及该key之前对应的value,按照自定义的逻辑返回一个新的value。如需要计算一个实时日志中http响应码的计数,代码如下:

    scala    39行

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //输出目录
        val output = args(0)
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        val rdd = dstream.map(_.split("\001"))
          .map(x=>(x(0),x(1).toLong))
          .updateStateByKey(update)
        //输出
        rdd.foreachRDD(_.saveAsTextFile(output))
        ssc.start()
        ssc.awaitTermination()
      }
      //update函数
      def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
        val current_num = new_values.size
        val result_num = current_num + old_value.getOrElse(0L)
        Some(result_num)
      }
    }
    

    (3)所有有状态转化操作
    state

    5.输出操作

    输出操作比较简单,有以下几种:
    save

    6.作业稳定性

    spark-streaming作业一般都要全天候不间断运行,那么作业的稳定性如何保证?主要有以下几点:
    6.1 检查点机制。
    其原理就是阶段性的将作业运行的数据存放到存储系统,如hdfs,s3等。当作业运行出现异常时可以从上述数据中恢复。
    6.2 驱动器容错。
    在创建实时计算作业的上下文时使用getOrCreate函数。代码如下:

    scala    7行

    	val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
        def createContext(): StreamingContext  ={
          val sc = new SparkContext(conf)
          val ssc = new StreamingContext(sc,Seconds(10))
          ssc.checkpoint(cp_dir)
        }
    

    更多文章请关注微信公众号:bigdataer
    wx

  • 相关阅读:
    搭建非域AlwaysOn win2016+SQL2016
    从0开始搭建SQL Server AlwaysOn 第四篇(配置异地机房节点)
    从0开始搭建SQL Server AlwaysOn 第二篇(配置故障转移集群)
    从0开始搭建SQL Server AlwaysOn 第三篇(配置AlwaysOn)
    从0开始搭建SQL Server AlwaysOn 第一篇(配置域控)
    四、基于Windows 2012配置SQL Server 2014 AlwaysOn
    三、安装SQLserver 2014(For AlwaysOn)
    二、 Windows 2012配置故障转移(For SQLServer 2014 AlwaysOn)
    Mybatis-SQL语句构建器类及日志
    Mybatis-JavaAPI
  • 原文地址:https://www.cnblogs.com/bigdataer/p/6531963.html
Copyright © 2011-2022 走看看