zoukankan      html  css  js  c++  java
  • 记一次以socket作为源时不出结果的错误

    场景

    将Stu1(name:String,location:String,money:Double,time:Long)按名字分组并转为Stu2(name:String,location_time:String)
    以fromElements获取源没问题

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.{TimeCharacteristic, watermark}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer
    
    object Test {

      /**
       * Bounded demo job: a fixed list of [[Stu1]] records is stamped with event time,
       * keyed by name, grouped into 2-second tumbling event-time windows and reduced to
       * one [[Stu2]] per key and window. The stamped input and the results are both printed.
       */
      def main(args: Array[String]): Unit = {
        // Brings the implicit TypeInformation instances needed by the Scala DataStream API into scope.
        import org.apache.flink.api.scala._

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val source: DataStream[Stu1] = env.fromElements(
          Stu1("aaa", "beijing", 100.0, 1603764498486L),
          Stu1("aaa", "hangzhou", 100.0, 1603764499486L),
          Stu1("bbb", "wuhan", 100.0, 1603764500486L),
          Stu1("bbb", "tianjing", 100.0, 1603764501486L),
          Stu1("ccc", "shanghai", 100.0, 1603764502486L),
          Stu1("eee", "changsha", 100.0, 1603764503486L),
          Stu1("eee", "yiyang", 100.0, 1603764504486L),
          Stu1("fff", "changde", 100.0, 1603764505486L),
          Stu1("fff", "haebing", 100.0, 1603764506486L),
          Stu1("fff", "wuchang", 100.0, 1603764507486L),
          Stu1("iii", "xian", 100.0, 1603764508486L),
          Stu1("jjj", "huaihua", 100.0, 1603764509486L),
          Stu1("kkk", "huarong", 100.0, 1603764510486L),
          Stu1("lll", "jinmeng", 100.0, 1603764511486L),
          Stu1("mmm", "huahehaote", 100.0, 1603764512486L),
          Stu1("mmm", "xinjiang", 100.0, 1603764513486L),
          Stu1("nn", "niemenggu", 100.0, 1603764514486L),
          Stu1("ll", "eluosi", 100.0, 1603764515486L),
          Stu1("kk", "meiguo", 100.0, 1603764516486L),
          Stu1("aaa", "yinbgguuo", 100.0, 1603764517486L),
          Stu1("aaa", "aaa", 100.0, 1603764518486L),
          Stu1("ccc", "bbbb", 100.0, 1603764519486L)
        )

        // Attach event timestamps (Stu1.time) and periodic watermarks with a lateness of 1.
        val stamped: DataStream[Stu1] =
          source.assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))
        stamped.print("data--->")

        // Group by name, then cut each key's records into 2-second tumbling event-time windows.
        val byName = stamped.keyBy(_.name)
        val windowed = byName.window(TumblingEventTimeWindows.of(Time.seconds(2)))
        val result: DataStream[Stu2] = windowed.process(new MyProcessFunction())
        result.print("结果------->")

        env.execute()
      }
    }
    
    /**
     * Input record of the demo job.
     *
     * @param name     grouping key (the stream is keyed by this field)
     * @param location location label; used as the map key when a window is summarised
     * @param money    amount carried by the record; not read by the window logic in this file
     * @param time     event timestamp returned by MyPeriodicAssigner.extractTimestamp
     *                 (presumably epoch milliseconds, judging by the sample data)
     */
    case class Stu1(
      name: String,
      location: String,
      money: Double,
      time: Long
    )
    /**
     * Per-key, per-window result record.
     *
     * @param name          the key the window belongs to
     * @param location_time textual rendering of the window's location -> time map
     */
    case class Stu2(
      name: String,
      location_time: String
    )
    /**
     * Window function that collapses all [[Stu1]] records of one key and window into a
     * single [[Stu2]].
     */
    class MyProcessFunction extends ProcessWindowFunction[Stu1, Stu2, String, TimeWindow] {

      /**
       * Builds a location -> time map from the window contents and emits it as text.
       *
       * @param key      the name the records were keyed by
       * @param context  window context (not used here)
       * @param elements every record of this key that fell into the window
       * @param out      collector receiving exactly one [[Stu2]] per invocation
       */
      override def process(key: String, context: Context, elements: Iterable[Stu1], out: Collector[Stu2]): Unit = {
        // A later record with the same location overwrites the earlier entry.
        val timeByLocation = mutable.Map[String, Long]()
        elements.foreach(record => timeByLocation(record.location) = record.time)

        // Debug trace: key and number of distinct locations seen in this window.
        println("key--->" + key + "	map的大小----->" + timeByLocation.size)

        out.collect(Stu2(key, timeByLocation.toString()))
      }
    }
    
    /**
     * Periodic watermark assigner for [[Stu1]]: the event timestamp is `Stu1.time` and the
     * watermark trails the largest timestamp seen so far by `maxLateless`.
     *
     * @param maxLateless amount subtracted from the largest seen timestamp to form the
     *                    watermark (same unit as `Stu1.time`)
     */
    class MyPeriodicAssigner(maxLateless: Long) extends AssignerWithPeriodicWatermarks[Stu1] {
      // Largest event timestamp observed by this instance; starts at 0 before any element.
      var maxTimeStamp: Long = 0L

      /** Current watermark: largest timestamp seen so far minus the allowed lateness. */
      override def getCurrentWatermark: Watermark = {
        val watermarkTimestamp = maxTimeStamp - maxLateless
        new Watermark(watermarkTimestamp)
      }

      /** Returns the element's own `time` and advances `maxTimeStamp` if it is newer. */
      override def extractTimestamp(element: Stu1, previousElementTimestamp: Long): Long = {
        val eventTime = element.time
        if (eventTime > maxTimeStamp) {
          maxTimeStamp = eventTime
        }
        eventTime
      }
    }
    

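    以socketTextStream作为源却不出结果,原因是没有通过setStreamTimeCharacteristic指定事件时间语义,也没有设置并行度。
    窗口的触发依赖基于事件时间的水位线:不指定EventTime时不会周期性地生成水位线;不指定并行度时默认并行度为本机线程数(这里是8),而窗口算子的水位线取所有上游子任务水位线的最小值,只要还有子任务没有收到数据,水位线就无法推进,所以就没有数据输出。
    解决办法:调用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)和env.setParallelism(1)。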

    
                object Test {

      /**
       * Socket demo job: reads comma-separated lines ("name,location,money,time") from
       * test01:9999, converts them to [[Stu1]], keys them by name, groups them into
       * 2-second tumbling event-time windows and prints one [[Stu2]] per key and window.
       *
       * A line with fewer than four fields or with non-numeric money/time makes the map
       * step throw, which fails the job.
       */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Brings the implicit TypeInformation instances needed by the Scala DataStream API into scope.
        import org.apache.flink.api.scala._

        // Both settings are required for the event-time windows to fire on this unbounded source:
        //  - EventTime switches on periodic watermark generation; in Flink versions that still
        //    have TimeCharacteristic, the processing-time default emits no periodic watermarks.
        //  - Parallelism 1 keeps a single assigner instance. With more subtasks the window
        //    operator advances only to the minimum watermark of all upstream subtasks, so
        //    subtasks that have not received data yet hold every window back.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val lines: DataStream[String] = env.socketTextStream("test01", 9999)

        val data: DataStream[Stu1] = lines
          .map { line =>
            val fields: Array[String] = line.split(",")
            Stu1(fields(0), fields(1), fields(2).toDouble, fields(3).toLong)
          }
          .assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))
        data.print("data--->")

        val value: DataStream[Stu2] = data
          .keyBy(_.name)
          .window(TumblingEventTimeWindows.of(Time.seconds(2)))
          .process(new MyProcessFunction())
        value.print("结果------->")

        env.execute()
      }
    }
    
  • 相关阅读:
    Vim配置及使用技巧
    终端提示符的配置
    Archlinux下i3wm与urxvt的配置
    Linux压缩命令
    Archlinux无线联网教程
    Archlinux安装和使用技巧
    Linux下硬盘分区
    Linux挂载
    Android中pullToRefresh使用
    SVN服务器搭建和使用教程
  • 原文地址:https://www.cnblogs.com/dch-21/p/13885580.html
Copyright © 2011-2022 走看看