zoukankan      html  css  js  c++  java
  • FLINK实例(3): CONNECTORS(2)自定义source

    1 工程目录

    SensorReading

    package com.atguigu.flink.bean
    
    case class SensorReading(
                              id: String,
                              timestamp: Long,
                              timepreture: Double
                            )

    SensorSource

    package com.atguigu.flink.source
    
    import java.util.Calendar
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, RichSourceFunction, SourceFunction}
    
    import scala.collection.immutable
    import scala.util.Random
    
    class SensorSource extends RichSourceFunction[SensorReading]{
      //表示数据源是否运行正常
      var running: Boolean = true
    
      //上下文参数来发送数据
      override def run(sContext:SourceFunction.SourceContext[SensorReading]) {
        val rand = new Random()
        //使用高斯噪声产生随机温度
        val curFtemp = (1 to 10).map(
          i => ("sensor_" + i, rand.nextGaussian() * 20)
        )
    
      //产生无限流数据
        while(running){
          val mapTemp:immutable.IndexedSeq[(String,Double)] = curFtemp.map(
            t => (t._1,t._2 + (rand.nextGaussian()*10))
          )
          //产生时间戳
          val curTime:Long = Calendar.getInstance().getTimeInMillis
    
          //发送出去
          mapTemp.foreach(t => sContext.collect(SensorReading(t._1,curTime,t._2)))
          //每隔100ms发送一条传感器数据
          Thread.sleep(100)
    
        }
    
      }
      override def cancel(): Unit = running =false
    }

    主程序入口 ConsumerFromSensorSource

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.source.SensorSource
    import org.apache.flink.streaming.api.datastream.DataStreamSource
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    
    object ConsumerFromSensorSource {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //调用addSource
        val stream: DataStreamSource[SensorReading] = env.addSource(new SensorSource)
    
        // 打印流
        stream.print()
    
        // 执行主程序
        env.execute()
    
    
    
      }
    
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680148.html

  • 相关阅读:
    Java中RuntimeException和Exception的区别
    dubbo常见错误
    Spring3.x 版本和 JDK1.8 不兼容导致 java.lang.IllegalStateException: Failed to load ApplicationContext
    @Autowired与@Resource的区别
    阿里巴巴数据库连接池 druid配置详解
    HttpDns原理
    spring四种依赖注入方式
    JdbcTemplate
    springmvc常用注解标签详解
    访问spring接口一定要用.do么?
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13680148.html
Copyright © 2011-2022 走看看