zoukankan      html  css  js  c++  java
  • Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

    官网文档:《http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example》

    Spark Streaming提供的提供的理念是一个批次处理一定时间段内的数据,一批次处理接收到的这一批次的数据;而Structured Streaming提供的理念是使用DataFrame/DataSet方式接收流,这样的流是一个可以看做为一个无界的大表,可以持续输出统计结果,而统计结果也会跟随时间(流数据的流入)持续更新。

    另外,Spark Streaming是core Spark API的一个扩展,使用时需要引入maven:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>

    而Structured Streaming流处理是基于Spark SQL引擎,使用时要引入maven:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.1</version>
            </dependency>

    Spark Streaming官网的例子reduceByKeyAndWindow

    简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理。

    import java.util.Arrays;                
                    
    import org.apache.spark.SparkConf;                
    import org.apache.spark.api.java.JavaSparkContext;                
    import org.apache.spark.streaming.Durations;                
    import org.apache.spark.streaming.api.java.JavaDStream;                
    import org.apache.spark.streaming.api.java.JavaPairDStream;                
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;                
    import org.apache.spark.streaming.api.java.JavaStreamingContext;                
                    
    import scala.Tuple2;                
                    
    public class HelloWord {                
        public static void main(String[] args) throws InterruptedException {            
            // Create a local StreamingContext with two working thread and batch interval of 1 second        
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");        
            JavaSparkContext jsc=new JavaSparkContext(conf);        
            jsc.setLogLevel("WARN");        
            JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));        
                    
            // Create a DStream that will connect to hostname:port, like localhost:9999        
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("xx.xx.xx.xx", 19999);        
                    
            // Split each line into words        
            JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());        
                    
            // Count each word in each batch        
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));        
                    
            // Reduce last 60 seconds of data, every 30 seconds        
            JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                    Durations.seconds(60), 
                    Durations.seconds(30));
                    
            // Print the first ten elements of each RDD generated in this DStream to the console        
            windowedWordCounts.print();        
                    
            jssc.start(); // Start the computation        
            jssc.awaitTermination(); // Wait for the computation to terminate        
        }
    }

    输入数据:

    窗口中数据随着时间的变化:

    实际工作中上边的代码统计出的结果:

    Window操作解读:

            // Reduce last 60 seconds of data, every 30 seconds        
            JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2,         
                    Durations.seconds(60), 
                    Durations.seconds(30));

    上边代码的意义就是:按照key对value进行求count,数据处理范围是60s内的数据,每隔30s统计一次。

    Spark Streaming提供了窗口计算,它允许你对滑动窗口上的数据使用变换(transformations)。下图说明了滑动窗口:

    上图介绍了,两个信息:

    1)original DStream:Spark Streaming是把一段时间接收到的流作为一个批数据“也就是图中上边绿色框框示意内容”;

    2)windowed DStream:窗口每次滑动就是把“滑动长度(时间)”内的数据(一批)合并到一起进行一次运算,另外"'滑动长度(时间)'内的数据"受两个因素影响:“窗口时长”、“水印时长”(Structured Streaming编程中拥有的)。

    上边的例子及图可以充分解释为什么每次窗口触发时参与计算的数据受“窗口时长”的影响。“窗口时长”实际上就是定义每次窗口事件触发时,参与计算的数据长度(范围)。

    记录:

    这篇文章写的不错,而且例子也不错:https://blog.csdn.net/l_15156024189/article/details/81612860

  • 相关阅读:
    SQLMAP注入教程-11种常见SQLMAP使用方法详解
    VS2012/2013/2015/Visual Studio 2017 关闭单击文件进行预览的功能
    解决 IIS 反向代理ARR URLREWRITE 设置后,不能跨域跳转 return Redirect 问题
    Spring Data JPA one to one 共享主键关联
    JHipster 问题集中
    Spring Data JPA 定义超类
    Spring Data JPA查询关联数据
    maven命名
    maven仓库
    Jackson读取列表
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/9452489.html
Copyright © 2011-2022 走看看