zoukankan      html  css  js  c++  java
  • FLUME KAFKA SOURCE 和 SINK 使用同一个 TOPIC

    FLUME KAFKA SOURCE 和 SINK 使用同一个 TOPIC

    最近做了一个事情,过滤下kakfa中的数据后,做这个就用到了flume,直接使用flume sourceflume sink,中间再加一个过滤的intercetpor就可以了。
    要做的事情很简单,但是遇到了一个问题,就是sink中指定的topic不起作用。

    过程是这样的:

    KafkaSource.doProcess方法会给eventheader中添加一个topic信息,里面保存了消费的topic

    if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
              headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
            }
    

    KafkaSink.process方法中会首先将eventheader中的topic信息作为sinkTopic,如果header中没有topic信息,才会用sink定义的topic

            eventTopic = headers.get(TOPIC_HEADER);
            if (eventTopic == null) {
              eventTopic = topic;
            }
    

    这就尴尬了,自定义的sinkTopic没用,解决办法:

    1. KafkaSink.process中去掉这几行代码,重新打包。
    2. 使用intercetpor
      具体参考这个
  • 相关阅读:
    MySQL系列(三) MySQL的约束
    mysql 下载 国内 镜像
    ckeditor
    比较时间
    远程获取文件
    多线程一例
    requests
    json传递对象字典
    pymysql和mysqldb的区别
    sql
  • 原文地址:https://www.cnblogs.com/SpeakSoftlyLove/p/6445808.html
Copyright © 2011-2022 走看看