zoukankan      html  css  js  c++  java
  • 070 DStream中的transform和foreachRDD函数

    1.说明
      DStream的API不够满足使用的时候,可以使用这两个函数,将dstream转换为rdd,然后进行操作

    2.transform

      transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可

    3.程序

     1 package com.window.it
     2 import org.apache.spark.{SparkConf, SparkContext}
     3 import org.apache.spark.storage.StorageLevel
     4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
     5 import org.apache.spark.streaming.dstream.DStream
     6 import org.apache.spark.streaming.kafka.KafkaUtils
     7 object TransformDemo {
     8   def main(args: Array[String]): Unit = {
     9     val conf = new SparkConf()
    10       .setAppName("StreamingWindowOfKafka")
    11       .setMaster("local[*]")
    12     val sc = SparkContext.getOrCreate(conf)
    13     val ssc = new StreamingContext(sc, Seconds(5))
    14     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
    15     // 路径对应的文件夹不能存在
    16     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712")
    17 
    18     val kafkaParams = Map(
    19       "group.id" -> "streaming-kafka-78912151",
    20       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
    21       "auto.offset.reset" -> "smallest"
    22     )
    23     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
    24     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    25       ssc, // 给定SparkStreaming上下文
    26       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
    27       topics, // 给定读取对应topic的名称以及读取数据的线程数量
    28       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
    29     ).map(_._2)
    30 
    31     val resultWordCount = dstream
    32       .filter(line => line.nonEmpty)
    33       .flatMap(line => line.split(" ").map((_, 1)))
    34       .reduceByKeyAndWindow(
    35         (a: Int, b: Int) => a + b,
    36         Seconds(15), // 窗口大小
    37         Seconds(10) // 滑动大小
    38       )
    39     resultWordCount.print() // 这个也是打印数据
    40 
    41     /**
    42       * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
    43       */
    44     dstream.transform(rdd => {
    45       // 对rdd进行预处理
    46       val processedRDD = rdd
    47         .filter(line => line.nonEmpty)
    48         .flatMap(line => line.split(" ").map((_, 1)))
    49         .reduceByKey(_ + _)
    50       // 数据抽样,获取两个节点
    51       val seeder = processedRDD.takeSample(true, 2)
    52       // 对rdd进行处理操作, 将抽样数据和rdd中的数据进行比较,如果rdd中的word的出现次数大于等于抽样数据中的任何一个word的次数,次数*3;否则次数*2
    53       val brocast = rdd.sparkContext.broadcast(seeder)
    54       val resultRDD = processedRDD.mapPartitions(iter => {
    55         val seederValue = brocast.value
    56         iter.map {
    57           case (word, count) => {
    58             val vc = seederValue
    59               .filter(tuple => {
    60                 count >= tuple._2
    61               }).size
    62             if (vc == 0) {
    63               (word, 2, count * 2)
    64             } else {
    65               (word, 3, count * 3)
    66             }
    67           }
    68         }
    69       })
    70       resultRDD
    71     }).print()
    72     
    73     // 启动开始处理
    74     ssc.start()
    75     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
    76   }
    77 }

    4.foreachRDD

      作用和transform类型,将DStream的操作转换为RDD进行操作,区别:该api没有返回值

    5.程序

     1 package com.window.it
     2 
     3 import org.apache.spark.storage.StorageLevel
     4 import org.apache.spark.streaming.kafka.KafkaUtils
     5 import org.apache.spark.streaming.{Seconds, StreamingContext}
     6 import org.apache.spark.{SparkConf, SparkContext}
     7 
     8 object TransformDemo {
     9   def main(args: Array[String]): Unit = {
    10     val conf = new SparkConf()
    11       .setAppName("StreamingWindowOfKafka")
    12       .setMaster("local[*]")
    13     val sc = SparkContext.getOrCreate(conf)
    14     val ssc = new StreamingContext(sc, Seconds(5))
    15     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
    16     // 路径对应的文件夹不能存在
    17     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712")
    18 
    19     val kafkaParams = Map(
    20       "group.id" -> "streaming-kafka-78912151",
    21       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
    22       "auto.offset.reset" -> "smallest"
    23     )
    24     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
    25     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    26       ssc, // 给定SparkStreaming上下文
    27       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
    28       topics, // 给定读取对应topic的名称以及读取数据的线程数量
    29       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
    30     ).map(_._2)
    31 
    32     val resultWordCount = dstream
    33       .filter(line => line.nonEmpty)
    34       .flatMap(line => line.split(" ").map((_, 1)))
    35       .reduceByKeyAndWindow(
    36         (a: Int, b: Int) => a + b,
    37         Seconds(15), // 窗口大小
    38         Seconds(10) // 滑动大小
    39       )
    40     resultWordCount.print() // 这个也是打印数据
    41 
    42     dstream.foreachRDD(rdd => {
    43       // TODO: 这里就可以做数据输出的代码编写
    44       // TODO: 这里不要为空
    45       rdd.foreachPartition(iter => {
    46         // TODO: 这里在实际环境中不要为空,为空可能会出现一些问题:内存泄露的问题
    47         println(iter.take(1))
    48       })
    49     })
    50     
    51     // 启动开始处理
    52     ssc.start()
    53     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
    54   }
    55 }

    6.注意点

      一个批次,DStream内部就只对应一个RDD,transform和foreachRDD API使用的过程中,不要考虑多个RDD的问题

  • 相关阅读:
    记忆的永恒
    放弃我是你的错
    献给我逝去的长辈们清明
    思维的局限,穷人为什么会穷?
    借我一生
    陪你到老
    风雨路途
    人生的十二大财富
    怀才不遇
    javascript变量
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9490068.html
Copyright © 2011-2022 走看看