zoukankan      html  css  js  c++  java
  • Flink从Kafka 0.8中读取多个Topic时的问题

        Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作:

    1.application.conf中配置

       如果使用了配置管理库typesafe.config,可以在其application.conf按如下方式配置List类型的元素

    myToicList:["t1","t2","t3"]

    2.读取配置文件

    object MyFlinkConfig {
      import com.typesafe.config.{ Config, ConfigFactory }
      import net.ceedubs.ficus.Ficus._
    
    
      def apply(): MyFlinkConfig = apply(ConfigFactory.load)
    
      def apply(applicationConfig: Config): MyFlinkConfig = {
    
        val config = applicationConfig.getConfig("MyFlinkConfig")
    
        new MyFlinkConfig (config.as[List[String]]("myTopicList"))
      }
    }
    
    case class MyFlinkConfig (myTopicList: List[String]) extends Serializable {}

    3.读取多个Topic

     因为FlinkKafkaConsumer08使用Java实现的,而MyFlinkConfig 中的List是Scala的List,所以要将Scala的List转为Java的List

    val config =MyFlinkConfig()
    import scala.collection.JavaConversions._
    val kafkaConsumer=new FlinkKafkaConsumer08[MonitorDataRecord](config.myTopicList, new SimpleStringSchema(), kafkaProps)

    4.遇到的问题

    4.1 如果要读取的Topic不存在,则应用程序直接报错,因此Topic在配置文件中配置时一定要正确

    4.2 如果要读取的Topic列表中,其中一个在Kafka中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:

        先rebalance,然后再设置水位:   

        val monitorSampling = env
          .addSource(kafkaConsumer)
          .rebalance
          .assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))
  • 相关阅读:
    关于Delphi中RS Float字段只有4位及OADateTime不能显示到秒的解决办法
    xml特殊字符处理(js)
    总结一下最近将163邮箱拖动效果改成兼容Firefox遇到的问题
    在XIB里面关于@property,@synthesize,release,dealloc的怪现象
    关于Core Data里面删除了一个被retain的NSManagedObject
    Hello cnblogs' world!
    html5离线应用程序 Amy
    css3布局相关的样式 Amy
    html5拖放API Amy
    css3MediaQueries的相关样式 Amy
  • 原文地址:https://www.cnblogs.com/liugh/p/7479515.html
Copyright © 2011-2022 走看看