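  • Kafka: ZK+Kafka+Spark Streaming cluster setup (25), Structured Streaming: one topic carries several parts of the same data set, and the parts are joined into a single record by key (plus the problems encountered).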

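    Requirement:

    A Kafka topic currently holds a batch of data spread across 9 partitions (it is published with key:{m1,m2,m3,m4...m9} and value:{records items}). Every mx (m1, m2 ... m9) record has the unique key int_id+start_time, where int_id and start_time are fields of the topic record. The 9 groups can be joined on that unique key (m1.primarykey1, m2.primarykey1, m3.primarykey1 ... m9.primarykey1).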
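
As a minimal sketch of this join key, the plain-Java snippet below groups records by int_id + "_" + start_time. The `Part` class, the `PrimaryKeyGrouping` name, and the sample values are hypothetical stand-ins rather than part of the original job, where the grouping happens inside Spark.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrimaryKeyGrouping {
    /** Hypothetical stand-in for one topic record: message key plus the two join fields. */
    static final class Part {
        final String key;
        final String intId;
        final String startTime;

        Part(String key, String intId, String startTime) {
            this.key = key;
            this.intId = intId;
            this.startTime = startTime;
        }

        /** Same shape as the primary key described above: int_id + "_" + start_time. */
        String primaryKey() {
            return intId + "_" + startTime;
        }
    }

    /** Groups parts by primary key, keeping the order in which keys first appear. */
    static Map<String, List<Part>> groupByPrimaryKey(List<Part> parts) {
        Map<String, List<Part>> groups = new LinkedHashMap<>();
        for (Part part : parts) {
            groups.computeIfAbsent(part.primaryKey(), k -> new ArrayList<>()).add(part);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Part> parts = new ArrayList<>();
        parts.add(new Part("m2", "1001", "2018-08-28 10:00:00"));
        parts.add(new Part("m1", "1001", "2018-08-28 10:00:00"));
        parts.add(new Part("m1", "1002", "2018-08-28 10:00:00"));

        for (Map.Entry<String, List<Part>> entry : groupByPrimaryKey(parts).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " part(s)");
        }
    }
}
```

Each group then holds up to 9 parts (m1 through m9) that share one primary key and can be merged into a single wide record.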

    Pseudocode:

    Fields of an m record:

    public class MS_PLRULQX {
        private String key;
        private String int_id;
        private String start_time;
        private long MS_PLRULQX_00;
        private long MS_PLRULQX_01;
        
        public String getPrimaryKey() {
          return this.int_id + "_" + this.start_time;
        }
    }

    Full MS_PLRULQX class definition:

    import java.io.Serializable;
    import org.apache.spark.sql.Row;
    
    public class MS_PLRULQX implements Serializable, Comparable<MS_PLRULQX> {
        private static final long serialVersionUID = -2873721171908282946L;
    
        public MS_PLRULQX() {
        }
        
        public MS_PLRULQX(Row row) {
            this.key = row.getAs("key");
            this.int_id = row.getAs("int_id");
            this.start_time = row.getAs("start_time");
            this.MS_PLRULQX_00 = row.getAs("MS_PLRULQX_00");
            this.MS_PLRULQX_01 = row.getAs("MS_PLRULQX_01");
        }
    
        private String key;
        private String int_id;
        private String start_time;
        private long MS_PLRULQX_00;
        private long MS_PLRULQX_01;
        
        public String getKey() {
            return key;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public String getInt_id() {
            return int_id;
        }
    
        public void setInt_id(String int_id) {
            this.int_id = int_id;
        }
    
        public String getStart_time() {
            return start_time;
        }
    
        public void setStart_time(String start_time) {
            this.start_time = start_time;
        }
    
        public long getMS_PLRULQX_00() {
            return MS_PLRULQX_00;
        }
    
        public void setMS_PLRULQX_00(long MS_PLRULQX_00) {
            this.MS_PLRULQX_00 = MS_PLRULQX_00;
        }
    
        public long getMS_PLRULQX_01() {
            return MS_PLRULQX_01;
        }
    
        public void setMS_PLRULQX_01(long MS_PLRULQX_01) {
            this.MS_PLRULQX_01 = MS_PLRULQX_01;
        }
    
        public String getPrimaryKey() {
            return this.int_id + "_" + this.start_time;
        }
    
        @Override
        public int compareTo(MS_PLRULQX other) {
            // Key format: MS_PLRULQX1, MS_PLRULQX2, ... MS_PLRULQX9.
            String prefix = "MS_PLRULQX".toLowerCase();
            String thisKey = this.getKey().toLowerCase();
            String otherKey = other.getKey().toLowerCase();
            if (thisKey.contains(prefix) && otherKey.contains(prefix)) {
                String thisKeyStr = thisKey.replace(prefix, "");
                String otherKeyStr = otherKey.replace(prefix, "");
                // Compare by numeric suffix when both suffixes are plain digits.
                // A regex check replaces the external NumberUtils helper.
                if (thisKeyStr.matches("\\d{1,9}") && otherKeyStr.matches("\\d{1,9}")) {
                    return Integer.compare(Integer.parseInt(thisKeyStr), Integer.parseInt(otherKeyStr));
                }
            }

            // Otherwise fall back to plain string ordering of the keys.
            return this.key.compareTo(other.key);
        }
    
    }
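
The ordering rule in compareTo can be illustrated on bare key strings. This is a sketch under assumptions: `KeySuffixOrder`, `suffixOf`, and `BY_SUFFIX` are hypothetical names, and the key `MS_PLRULQX10` is invented purely to show where numeric and lexicographic order differ (the post itself only uses suffixes 1 to 9, where both orders agree).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class KeySuffixOrder {
    static final String PREFIX = "ms_plrulqx";

    /** Returns the numeric suffix of a key, or Integer.MAX_VALUE if the key has no such suffix. */
    static int suffixOf(String key) {
        String lower = key.toLowerCase();
        if (!lower.startsWith(PREFIX)) {
            return Integer.MAX_VALUE;
        }
        String rest = lower.substring(PREFIX.length());
        // At most 9 digits, so Integer.parseInt cannot overflow.
        return rest.matches("\\d{1,9}") ? Integer.parseInt(rest) : Integer.MAX_VALUE;
    }

    /** Numeric suffix first; keys without a suffix sort last, in natural string order. */
    static final Comparator<String> BY_SUFFIX =
            Comparator.<String>comparingInt(KeySuffixOrder::suffixOf)
                    .thenComparing(Comparator.naturalOrder());

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(Arrays.asList("MS_PLRULQX10", "MS_PLRULQX2", "MS_PLRULQX1"));

        List<String> lexicographic = new ArrayList<>(keys);
        lexicographic.sort(Comparator.naturalOrder());
        System.out.println(lexicographic); // [MS_PLRULQX1, MS_PLRULQX10, MS_PLRULQX2]

        keys.sort(BY_SUFFIX);
        System.out.println(keys); // [MS_PLRULQX1, MS_PLRULQX2, MS_PLRULQX10]
    }
}
```

Note that comparing two String values with `==` checks object identity rather than content, so suffix equality is better decided on the parsed integers, as `Integer.compare` does here.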

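    There is one MS_PLRULQX record in each of the 9 groups (m1 through m9). They are joined together, and only records with the same primary key may be joined. After the join, the entity keeps the following fields: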

    public class MS_PLRULQX_Combine implements Serializable {
        private String key;
        private String int_id;
        private String start_time;
    
        private long mr_packetlossrateulqci_1_00;
        private long mr_packetlossrateulqci_1_01;
    
        private long mr_packetlossrateulqci_2_00;
        private long mr_packetlossrateulqci_2_01;
    
        private long mr_packetlossrateulqci_3_00;
        private long mr_packetlossrateulqci_3_01;
    
        private long mr_packetlossrateulqci_4_00;
        private long mr_packetlossrateulqci_4_01;
    
        private long mr_packetlossrateulqci_5_00;
        private long mr_packetlossrateulqci_5_01;
    
        private long mr_packetlossrateulqci_6_00;
        private long mr_packetlossrateulqci_6_01;
    
        private long mr_packetlossrateulqci_7_00;
        private long mr_packetlossrateulqci_7_01;
    
        private long mr_packetlossrateulqci_8_00;
        private long mr_packetlossrateulqci_8_01;
        
        private long mr_packetlossrateulqci_9_00;
        private long mr_packetlossrateulqci_9_01;
    }

    Full MS_PLRULQX_Combine class definition:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    
    public class MS_PLRULQX_Combine implements Serializable {
        private static final long serialVersionUID = -944128402186054489L;
    
        public MS_PLRULQX_Combine() {
        }
    
        public MS_PLRULQX_Combine(List<MS_PLRULQX> list) {
            int sizeOfList = list.size();
            if (sizeOfList > 9) {
                throw new RuntimeException("the measurement group has " + list.size() + " items, more than the maximum of 9");
            }
    
            if (sizeOfList >= 1) {
                setItem1(list.get(0));
            }
            if (sizeOfList >= 2) {
                setItem2(list.get(1));
            }
            if (sizeOfList >= 3) {
                setItem3(list.get(2));
            }
            if (sizeOfList >= 4) {
                setItem4(list.get(3));
            }
            if (sizeOfList >= 5) {
                setItem5(list.get(4));
            }
            if (sizeOfList >= 6) {
                setItem6(list.get(5));
            }
            if (sizeOfList >= 7) {
                setItem7(list.get(6));
            }
            if (sizeOfList >= 8) {
                setItem8(list.get(7));
            }
            if (sizeOfList >= 9) {
                setItem9(list.get(8));
            }
        }
    
        private void setItem9(MS_PLRULQX item9) {
            if (item9 != null) {
                this.mr_packetlossrateulqci_9_00 = item9.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_9_01 = item9.getMS_PLRULQX_01();
            }
        }

        private void setItem8(MS_PLRULQX item8) {
            if (item8 != null) {
                this.mr_packetlossrateulqci_8_00 = item8.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_8_01 = item8.getMS_PLRULQX_01();
            }
        }

        private void setItem7(MS_PLRULQX item7) {
            if (item7 != null) {
                this.mr_packetlossrateulqci_7_00 = item7.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_7_01 = item7.getMS_PLRULQX_01();
            }
        }

        private void setItem6(MS_PLRULQX item6) {
            if (item6 != null) {
                this.mr_packetlossrateulqci_6_00 = item6.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_6_01 = item6.getMS_PLRULQX_01();
            }
        }

        private void setItem5(MS_PLRULQX item5) {
            if (item5 != null) {
                this.mr_packetlossrateulqci_5_00 = item5.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_5_01 = item5.getMS_PLRULQX_01();
            }
        }

        private void setItem4(MS_PLRULQX item4) {
            if (item4 != null) {
                this.mr_packetlossrateulqci_4_00 = item4.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_4_01 = item4.getMS_PLRULQX_01();
            }
        }

        private void setItem3(MS_PLRULQX item3) {
            if (item3 != null) {
                this.mr_packetlossrateulqci_3_00 = item3.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_3_01 = item3.getMS_PLRULQX_01();
            }
        }

        private void setItem2(MS_PLRULQX item2) {
            if (item2 != null) {
                this.mr_packetlossrateulqci_2_00 = item2.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_2_01 = item2.getMS_PLRULQX_01();
            }
        }

        // The first item also supplies the shared key, int_id and start_time.
        private void setItem1(MS_PLRULQX item1) {
            if (item1 != null) {
                this.key = item1.getKey();
                this.int_id = item1.getInt_id();
                this.start_time = item1.getStart_time();

                this.mr_packetlossrateulqci_1_00 = item1.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_1_01 = item1.getMS_PLRULQX_01();
            }
        }
    
        private String key;
        private String int_id;
        private String start_time;
    
        private long mr_packetlossrateulqci_1_00;
        private long mr_packetlossrateulqci_1_01;
    
        private long mr_packetlossrateulqci_2_00;
        private long mr_packetlossrateulqci_2_01;
    
        private long mr_packetlossrateulqci_3_00;
        private long mr_packetlossrateulqci_3_01;
    
        private long mr_packetlossrateulqci_4_00;
        private long mr_packetlossrateulqci_4_01;
    
        private long mr_packetlossrateulqci_5_00;
        private long mr_packetlossrateulqci_5_01;
    
        private long mr_packetlossrateulqci_6_00;
        private long mr_packetlossrateulqci_6_01;
    
        private long mr_packetlossrateulqci_7_00;
        private long mr_packetlossrateulqci_7_01;
    
        private long mr_packetlossrateulqci_8_00;
        private long mr_packetlossrateulqci_8_01;
        
        private long mr_packetlossrateulqci_9_00;
        private long mr_packetlossrateulqci_9_01;
    
        public String getKey() {
            return key;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public String getInt_id() {
            return int_id;
        }
    
        public void setInt_id(String int_id) {
            this.int_id = int_id;
        }
    
        public String getStart_time() {
            return start_time;
        }
    
        public void setStart_time(String start_time) {
            this.start_time = start_time;
        }
    
        public long getMr_packetlossrateulqci_1_00() {
            return mr_packetlossrateulqci_1_00;
        }
    
        public void setMr_packetlossrateulqci_1_00(long mr_packetlossrateulqci_1_00) {
            this.mr_packetlossrateulqci_1_00 = mr_packetlossrateulqci_1_00;
        }
    
        public long getMr_packetlossrateulqci_1_01() {
            return mr_packetlossrateulqci_1_01;
        }
    
        public void setMr_packetlossrateulqci_1_01(long mr_packetlossrateulqci_1_01) {
            this.mr_packetlossrateulqci_1_01 = mr_packetlossrateulqci_1_01;
        }
    
        public long getMr_packetlossrateulqci_2_00() {
            return mr_packetlossrateulqci_2_00;
        }
    
        public void setMr_packetlossrateulqci_2_00(long mr_packetlossrateulqci_2_00) {
            this.mr_packetlossrateulqci_2_00 = mr_packetlossrateulqci_2_00;
        }
    
        public long getMr_packetlossrateulqci_2_01() {
            return mr_packetlossrateulqci_2_01;
        }
    
        public void setMr_packetlossrateulqci_2_01(long mr_packetlossrateulqci_2_01) {
            this.mr_packetlossrateulqci_2_01 = mr_packetlossrateulqci_2_01;
        }
    
        public long getMr_packetlossrateulqci_3_00() {
            return mr_packetlossrateulqci_3_00;
        }
    
        public void setMr_packetlossrateulqci_3_00(long mr_packetlossrateulqci_3_00) {
            this.mr_packetlossrateulqci_3_00 = mr_packetlossrateulqci_3_00;
        }
    
        public long getMr_packetlossrateulqci_3_01() {
            return mr_packetlossrateulqci_3_01;
        }
    
        public void setMr_packetlossrateulqci_3_01(long mr_packetlossrateulqci_3_01) {
            this.mr_packetlossrateulqci_3_01 = mr_packetlossrateulqci_3_01;
        }
    
        public long getMr_packetlossrateulqci_4_00() {
            return mr_packetlossrateulqci_4_00;
        }
    
        public void setMr_packetlossrateulqci_4_00(long mr_packetlossrateulqci_4_00) {
            this.mr_packetlossrateulqci_4_00 = mr_packetlossrateulqci_4_00;
        }
    
        public long getMr_packetlossrateulqci_4_01() {
            return mr_packetlossrateulqci_4_01;
        }
    
        public void setMr_packetlossrateulqci_4_01(long mr_packetlossrateulqci_4_01) {
            this.mr_packetlossrateulqci_4_01 = mr_packetlossrateulqci_4_01;
        }
    
        public long getMr_packetlossrateulqci_5_00() {
            return mr_packetlossrateulqci_5_00;
        }
    
        public void setMr_packetlossrateulqci_5_00(long mr_packetlossrateulqci_5_00) {
            this.mr_packetlossrateulqci_5_00 = mr_packetlossrateulqci_5_00;
        }
    
        public long getMr_packetlossrateulqci_5_01() {
            return mr_packetlossrateulqci_5_01;
        }
    
        public void setMr_packetlossrateulqci_5_01(long mr_packetlossrateulqci_5_01) {
            this.mr_packetlossrateulqci_5_01 = mr_packetlossrateulqci_5_01;
        }
    
        public long getMr_packetlossrateulqci_6_00() {
            return mr_packetlossrateulqci_6_00;
        }
    
        public void setMr_packetlossrateulqci_6_00(long mr_packetlossrateulqci_6_00) {
            this.mr_packetlossrateulqci_6_00 = mr_packetlossrateulqci_6_00;
        }
    
        public long getMr_packetlossrateulqci_6_01() {
            return mr_packetlossrateulqci_6_01;
        }
    
        public void setMr_packetlossrateulqci_6_01(long mr_packetlossrateulqci_6_01) {
            this.mr_packetlossrateulqci_6_01 = mr_packetlossrateulqci_6_01;
        }
    
        public long getMr_packetlossrateulqci_7_00() {
            return mr_packetlossrateulqci_7_00;
        }
    
        public void setMr_packetlossrateulqci_7_00(long mr_packetlossrateulqci_7_00) {
            this.mr_packetlossrateulqci_7_00 = mr_packetlossrateulqci_7_00;
        }
    
        public long getMr_packetlossrateulqci_7_01() {
            return mr_packetlossrateulqci_7_01;
        }
    
        public void setMr_packetlossrateulqci_7_01(long mr_packetlossrateulqci_7_01) {
            this.mr_packetlossrateulqci_7_01 = mr_packetlossrateulqci_7_01;
        }
    
        public long getMr_packetlossrateulqci_8_00() {
            return mr_packetlossrateulqci_8_00;
        }
    
        public void setMr_packetlossrateulqci_8_00(long mr_packetlossrateulqci_8_00) {
            this.mr_packetlossrateulqci_8_00 = mr_packetlossrateulqci_8_00;
        }
    
        public long getMr_packetlossrateulqci_8_01() {
            return mr_packetlossrateulqci_8_01;
        }
    
        public void setMr_packetlossrateulqci_8_01(long mr_packetlossrateulqci_8_01) {
            this.mr_packetlossrateulqci_8_01 = mr_packetlossrateulqci_8_01;
        }
    
        public long getMr_packetlossrateulqci_9_00() {
            return mr_packetlossrateulqci_9_00;
        }
    
        public void setMr_packetlossrateulqci_9_00(long mr_packetlossrateulqci_9_00) {
            this.mr_packetlossrateulqci_9_00 = mr_packetlossrateulqci_9_00;
        }
    
        public long getMr_packetlossrateulqci_9_01() {
            return mr_packetlossrateulqci_9_01;
        }
    
        public void setMr_packetlossrateulqci_9_01(long mr_packetlossrateulqci_9_01) {
            this.mr_packetlossrateulqci_9_01 = mr_packetlossrateulqci_9_01;
        }
    }
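
The nine setItemN methods all follow one positional pattern, which can be sketched with a loop. This is an illustration under assumptions, not the author's implementation: `CombineSketch` and `combine` are hypothetical names, and each part is reduced to a two-element `long[]` holding its `_00` and `_01` values.

```java
import java.util.Arrays;
import java.util.List;

public class CombineSketch {
    static final int MAX_PARTS = 9;
    static final int FIELDS_PER_PART = 2;

    /**
     * Copies each part's two values into a flat array of 18 slots.
     * The part at list index i fills slots 2*i and 2*i+1; unused slots stay 0,
     * just as unset long fields do in the bean above.
     */
    static long[] combine(List<long[]> sortedParts) {
        if (sortedParts.size() > MAX_PARTS) {
            throw new IllegalArgumentException(
                    "got " + sortedParts.size() + " parts, more than the maximum of " + MAX_PARTS);
        }
        long[] combined = new long[MAX_PARTS * FIELDS_PER_PART];
        for (int i = 0; i < sortedParts.size(); i++) {
            long[] part = sortedParts.get(i);
            if (part == null) {
                continue; // mirrors the null checks in the setItemN methods
            }
            // Assumes every non-null part has exactly FIELDS_PER_PART values.
            System.arraycopy(part, 0, combined, i * FIELDS_PER_PART, FIELDS_PER_PART);
        }
        return combined;
    }

    public static void main(String[] args) {
        long[] combined = combine(Arrays.asList(new long[] {1, 2}, new long[] {3, 4}));
        System.out.println(Arrays.toString(combined));
    }
}
```

Because placement depends on the list index rather than on the number in the key, a missing part shifts every later part one slot to the left. The constructor above behaves the same way, so it implicitly assumes all lower-numbered parts are present.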

    Reading the data stream from the topic:

    Dataset<Row> dsParsed = this.sparkSession.readStream().format("kafka").options(this.kafkaOptions).option("subscribe", topicName)
                    .option("startingOffsets", "earliest").load();
    
    String waterMarkName = "query" + this.getTopicEncodeName(topicName) + "Agg";
    int windowDuration = 2 * 60;
    int slideDuration = 60;
    
    try {
        dsParsed.withWatermark("timestamp", "2 hour").createTempView(waterMarkName);
    } catch (AnalysisException e1) {
        e1.printStackTrace();
        throw new RuntimeException(e1);
    }
    
    String aggSQL = "xxx";
    Dataset<Row> dsSQL1 = sparkSession.sql(aggSQL);
    dsSQL1.printSchema();
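
The aggregation SQL is elided in the original, so the role of windowDuration and slideDuration is an assumption here: if they feed a sliding window over the timestamp column (120-second windows advancing every 60 seconds), each event falls into two overlapping windows. The sketch below works through that arithmetic in plain Java, assuming windows aligned to the epoch with no start offset; `SlidingWindows` and `windowStarts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    /**
     * Returns the start times (in seconds) of every window [start, start + windowSec)
     * that contains eventSec, for windows that begin at multiples of slideSec.
     */
    static List<Long> windowStarts(long eventSec, long windowSec, long slideSec) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the event.
        long lastStart = Math.floorDiv(eventSec, slideSec) * slideSec;
        // Walk backwards while the window still covers the event (end is exclusive).
        for (long start = lastStart; start > eventSec - windowSec; start -= slideSec) {
            starts.add(0, start);
        }
        return starts;
    }

    public static void main(String[] args) {
        int windowDuration = 2 * 60;
        int slideDuration = 60;
        // An event 150 seconds after the epoch lies in [60, 180) and [120, 240).
        System.out.println(windowStarts(150, windowDuration, slideDuration)); // [60, 120]
    }
}
```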

    Joining the stream data by key:

    The approach that works: group the data by key, sort the records within each group by key, merge them, and print the merged result to the console.

    KeyValueGroupedDataset<String, Row> tuple2Dataset = dsSQL1.groupByKey((MapFunction<Row, String>) row -> {
        String int_id = row.getAs("int_id");
        String start_time = row.getAs("start_time");
        String key = int_id + "_" + start_time;
        return key;
    }, Encoders.STRING());
    
    Dataset<MS_PLRULQX_Combine> tuple2FlatMapDataset = tuple2Dataset.flatMapGroups(
            new FlatMapGroupsFunction<String, Row, MS_PLRULQX_Combine>() {
                private static final long serialVersionUID = 1400167811199763836L;
    
                @Override
                public Iterator<MS_PLRULQX_Combine> call(String key, Iterator<Row> values) throws Exception {
                    List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
    
                    while (values.hasNext()) {
                        Row value = values.next();
                        MS_PLRULQX item = new MS_PLRULQX(value);
                        list.add(item);
                    }
    
                    // Sort ascending by key using MS_PLRULQX.compareTo.
                    Collections.sort(list);
    
                    return Arrays.asList(new MS_PLRULQX_Combine(list)).iterator();
                }
            }, Encoders.bean(MS_PLRULQX_Combine.class));
    
    Dataset<Row> rows = tuple2FlatMapDataset.toDF();
    rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();

    Joining the stream data by key: problems encountered with an alternative approach:

    This approach uses JavaRDD for the grouping, sorting, and merging.

    import scala.Tuple2;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    
    JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {
        private static final long serialVersionUID = -5203498264050492910L;
    
        @Override
        public Tuple2<String, MS_PLRULQX> call(Row row) throws Exception {
            MS_PLRULQX value = new MS_PLRULQX(row);
    
            return new Tuple2<String, MS_PLRULQX>(value.getPrimaryKey(), value);
        }
    });
    
    JavaPairRDD<String, Iterable<MS_PLRULQX>> group = pairs.groupByKey();
    
    JavaPairRDD<String, MS_PLRULQX_Combine> keyVsValuePairRDD = group.mapToPair(tuple -> {
        List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
        Iterator<MS_PLRULQX> it = tuple._2.iterator();
        while (it.hasNext()) {
            MS_PLRULQX score = it.next();
            list.add(score);
        }
    
        // Sort ascending by key using MS_PLRULQX.compareTo.
        Collections.sort(list);
    
        return new Tuple2<String, MS_PLRULQX_Combine>(tuple._1, new MS_PLRULQX_Combine(list));
    });
    
    JavaRDD<MS_PLRULQX_Combine> javaRDD = keyVsValuePairRDD
            .map(new Function<Tuple2<String, MS_PLRULQX_Combine>, MS_PLRULQX_Combine>() {
                private static final long serialVersionUID = -3031600976005716506L;
    
                @Override
                public MS_PLRULQX_Combine call(Tuple2<String, MS_PLRULQX_Combine> v1) throws Exception {
                    return v1._2;
                }
            });
    
    Dataset<Row> rows = this.sparkSession.createDataFrame(javaRDD, MS_PLRULQX_Combine.class);
    rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();
    sparkSession.streams().awaitAnyTermination();

    The error is thrown at this location:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    at com.xx.xx.streaming.drivers.XXXDriver.run(xxxxDriver.java:85)

    The failing line is "JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {".
    Judging by the error, calling .toJavaRDD() on a streaming Dataset appears to be treated the same way as calling dsSQL1.show or dsSQL1.collect.foreach(println(_)).

    The official Spark documentation (Spark 2.3.1) only says that Structured Streaming supports the Dataset/DataFrame API; it does not mention RDD support.

  • Original article: https://www.cnblogs.com/yy3b2007com/p/9551741.html