zoukankan      html  css  js  c++  java
  • java streaming转换算子window,countByWindow

    spark streaming 代码

    package streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/5/15 9:59
     **/
    public class sparkstreaming_test {
        public static void main(String[] args) throws Exception{
            SparkConf conf = new SparkConf().setAppName("spark streaming test").setMaster("local[*]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
            JavaReceiverInputDStream<String> line = ssc.socketTextStream("localhost", 9999);
            // 参数一:窗口的长度;参数二:窗口滑动的间隔
            line.window(Durations.seconds(8), Durations.seconds(4)).print();
            line.countByWindow(Durations.seconds(8),Durations.seconds(4)).print();
         
    
            ssc.start();
            ssc.awaitTermination();
    
        }
    }

    socket的代码

    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class socket_test {
        static ServerSocket serverSocket = null;
        static PrintWriter pw = null;
    
        public static void main(String[] args) {
            try {
                serverSocket = new ServerSocket(9999);
                System.out.println("服务启动,等待连接");
                Socket socket = serverSocket.accept();
                System.out.println("连接成功,来自:" + socket.getRemoteSocketAddress());
                pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
                int j = 0;
                while (j < 100) {
                    j++;
                    String str = "spark streaming test " + j;
                    pw.println(str);
                    pw.flush();
                    System.out.println(str);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    pw.close();
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    先执行socket代码,此时在等待状态,然后执行spark streaming代码。

    打印结果如下:

    0~4s

    spark streaming test 1
    spark streaming test 2
    spark streaming test 3

    4~8s

    spark streaming test 1
    spark streaming test 2
    spark streaming test 3
    spark streaming test 4
    spark streaming test 5
    spark streaming test 6
    spark streaming test 7

    8~12s

    spark streaming test 4
    spark streaming test 5
    spark streaming test 6
    spark streaming test 7
    spark streaming test 8
    spark streaming test 9
    spark streaming test 10
    spark streaming test 11

    12~16s

    spark streaming test 8
    spark streaming test 9
    spark streaming test 10
    spark streaming test 11
    spark streaming test 12
    spark streaming test 13
    spark streaming test 14
    spark streaming test 15

    打印的长度对应的就是window的第一个参数(一秒一条数据刚好是8),两次打印的差值刚好对应的第二个参数(4秒间隔,刚好4条数据)

  • 相关阅读:
    PNG文件格式具体解释
    opencv2对读书笔记——使用均值漂移算法查找物体
    Jackson的Json转换
    Java实现 蓝桥杯VIP 算法训练 装箱问题
    Java实现 蓝桥杯VIP 算法训练 装箱问题
    Java实现 蓝桥杯VIP 算法训练 单词接龙
    Java实现 蓝桥杯VIP 算法训练 单词接龙
    Java实现 蓝桥杯VIP 算法训练 方格取数
    Java实现 蓝桥杯VIP 算法训练 方格取数
    Java实现 蓝桥杯VIP 算法训练 单词接龙
  • 原文地址:https://www.cnblogs.com/7749ha/p/12893506.html
Copyright © 2011-2022 走看看