  • Kafka: Building a ZK+Kafka+Spark Streaming cluster environment (25) Structured Streaming: one topic holds several parts of the same record; joining them into one record by key (and the problems encountered)

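    Requirement:

    A Kafka topic currently holds a batch of data spread across 9 partitions (the records were published with keys {m1, m2, m3, ..., m9} and values {record items}). The unique key of each mx record (m1, m2, ..., m9) is int_id + start_time, where int_id and start_time are fields of the topic record. Using that unique key, the nine groups can be joined into a single record (m1.primarykey1, m2.primarykey1, m3.primarykey1, ..., m9.primarykey1).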
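    As a plain-Java illustration of the requirement (no Spark involved), the join boils down to grouping every part by the composite key int_id + "_" + start_time. The class Rec, its fields, and the sample values are made up for this sketch; the real records carry more columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupByPrimaryKey {
    // Hypothetical minimal record: message key (m1..m9) plus the two key columns
    static final class Rec {
        final String key;       // Kafka message key, e.g. "m1"
        final String intId;     // int_id column
        final String startTime; // start_time column

        Rec(String key, String intId, String startTime) {
            this.key = key;
            this.intId = intId;
            this.startTime = startTime;
        }

        // Same composition as getPrimaryKey(): int_id + "_" + start_time
        String primaryKey() {
            return intId + "_" + startTime;
        }
    }

    // Every part sharing a primary key lands in the same list (TreeMap for stable output order)
    static Map<String, List<Rec>> group(List<Rec> recs) {
        return recs.stream()
                .collect(Collectors.groupingBy(Rec::primaryKey, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Rec> recs = Arrays.asList(
                new Rec("m2", "1001", "2018-08-28 10:00"),
                new Rec("m1", "1001", "2018-08-28 10:00"),
                new Rec("m1", "1002", "2018-08-28 10:00"));
        for (Map.Entry<String, List<Rec>> e : group(recs).entrySet()) {
            List<String> keys = new ArrayList<>();
            for (Rec r : e.getValue()) {
                keys.add(r.key);
            }
            System.out.println(e.getKey() + " -> " + keys);
        }
        // 1001_2018-08-28 10:00 -> [m2, m1]
        // 1002_2018-08-28 10:00 -> [m1]
    }
}
```

    Within a group the parts keep their arrival order (m2 before m1 here), which is why each group still has to be sorted by message key before it is merged.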

    Pseudocode:

    Each m record consists of the following fields:

    public class MS_PLRULQX {
        private String key;
        private String int_id;
        private String start_time;
        private long MS_PLRULQX_00;
        private long MS_PLRULQX_01;
        
        public String getPrimaryKey() {
          return this.int_id + "_" + this.start_time;
        }
    }

    Complete MS_PLRULQX class definition:

    import java.io.Serializable;
    import org.apache.spark.sql.Row;
    
    public class MS_PLRULQX implements Serializable, Comparable<MS_PLRULQX> {
        private static final long serialVersionUID = -2873721171908282946L;
    
        public MS_PLRULQX() {
        }
        
        public MS_PLRULQX(Row row) {
            this.key = row.getAs("key");
            this.int_id = row.getAs("int_id");
            this.start_time = row.getAs("start_time");
            this.MS_PLRULQX_00 = row.getAs("MS_PLRULQX_00");
            this.MS_PLRULQX_01 = row.getAs("MS_PLRULQX_01");
        }
    
        private String key;
        private String int_id;
        private String start_time;
        private long MS_PLRULQX_00;
        private long MS_PLRULQX_01;
        
        public String getKey() {
            return key;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public String getInt_id() {
            return int_id;
        }
    
        public void setInt_id(String int_id) {
            this.int_id = int_id;
        }
    
        public String getStart_time() {
            return start_time;
        }
    
        public void setStart_time(String start_time) {
            this.start_time = start_time;
        }
    
        public long getMS_PLRULQX_00() {
            return MS_PLRULQX_00;
        }
    
        public void setMS_PLRULQX_00(long MS_PLRULQX_00) {
            this.MS_PLRULQX_00 = MS_PLRULQX_00;
        }
    
        public long getMS_PLRULQX_01() {
            return MS_PLRULQX_01;
        }
    
        public void setMS_PLRULQX_01(long MS_PLRULQX_01) {
            this.MS_PLRULQX_01 = MS_PLRULQX_01;
        }
    
        public String getPrimaryKey() {
            return this.int_id + "_" + this.start_time;
        }
    
        @Override
        public int compareTo(MS_PLRULQX other) {
            // key format: MS_PLRULQX1, MS_PLRULQX2, ..., MS_PLRULQX9
            // Compare the numeric suffix as an int; fall back to plain string order otherwise.
            String prefix = "ms_plrulqx";
            String thisKeyStr = this.getKey().toLowerCase().replace(prefix, "");
            String otherKeyStr = other.getKey().toLowerCase().replace(prefix, "");
            if (thisKeyStr.matches("\\d+") && otherKeyStr.matches("\\d+")) {
                // Integer.compare returns 0 for equal suffixes (comparing Strings with == would not)
                return Integer.compare(Integer.parseInt(thisKeyStr), Integer.parseInt(otherKeyStr));
            }
    
            return this.key.compareTo(other.key);
        }
    
    }
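    A common pitfall when comparing key suffixes is using == on the extracted strings: it tests object identity rather than content, so two equal keys may never compare as 0, which breaks the Comparable contract. The sketch below (class and method names are illustrative, not from the original code) contrasts plain string order with numeric-suffix order. With only MS_PLRULQX1 to MS_PLRULQX9 both orders agree; they diverge once a two-digit suffix appears.

```java
import java.util.*;

public class KeySuffixOrder {
    private static final String PREFIX = "ms_plrulqx";

    // Numeric suffix of keys like "MS_PLRULQX7"; -1 if the suffix is not a number
    static int suffix(String key) {
        String s = key.toLowerCase().replace(PREFIX, "");
        return s.matches("\\d+") ? Integer.parseInt(s) : -1;
    }

    // Orders by numeric suffix when both keys have one, else by plain string order
    static final Comparator<String> BY_SUFFIX = (a, b) -> {
        int sa = suffix(a), sb = suffix(b);
        return (sa >= 0 && sb >= 0) ? Integer.compare(sa, sb) : a.compareTo(b);
    };

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(Arrays.asList("MS_PLRULQX10", "MS_PLRULQX9", "MS_PLRULQX1"));
        List<String> lexical = new ArrayList<>(keys);
        Collections.sort(lexical);
        keys.sort(BY_SUFFIX);
        System.out.println("lexical: " + lexical); // lexical: [MS_PLRULQX1, MS_PLRULQX10, MS_PLRULQX9]
        System.out.println("numeric: " + keys);    // numeric: [MS_PLRULQX1, MS_PLRULQX9, MS_PLRULQX10]

        // new String(...) forces two distinct objects with equal content
        String a = new String("7"), b = new String("7");
        System.out.println((a == b) + " " + a.equals(b)); // false true
    }
}
```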

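    MS_PLRULQX exists once for each of the 9 keys (partitions) of the topic. The nine copies are joined into one record, and only records with the same primary key may be joined. The combined entity keeps the following fields: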

    public class MS_PLRULQX_Combine implements Serializable {
        private String key;
        private String int_id;
        private String start_time;
    
        private long mr_packetlossrateulqci_1_00;
        private long mr_packetlossrateulqci_1_01;
    
        private long mr_packetlossrateulqci_2_00;
        private long mr_packetlossrateulqci_2_01;
    
        private long mr_packetlossrateulqci_3_00;
        private long mr_packetlossrateulqci_3_01;
    
        private long mr_packetlossrateulqci_4_00;
        private long mr_packetlossrateulqci_4_01;
    
        private long mr_packetlossrateulqci_5_00;
        private long mr_packetlossrateulqci_5_01;
    
        private long mr_packetlossrateulqci_6_00;
        private long mr_packetlossrateulqci_6_01;
    
        private long mr_packetlossrateulqci_7_00;
        private long mr_packetlossrateulqci_7_01;
    
        private long mr_packetlossrateulqci_8_00;
        private long mr_packetlossrateulqci_8_01;
        
        private long mr_packetlossrateulqci_9_00;
        private long mr_packetlossrateulqci_9_01;
    }

    Complete MS_PLRULQX_Combine class definition:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    
    public class MS_PLRULQX_Combine implements Serializable {
        private static final long serialVersionUID = -944128402186054489L;
    
        public MS_PLRULQX_Combine() {
        }
    
        public MS_PLRULQX_Combine(List<MS_PLRULQX> list) {
            int sizeOfList = list.size();
            if (sizeOfList > 9) {
                throw new RuntimeException("the measurement group has " + sizeOfList + " items, more than 9");
            }
    
            if (sizeOfList >= 1) {
                setItem1(list.get(0));
            }
            if (sizeOfList >= 2) {
                setItem2(list.get(1));
            }
            if (sizeOfList >= 3) {
                setItem3(list.get(2));
            }
            if (sizeOfList >= 4) {
                setItem4(list.get(3));
            }
            if (sizeOfList >= 5) {
                setItem5(list.get(4));
            }
            if (sizeOfList >= 6) {
                setItem6(list.get(5));
            }
            if (sizeOfList >= 7) {
                setItem7(list.get(6));
            }
            if (sizeOfList >= 8) {
                setItem8(list.get(7));
            }
            if (sizeOfList >= 9) {
                setItem9(list.get(8));
            }
        }
    
        // MS_PLRULQX exposes its counters as getMS_PLRULQX_00()/getMS_PLRULQX_01()
        private void setItem9(MS_PLRULQX item9) {
            if (item9 != null) {
                this.mr_packetlossrateulqci_9_00 = item9.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_9_01 = item9.getMS_PLRULQX_01();
            }
        }
    
        private void setItem8(MS_PLRULQX item8) {
            if (item8 != null) {
                this.mr_packetlossrateulqci_8_00 = item8.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_8_01 = item8.getMS_PLRULQX_01();
            }
        }
    
        private void setItem7(MS_PLRULQX item7) {
            if (item7 != null) {
                this.mr_packetlossrateulqci_7_00 = item7.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_7_01 = item7.getMS_PLRULQX_01();
            }
        }
    
        private void setItem6(MS_PLRULQX item6) {
            if (item6 != null) {
                this.mr_packetlossrateulqci_6_00 = item6.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_6_01 = item6.getMS_PLRULQX_01();
            }
        }
    
        private void setItem5(MS_PLRULQX item5) {
            if (item5 != null) {
                this.mr_packetlossrateulqci_5_00 = item5.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_5_01 = item5.getMS_PLRULQX_01();
            }
        }
    
        private void setItem4(MS_PLRULQX item4) {
            if (item4 != null) {
                this.mr_packetlossrateulqci_4_00 = item4.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_4_01 = item4.getMS_PLRULQX_01();
            }
        }
    
        private void setItem3(MS_PLRULQX item3) {
            if (item3 != null) {
                this.mr_packetlossrateulqci_3_00 = item3.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_3_01 = item3.getMS_PLRULQX_01();
            }
        }
    
        private void setItem2(MS_PLRULQX item2) {
            if (item2 != null) {
                this.mr_packetlossrateulqci_2_00 = item2.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_2_01 = item2.getMS_PLRULQX_01();
            }
        }
    
        // The first item also supplies the shared key columns
        private void setItem1(MS_PLRULQX item1) {
            if (item1 != null) {
                this.key = item1.getKey();
                this.int_id = item1.getInt_id();
                this.start_time = item1.getStart_time();
    
                this.mr_packetlossrateulqci_1_00 = item1.getMS_PLRULQX_00();
                this.mr_packetlossrateulqci_1_01 = item1.getMS_PLRULQX_01();
            }
        }
    
        private String key;
        private String int_id;
        private String start_time;
    
        private long mr_packetlossrateulqci_1_00;
        private long mr_packetlossrateulqci_1_01;
    
        private long mr_packetlossrateulqci_2_00;
        private long mr_packetlossrateulqci_2_01;
    
        private long mr_packetlossrateulqci_3_00;
        private long mr_packetlossrateulqci_3_01;
    
        private long mr_packetlossrateulqci_4_00;
        private long mr_packetlossrateulqci_4_01;
    
        private long mr_packetlossrateulqci_5_00;
        private long mr_packetlossrateulqci_5_01;
    
        private long mr_packetlossrateulqci_6_00;
        private long mr_packetlossrateulqci_6_01;
    
        private long mr_packetlossrateulqci_7_00;
        private long mr_packetlossrateulqci_7_01;
    
        private long mr_packetlossrateulqci_8_00;
        private long mr_packetlossrateulqci_8_01;
        
        private long mr_packetlossrateulqci_9_00;
        private long mr_packetlossrateulqci_9_01;
    
        public String getKey() {
            return key;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public String getInt_id() {
            return int_id;
        }
    
        public void setInt_id(String int_id) {
            this.int_id = int_id;
        }
    
        public String getStart_time() {
            return start_time;
        }
    
        public void setStart_time(String start_time) {
            this.start_time = start_time;
        }
    
        public long getMr_packetlossrateulqci_1_00() {
            return mr_packetlossrateulqci_1_00;
        }
    
        public void setMr_packetlossrateulqci_1_00(long mr_packetlossrateulqci_1_00) {
            this.mr_packetlossrateulqci_1_00 = mr_packetlossrateulqci_1_00;
        }
    
        public long getMr_packetlossrateulqci_1_01() {
            return mr_packetlossrateulqci_1_01;
        }
    
        public void setMr_packetlossrateulqci_1_01(long mr_packetlossrateulqci_1_01) {
            this.mr_packetlossrateulqci_1_01 = mr_packetlossrateulqci_1_01;
        }
    
        public long getMr_packetlossrateulqci_2_00() {
            return mr_packetlossrateulqci_2_00;
        }
    
        public void setMr_packetlossrateulqci_2_00(long mr_packetlossrateulqci_2_00) {
            this.mr_packetlossrateulqci_2_00 = mr_packetlossrateulqci_2_00;
        }
    
        public long getMr_packetlossrateulqci_2_01() {
            return mr_packetlossrateulqci_2_01;
        }
    
        public void setMr_packetlossrateulqci_2_01(long mr_packetlossrateulqci_2_01) {
            this.mr_packetlossrateulqci_2_01 = mr_packetlossrateulqci_2_01;
        }
    
        public long getMr_packetlossrateulqci_3_00() {
            return mr_packetlossrateulqci_3_00;
        }
    
        public void setMr_packetlossrateulqci_3_00(long mr_packetlossrateulqci_3_00) {
            this.mr_packetlossrateulqci_3_00 = mr_packetlossrateulqci_3_00;
        }
    
        public long getMr_packetlossrateulqci_3_01() {
            return mr_packetlossrateulqci_3_01;
        }
    
        public void setMr_packetlossrateulqci_3_01(long mr_packetlossrateulqci_3_01) {
            this.mr_packetlossrateulqci_3_01 = mr_packetlossrateulqci_3_01;
        }
    
        public long getMr_packetlossrateulqci_4_00() {
            return mr_packetlossrateulqci_4_00;
        }
    
        public void setMr_packetlossrateulqci_4_00(long mr_packetlossrateulqci_4_00) {
            this.mr_packetlossrateulqci_4_00 = mr_packetlossrateulqci_4_00;
        }
    
        public long getMr_packetlossrateulqci_4_01() {
            return mr_packetlossrateulqci_4_01;
        }
    
        public void setMr_packetlossrateulqci_4_01(long mr_packetlossrateulqci_4_01) {
            this.mr_packetlossrateulqci_4_01 = mr_packetlossrateulqci_4_01;
        }
    
        public long getMr_packetlossrateulqci_5_00() {
            return mr_packetlossrateulqci_5_00;
        }
    
        public void setMr_packetlossrateulqci_5_00(long mr_packetlossrateulqci_5_00) {
            this.mr_packetlossrateulqci_5_00 = mr_packetlossrateulqci_5_00;
        }
    
        public long getMr_packetlossrateulqci_5_01() {
            return mr_packetlossrateulqci_5_01;
        }
    
        public void setMr_packetlossrateulqci_5_01(long mr_packetlossrateulqci_5_01) {
            this.mr_packetlossrateulqci_5_01 = mr_packetlossrateulqci_5_01;
        }
    
        public long getMr_packetlossrateulqci_6_00() {
            return mr_packetlossrateulqci_6_00;
        }
    
        public void setMr_packetlossrateulqci_6_00(long mr_packetlossrateulqci_6_00) {
            this.mr_packetlossrateulqci_6_00 = mr_packetlossrateulqci_6_00;
        }
    
        public long getMr_packetlossrateulqci_6_01() {
            return mr_packetlossrateulqci_6_01;
        }
    
        public void setMr_packetlossrateulqci_6_01(long mr_packetlossrateulqci_6_01) {
            this.mr_packetlossrateulqci_6_01 = mr_packetlossrateulqci_6_01;
        }
    
        public long getMr_packetlossrateulqci_7_00() {
            return mr_packetlossrateulqci_7_00;
        }
    
        public void setMr_packetlossrateulqci_7_00(long mr_packetlossrateulqci_7_00) {
            this.mr_packetlossrateulqci_7_00 = mr_packetlossrateulqci_7_00;
        }
    
        public long getMr_packetlossrateulqci_7_01() {
            return mr_packetlossrateulqci_7_01;
        }
    
        public void setMr_packetlossrateulqci_7_01(long mr_packetlossrateulqci_7_01) {
            this.mr_packetlossrateulqci_7_01 = mr_packetlossrateulqci_7_01;
        }
    
        public long getMr_packetlossrateulqci_8_00() {
            return mr_packetlossrateulqci_8_00;
        }
    
        public void setMr_packetlossrateulqci_8_00(long mr_packetlossrateulqci_8_00) {
            this.mr_packetlossrateulqci_8_00 = mr_packetlossrateulqci_8_00;
        }
    
        public long getMr_packetlossrateulqci_8_01() {
            return mr_packetlossrateulqci_8_01;
        }
    
        public void setMr_packetlossrateulqci_8_01(long mr_packetlossrateulqci_8_01) {
            this.mr_packetlossrateulqci_8_01 = mr_packetlossrateulqci_8_01;
        }
    
        public long getMr_packetlossrateulqci_9_00() {
            return mr_packetlossrateulqci_9_00;
        }
    
        public void setMr_packetlossrateulqci_9_00(long mr_packetlossrateulqci_9_00) {
            this.mr_packetlossrateulqci_9_00 = mr_packetlossrateulqci_9_00;
        }
    
        public long getMr_packetlossrateulqci_9_01() {
            return mr_packetlossrateulqci_9_01;
        }
    
        public void setMr_packetlossrateulqci_9_01(long mr_packetlossrateulqci_9_01) {
            this.mr_packetlossrateulqci_9_01 = mr_packetlossrateulqci_9_01;
        }
    }
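    MS_PLRULQX_Combine fills its slots by list position, so if one key (say MS_PLRULQX3) is missing from a group, every later item shifts down one slot. A hedged alternative is to pick the slot from the numeric key suffix, which also makes the prior sort unnecessary. The sketch uses a hypothetical Part class and a plain long[][] in place of the 18 bean fields.

```java
import java.util.*;

public class SlotBySuffix {
    // Hypothetical part: message key (MS_PLRULQX1..9) and its two counters
    static final class Part {
        final String key;
        final long v00, v01;

        Part(String key, long v00, long v01) {
            this.key = key;
            this.v00 = v00;
            this.v01 = v01;
        }
    }

    // Returns slots[1..9][0..1]; a slot stays 0 if its key never arrived
    static long[][] combine(List<Part> parts) {
        long[][] slots = new long[10][2]; // index 0 unused, so slot n matches key suffix n
        for (Part p : parts) {
            String s = p.key.toLowerCase().replace("ms_plrulqx", "");
            if (!s.matches("[1-9]")) {
                throw new IllegalArgumentException("unexpected key: " + p.key);
            }
            int n = Integer.parseInt(s);
            slots[n][0] = p.v00;
            slots[n][1] = p.v01;
        }
        return slots;
    }

    public static void main(String[] args) {
        // MS_PLRULQX3 is missing: positional filling would put item 4 into slot 3
        List<Part> parts = Arrays.asList(
                new Part("MS_PLRULQX1", 11, 12),
                new Part("MS_PLRULQX2", 21, 22),
                new Part("MS_PLRULQX4", 41, 42));
        long[][] slots = combine(parts);
        System.out.println(Arrays.toString(slots[3]) + " " + Arrays.toString(slots[4])); // [0, 0] [41, 42]
    }
}
```

    A slot left at 0 cannot be told apart from a genuine 0 counter; boxing the fields as Long and leaving absent slots null would keep that distinction.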

    Reading the data stream from the topic:

    Dataset<Row> dsParsed = this.sparkSession.readStream().format("kafka").options(this.kafkaOptions).option("subscribe", topicName)
                    .option("startingOffsets", "earliest").load();
    
    String waterMarkName = "query" + this.getTopicEncodeName(topicName) + "Agg";
    int windowDuration = 2 * 60;
    int slideDuration = 60;
    
    try {
        dsParsed.withWatermark("timestamp", "2 hour").createTempView(waterMarkName);
    } catch (AnalysisException e1) {
        e1.printStackTrace();
        throw new RuntimeException(e1);
    }
    
    String aggSQL = "xxx";
    Dataset<Row> dsSQL1 = sparkSession.sql(aggSQL);
    dsSQL1.printSchema();
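    Regarding withWatermark("timestamp", "2 hour") above: the Structured Streaming Programming Guide describes the watermark as the maximum event time seen by the engine minus the late threshold. Rows older than the watermark may be dropped (the guarantee is strict only for rows within the threshold). The guide also notes that watermark-based state cleanup only applies in Append and Update modes; Complete mode, used by the console queries in this post, keeps all aggregation state. The plain-Java sketch only reproduces the arithmetic. The class name and timestamps are illustrative, and treating a row exactly at the watermark as late is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkSketch {
    // watermark = max event time seen so far - delay threshold ("2 hour" in the query above)
    static Instant watermark(Instant maxEventTime, Duration delay) {
        return maxEventTime.minus(delay);
    }

    // Assumption: a row at or before the watermark counts as late and may be dropped
    static boolean tooLate(Instant eventTime, Instant watermark) {
        return !eventTime.isAfter(watermark);
    }

    public static void main(String[] args) {
        Instant maxSeen = Instant.parse("2018-08-28T12:00:00Z");
        Instant wm = watermark(maxSeen, Duration.ofHours(2));
        System.out.println(wm);                                                  // 2018-08-28T10:00:00Z
        System.out.println(tooLate(Instant.parse("2018-08-28T09:30:00Z"), wm)); // true
        System.out.println(tooLate(Instant.parse("2018-08-28T10:30:00Z"), wm)); // false
    }
}
```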

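    Joining the stream records by key:

    The working approach: group the rows by primary key, sort each group by message key, merge the group into one record, and print the merged results to the console.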

    KeyValueGroupedDataset<String, Row> tuple2Dataset = dsSQL1.groupByKey((MapFunction<Row, String>) row -> {
        String int_id = row.getAs("int_id");
        String start_time = row.getAs("start_time");
        String key = int_id + "_" + start_time;
        return key;
    }, Encoders.STRING());
    
    Dataset<MS_PLRULQX_Combine> tuple2FlatMapDataset = tuple2Dataset.flatMapGroups(
            new FlatMapGroupsFunction<String, Row, MS_PLRULQX_Combine>() {
                private static final long serialVersionUID = 1400167811199763836L;
    
                @Override
                public Iterator<MS_PLRULQX_Combine> call(String key, Iterator<Row> values) throws Exception {
                    List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
    
                    while (values.hasNext()) {
                        Row value = values.next();
                        MS_PLRULQX item = new MS_PLRULQX(value);
                        list.add(item);
                    }
    
                    // Ascending order by message key, as defined by MS_PLRULQX.compareTo
                    Collections.sort(list);
    
                    return Arrays.asList(new MS_PLRULQX_Combine(list)).iterator();
                }
            }, Encoders.bean(MS_PLRULQX_Combine.class));
    
    Dataset<Row> rows = tuple2FlatMapDataset.toDF();
    rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();

    An alternative approach to joining the records by key, and the problem it ran into:

    This approach groups, sorts, and merges the data with JavaRDD.

    import scala.Tuple2;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    
    JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {
        private static final long serialVersionUID = -5203498264050492910L;
    
        @Override
        public Tuple2<String, MS_PLRULQX> call(Row row) throws Exception {
            MS_PLRULQX value = new MS_PLRULQX(row);
    
            return new Tuple2<String, MS_PLRULQX>(value.getPrimaryKey(), value);
        }
    });
    
    JavaPairRDD<String, Iterable<MS_PLRULQX>> group = pairs.groupByKey();
    
    JavaPairRDD<String, MS_PLRULQX_Combine> keyVsValuePairRDD = group.mapToPair(tuple -> {
        List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
        Iterator<MS_PLRULQX> it = tuple._2.iterator();
        while (it.hasNext()) {
            MS_PLRULQX score = it.next();
            list.add(score);
        }
    
        Collections.sort(list, (v1, v2) -> -(v2.compareTo(v1)));
    
        return new Tuple2<String, MS_PLRULQX_Combine>(tuple._1, new MS_PLRULQX_Combine(list));
    });
    
    JavaRDD<MS_PLRULQX_Combine> javaRDD = keyVsValuePairRDD
            .map(new Function<Tuple2<String, MS_PLRULQX_Combine>, MS_PLRULQX_Combine>() {
                private static final long serialVersionUID = -3031600976005716506L;
    
                @Override
                public MS_PLRULQX_Combine call(Tuple2<String, MS_PLRULQX_Combine> v1) throws Exception {
                    return v1._2;
                }
            });
    
    Dataset<Row> rows = this.sparkSession.createDataFrame(javaRDD, MS_PLRULQX_Combine.class);
    rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();
    sparkSession.streams().awaitAnyTermination();

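    The error is thrown here:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    at com.xx.xx.streaming.drivers.XXXDriver.run(xxxxDriver.java:85)

    The failing statement is the line "JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {".
    Calling .toJavaRDD() on the streaming Dataset fails in the same way as calling dsSQL1.show() or dsSQL1.collect().foreach(println(_)) would: converting it to an RDD makes Spark plan the query as a batch job, and batch planning rejects any plan that contains a streaming source. A streaming query can only be executed through writeStream().start().

    The official Spark documentation (Spark 2.3.1) only states that Structured Streaming supports the Dataset/DataFrame API; it does not mention RDD support.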

  • Original article: https://www.cnblogs.com/yy3b2007com/p/9551741.html