zoukankan      html  css  js  c++  java
  • Flink Kafka作为Source和Sink

    实现kafka进,kafka出的流程。

    代码:

    object KafkaTest {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //kafka配置文件
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "hadoop102:9092")
        properties.setProperty("group.id", "consumer-group")
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("auto.offset.reset", "latest")
    
        //接收kafka的sensor这个topic发来的数据
        val kafkaDataStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor",new SimpleStringSchema(),properties))
    
        val dataStream: DataStream[String] = kafkaDataStream.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble).toString
        })
    
        //发送到kafka的flink-sink-test这个topic
        dataStream.addSink(new FlinkKafkaProducer011[String]("hadoop102:9092","flink-sink-test",new SimpleStringSchema()))
    
        env.execute("kafka")
      }
    
    }
    

    生产者发送数据,供flink消费

    [atguigu@hadoop102 bin]$ ./kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
    >sensor_1, 1547718199, 35.80018327300259
    >sensor_1, 1547718201, 40.8
    >sensor_1, 1547718202, 998                 
    >
    

    消费者查看flink发回的数据

    [atguigu@hadoop102 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic flink-sink-test
    SensorReading(sensor_1,1547718199,35.80018327300259)
    SensorReading(sensor_1,1547718201,40.8)
    SensorReading(sensor_1,1547718202,998.0)
    

      

  • 相关阅读:
    jquery正则表达式验证:正整数(限制长度)
    H5页面快速搭建之高级字体应用实践
    如何用Python写一个贪吃蛇AI
    HashMap多线程并发问题分析
    为RecyclerView打造通用Adapter 让RecyclerView更加好用
    学好Mac常用命令,助力iOS开发
    使用 Realm 和 Swift 创建 ToDo 应用
    看Facebook是如何优化React Native性能
    利用github搭建个人maven仓库
    Objective-C Runtime之着魔的UIAlertView
  • 原文地址:https://www.cnblogs.com/noyouth/p/12741509.html
Copyright © 2011-2022 走看看