zoukankan      html  css  js  c++  java
  • Spark Streaming

    Spark Streaming 

    Spark Streaming 是Spark为了用户实现流式计算的模型。

    数据源包括Kafka,Flume,HDFS等。

    DStream 离散化流(discretized stream), Spark Streaming 使用DStream作为抽象表示。是随时间推移而收到的数据的序列。DStream内部的数据都是RDD形式存储, DStream是由这些RDD所组成的离散序列。

    编写Streaming步骤:

    1.创建StreamingContext

    // Create a local StreamingContext with two working thread and batch interval of 5 second.
    // The master requires 2 cores to prevent from a starvation scenario.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    创建本地化StreamingContext, 需要至少2个工作线程。一个是receiver,一个是计算节点。

    2.定义输入源,创建输入DStream

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("node1", 9999)

    3.定义流的计算过程,使用transformation和output operation DStream

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    4.开始接收数据及处理数据,使用streamingContext.start()

    ssc.start()             // Start the computation

    5.等待批处理被终止,使用streamingContext.awaitTermination()

    ssc.awaitTermination()  // Wait for the computation to terminate

    6.可以手工停止批处理,使用streamingContext.stop()

    数据源

    数据源分为两种

    1.基本源

    text,HDFS等

    2.高级源

    Flume,Kafka等

    DStream支持两种操作

    一、转化操作(transformation)

    无状态转化(stateless):每个批次的处理不依赖于之前批次的数据

    有状态转化(stateful):跨时间区间跟踪数据的操作;一些先前批次的数据被用来在新的批次中参与运算。

    • 滑动窗口:
    • 追踪状态变化:updateStateByKey()

    transform函数

    transform操作允许任意RDD-to-RDD函数被应用在一个DStream中.比如在DStream中的RDD可以和DStream外部的RDD进行Join操作。通常用来过滤垃圾邮件等操作。

    不属于DStream的RDD,在每个批间隔都被调用;允许你做随时间变化的RDD操作。RDD操作,partitions的数量,广播变量的值,可以变化在不同的批次中。

    例子:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by Edward on 2016/9/16.
      */
    object TransformOperation {
    
      def main(args: Array[String]) {
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformOperation")
        val ssc = new StreamingContext(conf, Seconds(5))
        val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)
    
        //黑名单过滤功能,可以将数据存到redis或者数据库,每批次间隔都会重新取数据并参与运算,保证数据可以动态加载进来。
        val blackList=Array(Tuple2("Tom", true))
        val listRDD  = ssc.sparkContext.parallelize(blackList).persist() //创建RDD
    
        val map  = textStream.map(x=>(x.split(" ")(1),x))
        
        //通过transform将DStream中的RDD进行过滤操作
        val dStream = map.transform(rdd =>{
          //listRDD.collect()
          //println(listRDD.collect.length)
          
          //通过RDD的左链接及过滤函数,对数据进行处理,生成新的RDD
          rdd.leftOuterJoin(listRDD).filter(x =>{ //使用transform操作DStream中的rdd  rdd左链接listRDD, 并进行过滤操作
            if(!x._2._2.isEmpty && x._2._2.get)// if(x._2._2.getOrElse(false)) //如果没取到值则结果为false
              false
            else{
              true
            }
          })
        })
    
        dStream.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }

    窗口函数

     

    二、输出操作(output operation)

     

  • 相关阅读:
    【JMeter_22】JMeter逻辑控制器__录制控制器<Recording Controller>
    【JMeter_21】JMeter逻辑控制器__模块控制器<Module Controller>
    【JMeter_20】JMeter逻辑控制器__事务控制器<Transaction Controller>
    【JMeter_19】JMeter逻辑控制器__简单控制器<Simple Controller>
    【JMeter_18】JMeter逻辑控制器__吞吐量控制器<Throughput Controller>
    【JMeter_17】JMeter逻辑控制器__随机顺序控制器<Random Order Controller>
    【JMeter_16】JMeter逻辑控制器__随机控制器<Random Controller>
    【JMeter_15】JMeter逻辑控制器__仅一次控制器<Once Only Controller>
    Golang错误和异常处理的正确姿势
    用beego开发服务端应用
  • 原文地址:https://www.cnblogs.com/one--way/p/5877552.html
Copyright © 2011-2022 走看看