  • Java Spark Streaming transformation operators: window and countByWindow

    Spark Streaming code

    package streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/5/15 9:59
     **/
    public class sparkstreaming_test {
        public static void main(String[] args) throws Exception{
            SparkConf conf = new SparkConf().setAppName("spark streaming test").setMaster("local[*]");
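            // Batch interval: 2 seconds
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
            // countByWindow is computed incrementally and requires a checkpoint directory;
            // "./checkpoint" is an assumed path, change it to suit your environment
            ssc.checkpoint("./checkpoint");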
            JavaReceiverInputDStream<String> line = ssc.socketTextStream("localhost", 9999);
            // First argument: window length; second argument: slide interval
            line.window(Durations.seconds(8), Durations.seconds(4)).print();
            line.countByWindow(Durations.seconds(8), Durations.seconds(4)).print();

            ssc.start();
            ssc.awaitTermination();
    
        }
    }
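The `countByWindow` call above prints the number of records in each window rather than the records themselves. To my understanding, Spark computes this count incrementally (the previous window's count, plus the batches that entered, minus the batches that left), which is why it needs a checkpoint directory. The following is a minimal plain-Java sketch of that idea, not Spark's actual implementation: the class `IncrementalWindowCount` and its methods are hypothetical names, and the batch counts assume one record per second with 2-second batches, as in this post.

```java
import java.util.Arrays;

class IncrementalWindowCount {
    // Direct recount: sum the per-batch counts in [endBatch - windowBatches, endBatch).
    static long recount(long[] batchCounts, int endBatch, int windowBatches) {
        long sum = 0;
        for (int i = Math.max(0, endBatch - windowBatches); i < endBatch; i++) {
            sum += batchCounts[i];
        }
        return sum;
    }

    // Incremental update: previous count + batches that entered - batches that left.
    static long slide(long[] batchCounts, long previous, int endBatch,
                      int windowBatches, int slideBatches) {
        long entering = recount(batchCounts, endBatch, slideBatches);
        long leaving = recount(batchCounts, endBatch - windowBatches, slideBatches);
        return previous + entering - leaving;
    }

    public static void main(String[] args) {
        long[] batchCounts = new long[8];   // 8 batches of 2 seconds each
        Arrays.fill(batchCounts, 2L);       // assumed: one record per second
        int windowBatches = 4;              // 8 s window / 2 s batch
        int slideBatches = 2;               // 4 s slide / 2 s batch
        long count = 0;
        for (int end = slideBatches; end <= batchCounts.length; end += slideBatches) {
            count = slide(batchCounts, count, end, windowBatches, slideBatches);
            System.out.println("window ending at batch " + end + ": count = " + count
                    + " (recount = " + recount(batchCounts, end, windowBatches) + ")");
        }
    }
}
```

The incremental result always agrees with the direct recount; the incremental form only avoids re-reading batches that are still inside the window.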

    Socket code

    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class socket_test {
        static ServerSocket serverSocket = null;
        static PrintWriter pw = null;
    
        public static void main(String[] args) {
            try {
                serverSocket = new ServerSocket(9999);
                System.out.println("Server started, waiting for a connection");
                Socket socket = serverSocket.accept();
                System.out.println("Connected, from: " + socket.getRemoteSocketAddress());
                pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
                int j = 0;
                while (j < 100) {
                    j++;
                    String str = "spark streaming test " + j;
                    pw.println(str);
                    pw.flush();
                    System.out.println(str);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
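                    // Guard against a NullPointerException if startup failed early
                    if (pw != null) {
                        pw.close();
                    }
                    if (serverSocket != null) {
                        serverSocket.close();
                    }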
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Run the socket code first; it will wait for a connection. Then run the Spark Streaming code.

    The printed output looks like this:

    0~4s

    spark streaming test 1
    spark streaming test 2
    spark streaming test 3

    4~8s

    spark streaming test 1
    spark streaming test 2
    spark streaming test 3
    spark streaming test 4
    spark streaming test 5
    spark streaming test 6
    spark streaming test 7

    8~12s

    spark streaming test 4
    spark streaming test 5
    spark streaming test 6
    spark streaming test 7
    spark streaming test 8
    spark streaming test 9
    spark streaming test 10
    spark streaming test 11

    12~16s

    spark streaming test 8
    spark streaming test 9
    spark streaming test 10
    spark streaming test 11
    spark streaming test 12
    spark streaming test 13
    spark streaming test 14
    spark streaming test 15

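    Once the window has filled, the number of records printed matches the first argument of window (one record per second, so an 8-second window holds exactly 8 records), and the difference between two consecutive printouts matches the second argument (a 4-second slide, so exactly 4 new records).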
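The relationship between window length, slide interval, and the printed records can be checked with a small plain-Java simulation. This is only an idealized sketch that does not use Spark: it assumes record n arrives exactly at second n, and the class `WindowSketch` and method `windowAt` are hypothetical names. A real run, like the output above, can be offset by about one record because of startup timing.

```java
import java.util.ArrayList;
import java.util.List;

class WindowSketch {
    // Returns the record numbers inside the window (end - length, end],
    // assuming record n arrives exactly at second n.
    static List<Integer> windowAt(int end, int length, int totalRecords) {
        List<Integer> records = new ArrayList<>();
        int first = Math.max(1, end - length + 1);
        int last = Math.min(end, totalRecords);
        for (int n = first; n <= last; n++) {
            records.add(n);
        }
        return records;
    }

    public static void main(String[] args) {
        int length = 8;  // window length in seconds
        int slide = 4;   // slide interval in seconds
        for (int end = slide; end <= 16; end += slide) {
            List<Integer> window = windowAt(end, length, 100);
            System.out.println("window ending at " + end + "s: "
                    + window.size() + " records " + window);
        }
    }
}
```

In this simulation the first window is only partly filled, every later window holds 8 records, and each window shares its last 4 records with the next one.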

  • Original article: https://www.cnblogs.com/7749ha/p/12893506.html