zoukankan      html  css  js  c++  java
  • Flume下读取kafka数据后再打把数据输出到kafka,利用拦截器解决topic覆盖问题

    1:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。

    2:Kafka Topic覆盖问题解决方案,利用flume的拦截器

    #拦截器处理,topic覆盖问题
    agent_log.sources.kafka0.interceptors = i1
    agent_log.sources.kafka0.interceptors.i1.type = static
    agent_log.sources.kafka0.interceptors.i1.key = topic
    agent_log.sources.kafka0.interceptors.i1.preserveExisting = false
    agent_log.sources.kafka0.interceptors.i1.value = testsongout

    3:flume config完整配置

    agent_log.sources = kafka0 
    agent_log.channels = ch0 
    agent_log.sinks = sink0
    
    agent_log.sources.kafka0.channels = ch0
    agent_log.sinks.sink0.channel = ch0
    
    #sources定义
    agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
    agent_log.sources.kafka0.kafka.bootstrap.servers = localhost:9092
    #agent.sources.kafka-source.zookeeper.connect =127.0.0.1:2181
    agent_log.sources.kafka0.kafka.topics = testsong,songtest
    agent_log.sources.kafka0.kafka.group.id= test 
    
    #拦截器处理,topic覆盖问题
    agent_log.sources.kafka0.interceptors = i1
    agent_log.sources.kafka0.interceptors.i1.type = static
    agent_log.sources.kafka0.interceptors.i1.key = topic
    agent_log.sources.kafka0.interceptors.i1.preserveExisting = false
    agent_log.sources.kafka0.interceptors.i1.value = testsongout
    
    #channels定义
    agent_log.channels.ch0.type = memory
    agent_log.channels.ch0.capacity = 2048
    agent_log.channels.ch0.transactionCapacity = 1000
    
    #sink定义
    agent_log.sinks.sink0.channel = ch0
    agent_log.sinks.sink0.type = org.apache.flume.sink.kafka.KafkaSink  
    agent_log.sinks.sink0.brokerList = localhost:9092  
    agent_log.sinks.sink0.topic = testsongout  
  • 相关阅读:
    mysql联合主键,也就是两个数据字段一起做主键的情况
    PHP细节,empty,is_null,isset,if()
    PHP细节,PHP手册中常见的一句话:该函数是二进制安全的
    git和github的学习
    用WPS查看两篇word文档异同之处
    js全角字符转为半角字符
    坑(十七)—— Linux无法挂载NTFS格式的U盘
    subprocess模块
    吴裕雄--天生自然--Go 语言学习笔记--Go 语言数组
    吴裕雄--天生自然--Go 语言学习笔记--Go 语言变量作用域
  • 原文地址:https://www.cnblogs.com/songpingyi/p/7366087.html
Copyright © 2011-2022 走看看