Spark 2.3 (34): Can withWatermark and window in Spark Structured Streaming implement a rolling last-hour statistic?

    Besides bounding how late data may arrive, can a watermark also be used to compute statistics over the most recent hour?

    The purpose of a watermark is to bound the range of data that participates in a computation. For example, if the max timestamp seen in the data so far is 12:00 and the watermark delay is 60 minutes, then any input with an event time before 11:00 is discarded and excluded from the statistics, because its lateness exceeds the 60-minute bound.
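    The cutoff rule can be sketched outside Spark. The class and method below are a hypothetical illustration of the semantics, not Spark's internal code:

```java
import java.sql.Timestamp;

public class WatermarkSketch {
    // An event is dropped when its event time is older than
    // (max event time seen so far) - (watermark delay).
    static boolean isLate(long eventMs, long maxSeenMs, long delayMs) {
        return eventMs < maxSeenMs - delayMs;
    }

    public static void main(String[] args) {
        long maxSeen = Timestamp.valueOf("2018-12-01 12:00:00").getTime();
        long delayMs = 60L * 60 * 1000; // 60 minutes
        long early = Timestamp.valueOf("2018-12-01 10:59:59").getTime(); // 1s beyond the bound
        long ok = Timestamp.valueOf("2018-12-01 11:00:01").getTime();    // within the bound
        System.out.println(isLate(early, maxSeen, delayMs)); // true: dropped
        System.out.println(isLate(ok, maxSeen, delayMs));    // false: kept
    }
}
```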

    So, can it help implement a statistic over the most recent hour?

    Code example:

    package com.dx.streaming
    
    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{Encoders, SparkSession}
    import org.apache.log4j.{Level, Logger}
    
    case class MyEntity(id: String, timestamp: Timestamp, value: Integer)
    
    object Main {
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
      Logger.getLogger("akka").setLevel(Level.ERROR);
      Logger.getLogger("kafka").setLevel(Level.ERROR);
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        val lines = spark.readStream.format("socket").option("host", "192.168.0.141").option("port", 19999).load()
    
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        import spark.implicits._
        lines.as(Encoders.STRING)
          .map(row => {
            val fields = row.split(",")
            MyEntity(fields(0), new Timestamp(sdf.parse(fields(1)).getTime), Integer.valueOf(fields(2)))
          })
          .createOrReplaceTempView("tv_entity")
    
        spark.sql("select id,timestamp,value from tv_entity")
          .withWatermark("timestamp", "60 minutes")
          .createOrReplaceTempView("tv_entity_watermark")
    
        val resultDf = spark.sql(
          s"""
             |select id,sum(value) as sum_value
             |from tv_entity_watermark
             |group by id
             |""".stripMargin)
    
        val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()
    
        query.awaitTermination()
        query.stop()
      }
    }

    Feed the following groups of data via `nc -lk 19999`, one group at a time (a few seconds apart):

    1,2018-12-01 12:00:01,100
    2,2018-12-01 12:00:01,100
    
    1,2018-12-01 12:05:01,100
    2,2018-12-01 12:05:01,100
    
    1,2018-12-01 12:15:01,100
    2,2018-12-01 12:15:01,100
    
    1,2018-12-01 12:25:01,100
    2,2018-12-01 12:25:01,100
    
    1,2018-12-01 12:35:01,100
    2,2018-12-01 12:35:01,100
    
    1,2018-12-01 12:45:01,100
    2,2018-12-01 12:45:01,100
    
    1,2018-12-01 12:55:01,100
    2,2018-12-01 12:55:01,100
    
    1,2018-12-01 13:05:02,100
    2,2018-12-01 13:05:02,100
    
    1,2018-12-01 13:15:01,100
    2,2018-12-01 13:15:01,100

    The final statistics turn out to be:

    id  , sum_value
    1   ,  900
    2   ,  900

    rather than the expected:

    id  , sum_value
    1   ,  600
    2   ,  600
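    One way to see why the sum keeps growing: without a window, the aggregation state is keyed by `id` alone and never evicted, so each trigger simply accumulates into it. A minimal simulation of that running state (an illustrative sketch, not Spark's actual state store):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningAggregate {
    // Running aggregate keyed only by id: the state never shrinks,
    // so the sum covers the whole stream, not the last hour.
    static Map<String, Long> state = new HashMap<>();

    static long update(String id, long value) {
        long sum = state.getOrDefault(id, 0L) + value;
        state.put(id, sum);
        return sum;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 9; i++) {
            update("1", 100); // nine inputs of 100 for id=1, as in the test data
        }
        System.out.println(state.get("1")); // 900, matching the observed output
    }
}
```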

    So the watermark alone cannot restrict the statistics to a 60-minute range. Can the last-hour statistic be achieved by combining the watermark with the window function?

        spark.sql("select id,timestamp,value from tv_entity")
          .withWatermark("timestamp", "60 minutes")
          .createOrReplaceTempView("tv_entity_watermark")
    
        val resultDf = spark.sql(
          s"""
             |select id,sum(value) as sum_value
             |from tv_entity_watermark
             |group by window(timestamp,'60 minutes','60 minutes'),id
             |""".stripMargin)
    
        val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()

    Feeding the same test data as before, you will find that once an hour boundary is crossed a new aggregate is opened (the counters reset and statistics start over). This is a tumbling hourly statistic, not a rolling last-hour one.

    That is, the test data above is split into two groups that are aggregated separately:

    First group (first hour) of data:

    1,2018-12-01 12:00:01,100
    2,2018-12-01 12:00:01,100
    
    1,2018-12-01 12:05:01,100
    2,2018-12-01 12:05:01,100
    
    1,2018-12-01 12:15:01,100
    2,2018-12-01 12:15:01,100
    
    1,2018-12-01 12:25:01,100
    2,2018-12-01 12:25:01,100
    
    1,2018-12-01 12:35:01,100
    2,2018-12-01 12:35:01,100
    
    1,2018-12-01 12:45:01,100
    2,2018-12-01 12:45:01,100
    
    1,2018-12-01 12:55:01,100
    2,2018-12-01 12:55:01,100

    Second group (second hour) of data:

    1,2018-12-01 13:05:02,100
    2,2018-12-01 13:05:02,100
    
    1,2018-12-01 13:15:01,100
    2,2018-12-01 13:15:01,100
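    The split follows directly from how a tumbling window assigns buckets: when the window size equals the slide (60 minutes each), every event belongs to exactly one hour bucket, so 12:55:01 and 13:05:02 can never be aggregated together. A timezone-free sketch of the bucket assignment (illustrative, using millisecond offsets from midnight):

```java
public class TumblingBucket {
    // Tumbling window: bucket start = floor(t / windowSize) * windowSize
    static long bucket(long tMs, long windowMs) {
        return tMs / windowMs * windowMs;
    }

    public static void main(String[] args) {
        long hourMs = 3600000L;
        // offsets from midnight for 12:55:01 and 13:05:02
        long a = bucket((12 * 3600 + 55 * 60 + 1) * 1000L, hourMs);
        long b = bucket((13 * 3600 + 5 * 60 + 2) * 1000L, hourMs);
        System.out.println(a == b); // false: different hour buckets, hence separate groups
    }
}
```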

    Tentative conclusion:

    From the test results above we can infer a guess:

    Spark Structured Streaming does not appear to store the raw rows that participated in an aggregation. It keeps only summary values such as maxTimestamp, avgTimestamp and minTimestamp for the data, plus the aggregated result itself; on the next trigger it simply accumulates onto the previous result. Since the contributing rows are evidently not retained, this kind of rolling requirement cannot be met this way; if they were retained, it should be achievable.

    However, the following attempt does work, though it is far too resource-hungry:

    package com.dx.streaming
    
    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{Encoders, SparkSession}
    import org.apache.log4j.{Level, Logger}
    
    case class MyEntity(id: String, timestamp: Timestamp, value: Integer)
    
    object Main {
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      Logger.getLogger("kafka").setLevel(Level.ERROR)
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        val lines = spark.readStream.format("socket").option("host", "192.168.0.141").option("port", 19999).load()
    
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        import spark.implicits._
        lines.as(Encoders.STRING)
          .map(row => {
            val fields = row.split(",")
            MyEntity(fields(0), new Timestamp(sdf.parse(fields(1)).getTime), Integer.valueOf(fields(2)))
          })
          .createOrReplaceTempView("tv_entity")
    
        spark.sql("select id,timestamp,value from tv_entity")
          .withWatermark("timestamp", "60 minutes")
          .createOrReplaceTempView("tv_entity_watermark")
    
        val resultDf = spark.sql(
          s"""
             |select id,min(timestamp) min_timestamp,max(timestamp) max_timestamp,sum(value) as sum_value
             |from tv_entity_watermark
             |group by window(timestamp,'3600 seconds','60 seconds'),id
             |""".stripMargin)
    
        val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()
    
        query.awaitTermination()
        query.stop()
      }
    }
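    The resource cost of `window(timestamp,'3600 seconds','60 seconds')` comes from the overlap: with a slide of 60 seconds, each event falls into 3600/60 = 60 concurrent windows, so the aggregation state is multiplied roughly 60x compared with a tumbling window. A simplified sketch of the window assignment (illustrative; Spark's actual implementation differs):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    // All sliding-window start times (ms) whose window contains the event time.
    static List<Long> windowsFor(long tMs, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = tMs - tMs % slideMs; // latest window starting at or before t
        for (long s = lastStart - windowMs + slideMs; s <= lastStart; s += slideMs) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 3600-second window sliding every 60 seconds: 60 windows per event
        System.out.println(windowsFor(7200000L, 3600000L, 60000L).size()); // 60
    }
}
```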

    Using Spark Streaming and keeping the history in memory to implement the last-hour statistic:

    pom.xml

            <!--Spark -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>

    Java code:

    package com.dx.streaming;
    
    import java.io.Serializable;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.log4j.Level;
    import org.apache.log4j.LogManager;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    public class Main {
        private static List<MyEntity> store = new ArrayList<MyEntity>();
        private static JavaStreamingContext jssc;
    
        public static void main(String[] args) throws Exception {
            // set log4j programmatically
            LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
            LogManager.getLogger("akka").setLevel(Level.ERROR);
            LogManager.getLogger("kafka").setLevel(Level.ERROR);
    
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // 3600000
            System.out.println(sdf.parse("2018-12-04 11:00:00").getTime() - sdf.parse("2018-12-04 10:00:00").getTime());
    
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // jssc = new JavaStreamingContext(conf, Durations.seconds(10));
            jssc = new JavaStreamingContext(sc, Durations.seconds(10));
    
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.0.141", 19999);
    
            JavaDStream<MyEntity> dStream = lines.map(new Function<String, MyEntity>() {
                private static final long serialVersionUID = 1L;
    
                public MyEntity call(String line) throws Exception {
                    String[] fields = line.split(",");
                    MyEntity myEntity = new MyEntity();
                    myEntity.setId(Integer.valueOf(fields[0]));
                    myEntity.setTimestamp(Timestamp.valueOf(fields[1]));
                    myEntity.setValue(Long.valueOf(fields[2]));
                    return myEntity;
                }
            });
    
        // Not sure whether repartition(1) is required; the intent is to keep this outer
        // loop from running multiple times in parallel and to ensure it runs exactly once per batch.
            dStream.repartition(1).foreachRDD(new VoidFunction<JavaRDD<MyEntity>>() {
                public void call(JavaRDD<MyEntity> tItems) throws Exception {
                    System.out.println("print...");
                    tItems.foreach(new VoidFunction<MyEntity>() {
                        public void call(MyEntity t) throws Exception {
                            System.out.println(">>>>>>>>>>>>>" + t.toString());
                            store.add(t);
                            System.out.println(store.size());
                        }
                    });
    
                    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
                    for (MyEntity myEntity : store) {
                        System.out.println("++++++++++++++++++++++" + myEntity.toString());
                    }
    
                // Step 1: evict entries more than one hour older than the newest one from store;
                    MyEntity first = store.get(0);
                    MyEntity last = store.get(store.size() - 1);
                // more than one hour old (this works because the test data arrives in timestamp
                // order; in a real business scenario it is only valid under the same assumption)
                    while (last.getTimestamp().getTime() - first.getTimestamp().getTime() > 3600000) {
                        store.remove(0);
                        first = store.get(0);
                    }
    
                // Step 2: run the business aggregation
                    Map<Integer, Long> statistics = new HashMap<Integer, Long>();
                    for (MyEntity myEntity : store) {
                        if (false == statistics.containsKey(myEntity.getId())) {
                            statistics.put(myEntity.getId(), myEntity.getValue());
                        } else {
                            statistics.put(myEntity.getId(), myEntity.getValue() + statistics.get(myEntity.getId()));
                        }
                    }
    
                // Step 3: write the result to a relational database (printed to console here)
                    System.out.println("#######################print result##########################");
                    for (Map.Entry<Integer, Long> kv : statistics.entrySet()) {
                        System.out.println(kv.getKey() + "," + kv.getValue());
                    }
                }
            });
    
            jssc.start(); // Start the computation
            jssc.awaitTermination(); // Wait for the computation to terminate
        }
    }
    
    class MyEntity implements Serializable {
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        private int id;
        private Timestamp timestamp;
        private long value;
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public Timestamp getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Timestamp timestamp) {
            this.timestamp = timestamp;
        }
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return getId() + "," + sdf.format(new Date(getTimestamp().getTime())) + "," + getValue();
        }
    }
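    The core of this workaround, evicting everything more than one hour older than the newest entry and then summing per id, reduces to a small self-contained sketch (plain Java, no Spark; entries are shown as hypothetical (id, timestampMs, value) triples for brevity):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LastHourSums {
    static final long HOUR_MS = 3600000L;

    // Keep only entries within one hour of the newest one, then sum values by id.
    // Assumes entries are appended in ascending timestamp order, as above.
    static Map<Integer, Long> compute(List<long[]> store) {
        long newest = store.get(store.size() - 1)[1];
        Map<Integer, Long> sums = new HashMap<>();
        for (long[] e : store) {
            if (newest - e[1] <= HOUR_MS) { // same eviction rule as the while loop above
                sums.merge((int) e[0], e[2], Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<long[]> store = new ArrayList<>();
        store.add(new long[]{1, 0L, 100});       // evicted: more than 1h older than newest
        store.add(new long[]{1, 3700000L, 100});
        store.add(new long[]{1, 7200000L, 100});
        System.out.println(compute(store)); // only the two recent entries count: {1=200}
    }
}
```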

    Output log:

    18/12/04 14:45:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@62d73ead{/streaming/batch,null,AVAILABLE,@Spark}
    18/12/04 14:45:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@228cea97{/streaming/batch/json,null,AVAILABLE,@Spark}
    18/12/04 14:45:56 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3db663d0{/static/streaming,null,AVAILABLE,@Spark}
    18/12/04 14:45:57 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:45:57 WARN storage.BlockManager: Block input-0-1543905957600 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 12:00:01,100
    1
    >>>>>>>>>>>>>2,2018-12-01 12:00:01,100
    2
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    #######################print result##########################
    1,100
    2,100
    18/12/04 14:46:06 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:06 WARN storage.BlockManager: Block input-0-1543905966400 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:07 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:07 WARN storage.BlockManager: Block input-0-1543905967200 replicated to only 0 peer(s) instead of 1 peers
    print...
    [Stage 3:>                                                          (0 + 0) / 2]>>>>>>>>>>>>>1,2018-12-01 12:05:01,100
    3
    >>>>>>>>>>>>>2,2018-12-01 12:05:01,100
    4
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    #######################print result##########################
    1,200
    2,200
    18/12/04 14:46:18 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:18 WARN storage.BlockManager: Block input-0-1543905977800 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:18 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:18 WARN storage.BlockManager: Block input-0-1543905978400 replicated to only 0 peer(s) instead of 1 peers
    print...
    [Stage 5:>                                                          (0 + 0) / 2]>>>>>>>>>>>>>1,2018-12-01 12:15:01,100
    5
    >>>>>>>>>>>>>2,2018-12-01 12:15:01,100
    6
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    #######################print result##########################
    1,300
    2,300
    18/12/04 14:46:29 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:29 WARN storage.BlockManager: Block input-0-1543905989200 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:30 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:30 WARN storage.BlockManager: Block input-0-1543905989800 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 12:25:01,100
    7
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    #######################print result##########################
    1,400
    2,300
    18/12/04 14:46:35 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:35 WARN storage.BlockManager: Block input-0-1543905995000 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:35 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:35 WARN storage.BlockManager: Block input-0-1543905995600 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>2,2018-12-01 12:25:01,100
    8
    >>>>>>>>>>>>>1,2018-12-01 12:35:01,100
    9
    >>>>>>>>>>>>>2,2018-12-01 12:35:01,100
    10
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    #######################print result##########################
    1,500
    2,500
    18/12/04 14:46:46 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:46 WARN storage.BlockManager: Block input-0-1543906006000 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:47 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:47 WARN storage.BlockManager: Block input-0-1543906006800 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 12:45:01,100
    11
    >>>>>>>>>>>>>2,2018-12-01 12:45:01,100
    12
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    ++++++++++++++++++++++1,2018-12-01 12:45:01,100
    ++++++++++++++++++++++2,2018-12-01 12:45:01,100
    #######################print result##########################
    1,600
    2,600
    18/12/04 14:46:52 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:52 WARN storage.BlockManager: Block input-0-1543906011800 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:46:53 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:46:53 WARN storage.BlockManager: Block input-0-1543906013200 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 12:55:01,100
    13
    >>>>>>>>>>>>>2,2018-12-01 12:55:01,100
    14
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    ++++++++++++++++++++++1,2018-12-01 12:45:01,100
    ++++++++++++++++++++++2,2018-12-01 12:45:01,100
    ++++++++++++++++++++++1,2018-12-01 12:55:01,100
    ++++++++++++++++++++++2,2018-12-01 12:55:01,100
    #######################print result##########################
    1,700
    2,700
    18/12/04 14:47:04 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:47:04 WARN storage.BlockManager: Block input-0-1543906024600 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:47:06 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:47:06 WARN storage.BlockManager: Block input-0-1543906026000 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 13:05:02,100
    15
    >>>>>>>>>>>>>2,2018-12-01 13:05:02,100
    16
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:00:01,100
    ++++++++++++++++++++++2,2018-12-01 12:00:01,100
    ++++++++++++++++++++++1,2018-12-01 12:05:01,100
    ++++++++++++++++++++++2,2018-12-01 12:05:01,100
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    ++++++++++++++++++++++1,2018-12-01 12:45:01,100
    ++++++++++++++++++++++2,2018-12-01 12:45:01,100
    ++++++++++++++++++++++1,2018-12-01 12:55:01,100
    ++++++++++++++++++++++2,2018-12-01 12:55:01,100
    ++++++++++++++++++++++1,2018-12-01 13:05:02,100
    ++++++++++++++++++++++2,2018-12-01 13:05:02,100
    #######################print result##########################
    1,600
    2,600
    18/12/04 14:47:18 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:47:18 WARN storage.BlockManager: Block input-0-1543906038000 replicated to only 0 peer(s) instead of 1 peers
    18/12/04 14:47:19 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
    18/12/04 14:47:19 WARN storage.BlockManager: Block input-0-1543906038800 replicated to only 0 peer(s) instead of 1 peers
    print...
    >>>>>>>>>>>>>1,2018-12-01 13:15:01,100
    13
    >>>>>>>>>>>>>2,2018-12-01 13:15:01,100
    14
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    ++++++++++++++++++++++1,2018-12-01 12:45:01,100
    ++++++++++++++++++++++2,2018-12-01 12:45:01,100
    ++++++++++++++++++++++1,2018-12-01 12:55:01,100
    ++++++++++++++++++++++2,2018-12-01 12:55:01,100
    ++++++++++++++++++++++1,2018-12-01 13:05:02,100
    ++++++++++++++++++++++2,2018-12-01 13:05:02,100
    ++++++++++++++++++++++1,2018-12-01 13:15:01,100
    ++++++++++++++++++++++2,2018-12-01 13:15:01,100
    #######################print result##########################
    1,700
    2,700
    print...
    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    ++++++++++++++++++++++1,2018-12-01 12:15:01,100
    ++++++++++++++++++++++2,2018-12-01 12:15:01,100
    ++++++++++++++++++++++1,2018-12-01 12:25:01,100
    ++++++++++++++++++++++2,2018-12-01 12:25:01,100
    ++++++++++++++++++++++1,2018-12-01 12:35:01,100
    ++++++++++++++++++++++2,2018-12-01 12:35:01,100
    ++++++++++++++++++++++1,2018-12-01 12:45:01,100
    ++++++++++++++++++++++2,2018-12-01 12:45:01,100
    ++++++++++++++++++++++1,2018-12-01 12:55:01,100
    ++++++++++++++++++++++2,2018-12-01 12:55:01,100
    ++++++++++++++++++++++1,2018-12-01 13:05:02,100
    ++++++++++++++++++++++2,2018-12-01 13:05:02,100
    ++++++++++++++++++++++1,2018-12-01 13:15:01,100
    ++++++++++++++++++++++2,2018-12-01 13:15:01,100
    #######################print result##########################
    1,700
    2,700
Original post: https://www.cnblogs.com/yy3b2007com/p/10054694.html