zoukankan      html  css  js  c++  java
  • Flink 读取用户自定义source

    package com.kpwong.aiptest
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    import scala.collection.immutable
    import scala.util.Random
    
    /** Demo job that reads from the user-defined sensor source and prints every record. */
    object MySourceTest {

      /**
       * Builds the pipeline (custom source -> print sink) and submits it for execution.
       *
       * @param args command-line arguments; not used
       */
      def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // Plug the custom SourceFunction in as the job's only input.
        val sensorStream: DataStream[SensorReading] = streamEnv.addSource(new MySensorSource())
        sensorStream.print()

        // Nothing runs until execute() is called.
        streamEnv.execute()
      }
    }
    
    //自定义SourceFunction
    
    /**
     * Custom Flink `SourceFunction` that emits simulated readings for ten sensors
     * ("Sensor1" .. "Sensor10") once per second until the source is cancelled.
     *
     * Every sensor starts from a random value drawn by `Random.nextDouble()` and then
     * follows a random walk: each round adds a Gaussian step to the value produced in
     * the previous round.
     */
    class  MySensorSource() extends  SourceFunction[SensorReading]{

      // Flag that keeps the emit loop alive. Flink calls cancel() from a different thread
      // than the one executing run(), so the flag is volatile to make the write visible.
      @volatile var running = true

      /**
       * Emits one reading per sensor every second while `running` is true.
       *
       * @param sourceContext context used to hand the generated readings to Flink
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val random: Random = new Random()

        // Latest (sensorId, temperature) pairs. Reassigned every round so that each
        // adjustment builds on the previous values instead of on the initial ones.
        var temperatures: immutable.IndexedSeq[(String, Double)] =
          (1 to 10).map(i => ("Sensor" + i, random.nextDouble()))

        while (running) {
          Thread.sleep(1000)

          // Nudge every temperature by a Gaussian step relative to the last round.
          temperatures = temperatures.map { case (sensorId, temperature) =>
            (sensorId, temperature + random.nextGaussian())
          }

          // One timestamp per round, shared by all readings emitted in that round.
          val timestamp = System.currentTimeMillis()

          // foreach, not map: collect() is called purely for its side effect.
          temperatures.foreach { case (sensorId, temperature) =>
            sourceContext.collect(SensorReading(sensorId, timestamp, temperature))
          }
        }
      }

      /** Asks the emit loop in `run` to stop after its current iteration. */
      override def cancel(): Unit = {
        running = false
      }
    }

    运行结果:

  • 相关阅读:
    Python3连接MySQL
    jQuery
    Python之路--协程
    java面向对象练习题
    java基础语法4--封装,继承,多态
    java基础编程题
    java基础语法——方法,static关键字
    java基础语法3
    java基础语法2
    java基础语法1
  • 原文地址:https://www.cnblogs.com/kpwong/p/14089985.html
Copyright © 2011-2022 走看看