zoukankan      html  css  js  c++  java
  • Flink 读取用户自定义source

    package com.kpwong.aiptest
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    import scala.collection.immutable
    import scala.util.Random
    
    object MySourceTest {
    
      def main(args: Array[String]): Unit = {
    
         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
         val myDS: DataStream[SensorReading] = env.addSource(new MySensorSource)
         myDS.print()
         env.execute()
      }
    
    }
    
    //自定义SourceFunction
    
    class  MySensorSource() extends  SourceFunction[SensorReading]{
      
      //定义一个flag 用来标记数据源是否正常发出数据
    
      var running = true
    
      System.currentTimeMillis()
    
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        //定义一个随机数发生器
         val random: Random = new Random()
         val init: immutable.IndexedSeq[(String, Long, Double)] = (1 to 10).map(i=>("Sensor"+i,System.currentTimeMillis(),random.nextDouble()))
         while (running)
           {
             Thread.sleep(1000)
             //在上次数据基础上微调,更新温度值
             val current: immutable.IndexedSeq[(String, Long, Double)] = init.map(data=>(data._1,System.currentTimeMillis(),data._3+random.nextGaussian()))
    
             val readings: immutable.IndexedSeq[SensorReading] = current.map(data=>SensorReading(data._1,data._2,data._3))
    
             readings.map(data=>sourceContext.collect(data))
    //         readings.foreach(data=>sourceContext.collect(data))
           }
        
      }
    
      override def cancel(): Unit = {
        running = false
      }
    }

    运行结果:

  • 相关阅读:
    认知实习(杭钢、杭州中萃)
    Head of a Gang (map+邻接表+DFS)
    Sharing
    Hello World for U (20)
    计算器(delphi)
    CentOS 下 Codeblocks 的 安装 + 汉化 以及 基本使用介绍
    关于阿里如何吸引大学生用户理财的一些个人看法
    分页 存储过程
    点击退出,并未直接跳转到登陆界面,登录界面还停留在框架集界面
    梦幻之旅--地图编辑器开发
  • 原文地址:https://www.cnblogs.com/kpwong/p/14089985.html
Copyright © 2011-2022 走看看