zoukankan      html  css  js  c++  java
  • Flink之API的使用(3):Source的使用

    相关文章链接

    Flink之API的使用(1):Sink的使用

    Flink之API的使用(2):Transform算子的使用

    Flink之API的使用(3):Source的使用 

    具体代码实现如下所示:

    1、main函数中代码实现:

    // 创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    
    // 1、从文件中读取数据
    val fileStream: DataStream[String] = env.readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
    
    // 2、从kafka中读取数据
    // 2.1、创建kafka的properties配置信息对象
    val prop: Properties = new Properties()
    prop.setProperty("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092")
    prop.setProperty("group.id", "flink-consumer-group")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("auto.offset.reset", "latest")
    // 2.2、添加kafka的source源
    val kafkaStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("flinkTestTopic", new SimpleStringSchema(), prop))
    
    // 3、自定义source源(自定义源需要创建一个自定义源类,并继承SourceFunction)
    val mySensorStream: DataStream[SensorReading] = env.addSource(new MySensorSource(2))
    
    // 打印数据
    mySensorStream.print()
    
    // 启动执行环境,运行任务
    env.execute("SourceDemo")

    2、自定义source源代码实现:

    /**
     * 自定义一个生成 SensorReading(温度传感器) 的源
     */
    class MySensorSource(sensorNum: Int) extends SourceFunction[SensorReading] {
    
        /**
         * flag: 表示数据源是否还在正常运行
         */
        var running: Boolean = true
    
        /**
         * 当启动数据源时,会在此方法中生成数据,并通过ctx(环境上下文)输出
         *
         * @param ctx 环境上下文
         */
        override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
            // 初始化一个随机数发生器
            val rand: Random = new Random()
    
            // 初始化sensorNum个传感器(包括初始化名称,时间戳,温度)
            var curTemp: immutable.Seq[SensorReading] = 1.to(sensorNum).map(
                i => SensorReading("sensor_" + i, System.currentTimeMillis(), 65 + rand.nextGaussian() * 20)
            )
    
            // 每1000毫秒更新一次传感器中的温度和时间戳,并通过ctx将数据输出
            while (running) {
    
                val curTime: Long = System.currentTimeMillis()
    
                curTemp.foreach(sensorReading => {
                    sensorReading.timestamp = curTime
                    sensorReading.temperature = sensorReading.temperature + rand.nextGaussian()
                    ctx.collect(sensorReading)
                })
    
                Thread.sleep(1000)
            }
        }
    
        /**
         * 停止此源(将flag修改为false)
         */
        override def cancel(): Unit = {
            running = false
        }
    }
    你现在所遭遇的每一个不幸,都来自一个不肯努力的曾经
  • 相关阅读:
    css中!important的用法
    mysql分区
    js 随机生成信用卡号
    js argument
    lnmp
    php的mcrypt
    php gd
    php socket
    最优服务次序问题 水 NOJ1254
    众数问题 NOJ 1207
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133323.html
Copyright © 2011-2022 走看看