zoukankan      html  css  js  c++  java
  • Flink 读取用户自定义source

    package com.kpwong.aiptest
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    import scala.collection.immutable
    import scala.util.Random
    
    object MySourceTest {
    
      def main(args: Array[String]): Unit = {
    
         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
         val myDS: DataStream[SensorReading] = env.addSource(new MySensorSource)
         myDS.print()
         env.execute()
      }
    
    }
    
    //自定义SourceFunction
    
    class  MySensorSource() extends  SourceFunction[SensorReading]{
      
      //定义一个flag 用来标记数据源是否正常发出数据
    
      var running = true
    
      System.currentTimeMillis()
    
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        //定义一个随机数发生器
         val random: Random = new Random()
         val init: immutable.IndexedSeq[(String, Long, Double)] = (1 to 10).map(i=>("Sensor"+i,System.currentTimeMillis(),random.nextDouble()))
         while (running)
           {
             Thread.sleep(1000)
             //在上次数据基础上微调,更新温度值
             val current: immutable.IndexedSeq[(String, Long, Double)] = init.map(data=>(data._1,System.currentTimeMillis(),data._3+random.nextGaussian()))
    
             val readings: immutable.IndexedSeq[SensorReading] = current.map(data=>SensorReading(data._1,data._2,data._3))
    
             readings.map(data=>sourceContext.collect(data))
    //         readings.foreach(data=>sourceContext.collect(data))
           }
        
      }
    
      override def cancel(): Unit = {
        running = false
      }
    }

    运行结果:

  • 相关阅读:
    SpringBoot之旅第三篇-日志
    SpringBoot之旅第二篇-配置
    SpringBoot之旅第一篇-初探
    394. 字符串解码
    1190. 反转每对括号间的子串
    921. 使括号有效的最少添加
    Leetcode 1171. 从链表中删去总和值为零的连续节点
    设计模式之过滤器模式——Java语言描述
    MySQL查询执行的基础
    设计模式之桥接模式——Java语言描述
  • 原文地址:https://www.cnblogs.com/kpwong/p/14089985.html
Copyright © 2011-2022 走看看