zoukankan      html  css  js  c++  java
  • 069 在SparkStreaming的窗口分析

    一:说明

    1.图例说明

      

      -------------------------------------------------------------------------------------------------------------------------------------------------------------------------

      

    2.对比说明

      DStream:
        batchInterval: 批次产生间隔时间
      Window DStream:
        windowInterval: 窗口间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)
        slideInterval:窗口滑动间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)

    3.API

      使用CTRL+F3,可以参考这篇文档的快捷键:https://blog.csdn.net/qq_36901488/article/details/80704245

      

    二:程序

    1.程序

     1 package com.window.it
     2 import org.apache.spark.{SparkConf, SparkContext}
     3 import org.apache.spark.storage.StorageLevel
     4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
     5 import org.apache.spark.streaming.dstream.DStream
     6 import org.apache.spark.streaming.kafka.KafkaUtils
     7 
     8 object ReduceWindow {
     9   def main(args: Array[String]): Unit = {
    10     val conf = new SparkConf()
    11       .setAppName("StreamingWindowOfKafka")
    12       .setMaster("local[*]")
    13     val sc = SparkContext.getOrCreate(conf)
    14     val ssc = new StreamingContext(sc, Seconds(5))
    15     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
    16     // 路径对应的文件夹不能存在
    17     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/452512")
    18 
    19     val kafkaParams = Map(
    20       "group.id" -> "streaming-kafka-78912151",
    21       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
    22       "auto.offset.reset" -> "smallest"
    23     )
    24     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
    25     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    26       ssc, // 给定SparkStreaming上下文
    27       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
    28       topics, // 给定读取对应topic的名称以及读取数据的线程数量
    29       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
    30     ).map(_._2)
    31 
    32     val resultWordCount = dstream
    33       .filter(line => line.nonEmpty)
    34       .flatMap(line => line.split(" ").map((_, 1)))
    35       .reduceByKeyAndWindow(
    36         (a: Int, b: Int) => a + b,
    37         Seconds(15), // 窗口大小
    38         Seconds(10) // 滑动大小
    39       )
    40     resultWordCount.print() // 这个也是打印数据
    41     
    42     // 启动开始处理
    43     ssc.start()
    44     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
    45   }
    46 }

    2.效果

      这里主要看的是页面的DAG。

  • 相关阅读:
    linux下启动和关闭网卡命令及DHCP上网
    python 编码问题
    paddlepaddle
    Convolutional Neural Network Architectures for Matching Natural Language Sentences
    deep learning RNN
    Learning Structured Representation for Text Classification via Reinforcement Learning 学习笔记
    Python IO密集型任务、计算密集型任务,以及多线程、多进程
    EM 算法最好的解释
    tensorflow 调参过程
    tensorflow 学习纪录(持续更新)
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9484601.html
Copyright © 2011-2022 走看看