zoukankan      html  css  js  c++  java
  • Spark Streaming中的操作函数分析

      依据Spark官方文档中的描写叙述。在Spark Streaming应用中,一个DStream对象能够调用多种操作。主要分为以下几类

    • Transformations
    • Window Operations
    • Join Operations
    • Output Operations

    一、Transformations

    1、map(func)

      map操作须要传入一个函数当做參数,详细调用形式为

    val b = a.map(func)

      主要作用是,对DStream对象a,将func函数作用到a中的每个元素上并生成新的元素,得到的DStream对象b中包括这些新的元素。
      以下演示样例代码的作用是。在接收到的一行消息后面拼接一个”_NEW”字符串

    val linesNew = lines.map(lines => lines + "_NEW" )

      程序执行结果例如以下:
      这里写图片描写叙述
      注意与接下来的flatMap操作进行比較。

    2、flatMap(func)

      相似于上面的map操作,详细调用形式为

    val b = a.flatMap(func)

      主要作用是,对DStream对象a,将func函数作用到a中的每个元素上并生成0个或多个新的元素。得到的DStream对象b中包括这些新的元素。

      以下演示样例代码的作用是,在接收到的一行消息lines后,将lines依据空格进行切割,切割成若干个单词

    val words = lines.flatMap(_.split( " " ))

      结果例如以下:
      这里写图片描写叙述

    3、 filter(func)

      filter传入一个func函数,详细调用形式为

    val b = a.filter(func)

      对DStream a中的每个元素,应用func方法进行计算。假设func函数返回结果为true。则保留该元素,否则丢弃该元素,返回一个新的DStream b。

      以下演示样例代码中,对words进行推断。去除hello这个单词。

    val filterWords = words.filter(_ != "hello" )

      结果例如以下:
      这里写图片描写叙述

    4、union(otherStream)

      这个操作将两个DStream进行合并,生成一个包括着两个DStream中全部元素的新DStream对象。
      以下代码,首先将输入的每个单词后面分别拼接“_one”和“_two”。最后将这两个DStream合并成一个新的DStream

    val wordsOne = words.map(_ + "_one" )
    val wordsTwo = words.map(_ + "_two" )
    val unionWords = wordsOne.union(wordsTwo)
    
    wordsOne.print()
    wordsTwo.print()
    unionWords.print()

      执行结果例如以下:
      这里写图片描写叙述

    5、count()

      统计DStream中每个RDD包括的元素的个数。得到一个新的DStream,这个DStream中仅仅包括一个元素。这个元素是相应语句单词统计数值。
      以下代码。统计每一行中的单词数

    val wordsCount = words.count()

      执行结果例如以下,一行输入4个单词,打印的结果也为4。
      这里写图片描写叙述

    6、reduce(func)

      返回一个包括一个元素的DStream。传入的func方法会作用在调用者的每个元素上。将当中的元素顺次的两两进行计算。
      以下的代码,将每个单词用"-"符号进行拼接

    val reduceWords = words.reduce(_ + "-" + _)

      执行结果例如以下:
      这里写图片描写叙述

    7、countByValue()

      某个DStream中的元素类型为K,调用这种方法后,返回的DStream的元素为(K, Long)对,后面这个Long值是原DStream中每个RDD元素key出现的频率。
      以下代码统计words中不同单词的个数

    val countByValueWords = words.countByValue()

      结果例如以下:
      这里写图片描写叙述

    8、reduceByKey(func, [numTasks])

      调用这个操作的DStream是以(K, V)的形式出现,返回一个新的元素格式为(K, V)的DStream。返回结果中,K为原来的K,V是由K经过传入func计算得到的。还能够传入一个并行计算的參数,在local模式下。默觉得2。在其它模式下,默认值由參数spark.default.parallelism确定。
      以下代码将words转化成(word, 1)的形式,再以单词为key,个数为value。进行word count。

    val pairs = words.map(word => (word , 1))
    val wordCounts = pairs.reduceByKey(_ + _)

      结果例如以下。
      这里写图片描写叙述

    9、join(otherStream, [numTasks])

      由一个DStream对象调用该方法,元素内容为(k, V),传入还有一个DStream对象。元素内容为(k, W),返回的DStream中包括的内容是(k, (V, W))。这种方法也能够传入一个并行计算的參数,该參数与reduceByKey中是同样的。


      以下代码中,首先将words转化成(word, (word + "_one"))(word, (word + "_two"))的形式。再以word为key,将后面的value合并到一起。

    val wordsOne = words.map(word => (word , word + "_one" ))
    val wordsTwo = words.map(word => (word , word + "_two" ))
    val joinWords = wordsOne.join(wordsTwo)

      执行结果例如以下:
      这里写图片描写叙述

    10、cogroup(otherStream, [numTasks])

      由一个DStream对象调用该方法。元素内容为(k, V)。传入还有一个DStream对象,元素内容为(k, W)。返回的DStream中包括的内容是(k, (Seq[V], Seq[W]))。这种方法也能够传入一个并行计算的參数。该參数与reduceByKey中是同样的。
     以下代码首先将words转化成(word, (word + "_one"))(word, (word + "_two"))的形式,再以word为key,将后面的value合并到一起。
     结果例如以下:
     这里写图片描写叙述

    11、transform(func)

      在Spark-Streaming官方文档中提到,DStream的transform操作极大的丰富了DStream上能够进行的操作内容。使用transform操作后。除了能够使用DStream提供的一些转换方法之外。还能够直接调用随意的调用RDD上的操作函数。
      比方以下的代码中,使用transform完毕将一行语句切割成单词的功能。

    val words = lines.transform(rdd =>
          rdd.flatMap(_.split(" "))
        )

      执行结果例如以下:
      这里写图片描写叙述

    12、updateStateByKey(func)

    二、Window Operations

      我觉得用一个成语。管中窥豹,基本上就能够非常形象的解释什么是窗体函数了。DStream数据流就是那仅仅豹子,窗体就是那个管。以一个固定的速率平移,就能够每次看到豹的一部分。


      窗体函数,就是在DStream流上。以一个可配置的长度为窗体,以一个可配置的速率向前移动窗体,依据窗体函数的详细内容,分别对当前窗体中的这一波数据採取某个相应的操作算子。

    须要注意的是窗体长度,和窗体移动速率须要是batch time的整数倍。

    接下来演示Spark Streaming中提供的主要窗体函数。

    1、window(windowLength, slideInterval)

      该操作由一个DStream对象调用,传入一个窗体长度參数,一个窗体移动速率參数,然后将当前时刻当前长度窗体中的元素取出形成一个新的DStream。
      以下的代码以长度为3,移动速率为1截取源DStream中的元素形成新的DStream。

    val windowWords = words.window(Seconds( 3 ), Seconds( 1))

      执行结果例如以下:
      这里写图片描写叙述
      基本上每秒输入一个字母,然后取出当前时刻3秒这个长度中的全部元素,打印出来。从上面的截图中能够看到,下一秒时已经看不到a了,再下一秒,已经看不到b和c了。表示a, b, c已经不在当前的窗体中。

    2、 countByWindow(windowLength,slideInterval)

      返回指定长度窗体中的元素个数。
      代码例如以下。统计当前3秒长度的时间窗体的DStream中元素的个数:

    val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))

      结果例如以下:
      这里写图片描写叙述

    3、 reduceByWindow(func, windowLength,slideInterval)

      相似于上面的reduce操作,仅仅只是这里不再是对整个调用DStream进行reduce操作,而是在调用DStream上首先取窗体函数的元素形成新的DStream,然后在窗体元素形成的DStream上进行reduce。
      代码例如以下:

    val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))

      结果例如以下:
      这里写图片描写叙述

    4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

      调用该操作的DStream中的元素格式为(k, v),整个操作相似于前面的reduceByKey。仅仅只是相应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗体长度中的全部数据。该操作也有一个可选的并发数參数。
      以下代码中,将当前长度为3的时间窗体中的全部数据元素依据key进行合并。统计当前3秒中内不同单词出现的次数。

    val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))

      结果例如以下:
      这里写图片描写叙述

    5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

      这个窗体操作和上一个的差别是多传入一个函数invFunc。

    前面的func作用和上一个reduceByKeyAndWindow同样,后面的invFunc是用于处理流出rdd的。


      在以下这个样例中,假设把3秒的时间窗体当成一个池塘。池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。

    而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。

    val windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))

      以下是演示结果,终于的结果是该3秒长度的窗体中历史上出现过的全部不同单词个数都为0。


      这里写图片描写叙述
      一段时间不输入不论什么信息,看一下终于结果
      这里写图片描写叙述

    6、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

      相似于前面的countByValue操作。调用该操作的DStream数据格式为(K, v)。返回的DStream格式为(K, Long)。

    统计当前时间窗体中元素值同样的元素的个数。
      代码例如以下

    val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))

      结果例如以下
      这里写图片描写叙述

    三、Join Operations

      Join主要可分为两种,

    1、DStream对象之间的Join

      这样的join一般应用于窗体函数形成的DStream对象之间,详细能够參考第一部分中的join操作,除了简单的join之外。还有leftOuterJoin, rightOuterJoin和fullOuterJoin。

    2、DStream和dataset之间的join

      这一种join,能够參考前面transform操作中的演示样例。

    四、Output Operations

      在Spark Streaming中,DStream的输出操作才是DStream上全部transformations的真正触发计算点。这个相似于RDD中的action操作。经过输出操作DStream中的数据才干与外部进行交互。比方将数据写入文件系统、数据库,或其它应用中。

      
      

    1、print()

      print操作会将DStream每个batch中的前10个元素在driver节点打印出来。
      看以下这个演示样例。一行输入超过10个单词,然后将这行语句切割成单个单词的DStream。

    val words = lines.flatMap(_.split(" "))
    words.print()

      看看print后的效果。


      这里写图片描写叙述
      

    2、saveAsTextFiles(prefix, [suffix])

      这个操作能够将DStream中的内容保存为text文件,每个batch的数据单独保存为一个文夹,目录名前缀參数必须传入,目录名后缀參数可选,终于目录名称的完整形式为prefix-TIME_IN_MS[.suffix]
      比方以下这一行代码

    lines.saveAsTextFiles("satf", ".txt")

      看一下执行结果。在当前项目路径下。每秒钟生成一个目录。打开的两个窗体中的内容各自是nc窗体中的输入。
      这里写图片描写叙述
      另外,假设前缀中包括文件完整路径。则该text目录会建在指定路径下,例如以下图所看到的
      这里写图片描写叙述
      

    3、saveAsObjectFiles(prefix, [suffix])

      这个操作和前面一个相似,仅仅只是这里将DStream中的内容保存为SequenceFile文件类型,这个文件里保存的数据都是经过序列化后的Java对象。
      实验略过。可參考前面一个操作。
      

    4、saveAsHadoopFiles(prefix, [suffix])

      这个操作和前两个相似,将DStream每一batch中的内容保存到HDFS上,同样能够指定文件的前缀和后缀。


      

    5、foreachRDD(func)

  • 相关阅读:
    jQuery 重新温习 遗忘知识点
    正则表达式获取博客园随笔1
    用django创建一个简单的sns
    WCF小实例以及三种宿主
    iOS: imageIO完成渐进加载图片
    Excel 菜单系统
    分布式EventBus的Socket实现
    Jenkins安装plugin
    邮件系统存储设计问答
    在Windows上使用CodeLite+MinGW+Clang进行开发
  • 原文地址:https://www.cnblogs.com/llguanli/p/8313235.html
Copyright © 2011-2022 走看看