zoukankan      html  css  js  c++  java
  • flume taidir to kafkasink

    flume的数据源采用taiDir,sink类型选择kafka类型

    测试目标:flume监控某一个目录的日志文件,并将文件存储到kafka中,在kafka的消费端可以实现数据的消费

    dip005、dip006、dip007安装kafka

    dip005、dip006、dip007安装flume

    1、kafka创建topic

    ./kafka-topics.sh --create --zookeeper dip005:2181,dip006:2181,dip007 --replication-factor 1 --partitions 1 --topic test

    2、编写flume配置

    # source的名字
    agent.sources = s1
    agent.channels = c1
    agent.sinks = r1
    
    # 指定source使用的channel
    agent.sources.s1.channels = c1
    agent.sinks.r1.channel = c1
    
    ######## source相关配置 ########
    # source类型
    agent.sources.s1.type = TAILDIR
    agent.sources.s1.positionFile = /flume/taildir_position.json
    agent.sources.s1.filegroups = f1
    agent.sources.s1.filegroups.f1=/flume/data/.*log
    agent.sources.s1.fileHeader = true
    
    ######## channel相关配置 ########
    # channel类型
    #agent.channels.c1.type = file
    #agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
    #agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
    #agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100
    
    
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    agent.channels.c1.transactionCapacity = 100
    
    
    ######## sink相关配置 ########
    # sink类型
    agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.r1.brokerList = dip005:9092,dip006:9092,dip007:9092
    agent.sinks.r1.topic = test
    clog.sinks.sink_log1.flumeBatchSize = 2000
    clog.sinks.sink_log1.kafka.producer.acks = 1

    3.启动flume

    ./bin/flume-ng agent -n agent -c conf -f conf/taildir_conf  -Dflume.root.logger=DEBUG,console

    4.在监控目/flume/data 里放入*log文件,或者往*log文件里写数据

    5.进入kafka的消费者看,执行消费,即可看到*log里面的数据

    ./kafka-console-consumer.sh --bootstrap-server dip005:9092 --from-beginning --topic test

  • 相关阅读:
    [TroubleShooting]Neither the partner nor the witness server instance for database is availble
    VM tools安装错误The path "" is not a valid path to the xx generic kernel headers.
    博客导出工具
    puma 配置,启动脚本
    OpenCV改变像素颜色
    Android进程的生命周期
    谷歌地图换接口的新闻
    POJ 3714 Raid 近期对点题解
    CF241B Friends
    2018-2-13-wpf-使用-Dispatcher.Invoke-冻结窗口
  • 原文地址:https://www.cnblogs.com/students/p/10537392.html
Copyright © 2011-2022 走看看