zoukankan      html  css  js  c++  java
  • Flink 读取用户自定义source

    package com.kpwong.aiptest
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    import scala.collection.immutable
    import scala.util.Random
    
    object MySourceTest {
    
      def main(args: Array[String]): Unit = {
    
         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
         val myDS: DataStream[SensorReading] = env.addSource(new MySensorSource)
         myDS.print()
         env.execute()
      }
    
    }
    
    //自定义SourceFunction
    
    class  MySensorSource() extends  SourceFunction[SensorReading]{
      
      //定义一个flag 用来标记数据源是否正常发出数据
    
      var running = true
    
      System.currentTimeMillis()
    
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        //定义一个随机数发生器
         val random: Random = new Random()
         val init: immutable.IndexedSeq[(String, Long, Double)] = (1 to 10).map(i=>("Sensor"+i,System.currentTimeMillis(),random.nextDouble()))
         while (running)
           {
             Thread.sleep(1000)
             //在上次数据基础上微调,更新温度值
             val current: immutable.IndexedSeq[(String, Long, Double)] = init.map(data=>(data._1,System.currentTimeMillis(),data._3+random.nextGaussian()))
    
             val readings: immutable.IndexedSeq[SensorReading] = current.map(data=>SensorReading(data._1,data._2,data._3))
    
             readings.map(data=>sourceContext.collect(data))
    //         readings.foreach(data=>sourceContext.collect(data))
           }
        
      }
    
      override def cancel(): Unit = {
        running = false
      }
    }

    运行结果:

  • 相关阅读:
    常见jvm命令
    服务后台启动
    kafka创建topic,生产和消费指定topic消息
    kafka-manager安装
    修改ssh主机名
    设置虚拟机静态ip
    kafka术语
    cas和oauth2的区别
    会Python的大学生,步入职场将会非常抢手!
    python爬虫把url链接编码成gbk2312格式过程解析
  • 原文地址:https://www.cnblogs.com/kpwong/p/14089985.html
Copyright © 2011-2022 走看看