zoukankan      html  css  js  c++  java
  • flume 整合kafka

     背景:系统的数据量越来越大,日志不能再简单的文件的保存,如此日志将会越来越大,也不方便查找与分析,综合考虑下使用了flume来收集日志,收集日志后向kafka传递消息,下面给出具体的配置

    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,   
    # in this case called 'agent'
    
    agent.sources = r1
    agent.channels = c1
    agent.sinks = s1
    
    # For each one of the sources, the type is defined
    agent.sources.r1.type = netcat
    agent.sources.r1.bind = localhost
    agent.sources.r1.port = 10245
    agent.sources.r1.charset = UTF-8
    
    # The channel can be defined as follows.
    agent.sources.r1.channels = c1
    
    # Each sink's type must be defined
    agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.s1.topic = test
    agent.sinks.s1.brokerList = ip:9092
    agent.sinks.s1.requiredAcks = 1
    agent.sinks.s1.batchSize = 20
    agent.sinks.s1.channel = c1
    
    # Each channel's type is defined.
    agent.channels.c1.type = memory
    
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent.channels.c1.capacity = 100

    启动方式:

       bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name agent -Dflume.root.logger=INFO,console

    再启动之前一定要先启动kafka,这里可能会有一个错误

      

    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired

    这个是因为默认情况下kafka是广播的localhost,所以如果不是同一个机器需要修改下配置

    advertised.listeners=PLAINTEXT://ip:9092把默认的localhost替换成IP地址 重新启动下就可以了.

  • 相关阅读:
    005 HTML+CSS(Class027
    004 HTML+CSS(Class024
    003 HTML+CSS(Class011
    002HTML+CSS(class007-010)
    001HTML+CSS(class001-006)
    021 vue路由vue-router
    020 Vue 脚手架CLI的使用
    019 Vue webpack的使用
    018 vue的watch属性
    017 vue的插槽的使用
  • 原文地址:https://www.cnblogs.com/EncryptingLife/p/7456188.html
Copyright © 2011-2022 走看看