zoukankan      html  css  js  c++  java
  • Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

    官网文档:《http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example》

    Spark Streaming提供的提供的理念是一个批次处理一定时间段内的数据,一批次处理接收到的这一批次的数据;而Structured Streaming提供的理念是使用DataFrame/DataSet方式接收流,这样的流是一个可以看做为一个无界的大表,可以持续输出统计结果,而统计结果也会跟随时间(流数据的流入)持续更新。

    另外,Spark Streaming是core Spark API的一个扩展,使用时需要引入maven:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>

    而Structured Streaming流处理是基于Spark SQL引擎,使用时要引入maven:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.1</version>
            </dependency>

    Spark Streaming官网的例子reduceByKeyAndWindow

    简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理。

    import java.util.Arrays;                
                    
    import org.apache.spark.SparkConf;                
    import org.apache.spark.api.java.JavaSparkContext;                
    import org.apache.spark.streaming.Durations;                
    import org.apache.spark.streaming.api.java.JavaDStream;                
    import org.apache.spark.streaming.api.java.JavaPairDStream;                
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;                
    import org.apache.spark.streaming.api.java.JavaStreamingContext;                
                    
    import scala.Tuple2;                
                    
    public class HelloWord {                
        public static void main(String[] args) throws InterruptedException {            
            // Create a local StreamingContext with two working thread and batch interval of 1 second        
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");        
            JavaSparkContext jsc=new JavaSparkContext(conf);        
            jsc.setLogLevel("WARN");        
            JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));        
                    
            // Create a DStream that will connect to hostname:port, like localhost:9999        
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("xx.xx.xx.xx", 19999);        
                    
            // Split each line into words        
            JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());        
                    
            // Count each word in each batch        
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));        
                    
            // Reduce last 60 seconds of data, every 30 seconds        
            JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                    Durations.seconds(60), 
                    Durations.seconds(30));
                    
            // Print the first ten elements of each RDD generated in this DStream to the console        
            windowedWordCounts.print();        
                    
            jssc.start(); // Start the computation        
            jssc.awaitTermination(); // Wait for the computation to terminate        
        }
    }

    输入数据:

    窗口中数据随着时间的变化:

    实际工作中上边的代码统计出的结果:

    Window操作解读:

            // Reduce last 60 seconds of data, every 30 seconds        
            JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                    Durations.seconds(60), 
                    Durations.seconds(30));

    上边代码的意义就是:按照key对value进行求count,数据处理范围是60s内的数据,每隔30s统计一次。

    Spark Streaming提供了窗口计算,它允许你对滑动窗口上的数据使用变换(transformations)。下图说明了滑动窗口:

    上图介绍了,两个信息:

    1)original DStream:Spark Streaming是把一段时间接收到的流作为一个批数据“也就是图中上边绿色框框示意内容”;

    2)windowed DStream:窗口每次滑动就是把“滑动长度(时间)”内的数据(一批)合并到一起进行一次运算,另外"'滑动长度(时间)'内的数据"受两个因素影响:“窗口时长”、“水印时长”(Structured Streaming编程中拥有的)。

    上边的例子及图可以充分解释为什么每次窗口触发时参与计算的数据受“窗口时长”的影响。“窗口时长”实际上就是定义每次窗口事件触发时,参与计算的数据长度(范围)。

    记录:

    这篇文章写的不错,而且例子也不错:https://blog.csdn.net/l_15156024189/article/details/81612860

  • 相关阅读:
    账户经常被盗号怎么办?防盗“黑科技”了解一下
    京训钉自动播放,京训钉自动续播刷课时,京训钉自动关弹窗,自动下一课,倍速播放
    记账小程序
    “TensorFlow 开发者出道计划”全攻略,玩转社区看这里!
    maven的安装配置使用
    年轻就该多尝试,教你20小时Get一项新技能
    原生JS封装常用函数
    记账小程序
    java基础知识学习小总结(一)
    JavaSE集合类
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/9452489.html
Copyright © 2011-2022 走看看