zoukankan      html  css  js  c++  java
  • Flink学习(九) Sink到Kafka

    package com.wyh.streamingApi.sink
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    
    
    /**
      * Case class describing one temperature-sensor reading.
      *
      * @param id          sensor identifier, e.g. `sensor_1`
      * @param timestamp   time of the reading (looks like epoch seconds in the sample data — TODO confirm)
      * @param temperature measured temperature value
      */
    case class SensorReading(
      id: String,
      timestamp: Long,
      temperature: Double
    )
    
    /**
      * Flink streaming job: reads comma-separated sensor lines from a Kafka topic,
      * converts each line to the string form of a [[SensorReading]] and writes the
      * result to another Kafka topic (and additionally prints it).
      *
      * Example input lines:
      * {{{
      * sensor_1,1547718199,35.80018327300259
      * sensor_6,1547718201,15.402984393403084
      * sensor_7,1547718202,6.720945201171228
      * sensor_10,1547718205,38.1010676048934444
      * sensor_1,1547718199,35.1
      * sensor_1,1547718199,31.0
      * sensor_1,1547718199,39
      * }}}
      */
    object Sink2Kafka {
      import scala.util.Try

      // One definition of the broker list so the source and the sink cannot drift apart.
      private val BrokerList = "tuijian:9092"
      private val SourceTopic = "sensor"
      private val SinkTopic = "sinkTest"

      /**
        * Parses one `id,timestamp,temperature` line and renders it as the `toString`
        * of a [[SensorReading]].
        *
        * @param line raw text of one record
        * @return `Some(rendered)` when the line has at least three fields and the second
        *         and third parse as `Long` and `Double`; `None` otherwise. Fields beyond
        *         the third are ignored.
        */
      private def formatReading(line: String): Option[String] =
        line.split(",") match {
          case Array(id, ts, temp, _*) =>
            // Converted to String so the record can be written with SimpleStringSchema.
            Try(SensorReading(id.trim, ts.trim.toLong, temp.trim.toDouble).toString).toOption
          case _ =>
            None
        }

      /**
        * Builds and runs the pipeline: Kafka source -> parse/format -> Kafka sink + print.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Source
        // Alternative file-based source:
        // val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")

        // Only the settings the Kafka 0.11 consumer actually uses are supplied.
        // "zookeeper.connect" is needed only by the 0.8 connector, and the key/value
        // deserializers are always replaced by Flink with ByteArrayDeserializer because
        // decoding is done by the DeserializationSchema passed to the consumer.
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", BrokerList)
        properties.setProperty("group.id", "test-consumer-group")
        properties.setProperty("auto.offset.reset", "latest") // where to start when no offset is committed

        val inputStream =
          env.addSource(new FlinkKafkaConsumer011[String](SourceTopic, new SimpleStringSchema(), properties))

        // Transform
        // A malformed record is reported and skipped instead of throwing inside the
        // operator, which would fail the whole job on a single bad message.
        val dataStream: DataStream[String] = inputStream.flatMap { (line: String) =>
          val formatted = formatReading(line)
          if (formatted.isEmpty) {
            System.err.println(s"Skipping malformed sensor record: $line")
          }
          formatted.toList
        }

        // Sink
        dataStream.addSink(new FlinkKafkaProducer011[String](BrokerList, SinkTopic, new SimpleStringSchema()))

        dataStream.print()
        env.execute("kafka sink test")
      }

    }

  • 相关阅读:
    利用国内的源安装 Python第三方库
    Python 算法(1) 快速排序
    Python 算法(2) 哈夫曼编码 Huffman Encoding
    Python sql注入 过滤字符串的非法字符
    tesseract中文语言文件包 下载
    python 多线程爬虫 实例
    Python 爬虫实例(5)—— 爬取爱奇艺视频电视剧的链接(2017-06-30 10:37)
    Django的ORM中如何判断查询结果是否为空,判断django中的orm为空
    Python 爬虫实例(4)—— 爬取网易新闻
    NLTK在自然语言处理
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12920406.html
Copyright © 2011-2022 走看看