The Flume data source uses TAILDIR; the sink type is Kafka.
Test goal: Flume watches the log files in a directory and delivers them to Kafka, where the data can be read on the Kafka consumer side.
Install Kafka on dip005, dip006, and dip007.
Install Flume on dip005, dip006, and dip007.
1. Create the Kafka topic
./kafka-topics.sh --create --zookeeper dip005:2181,dip006:2181,dip007:2181 --replication-factor 1 --partitions 1 --topic test
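Optionally, verify the topic before wiring up Flume; kafka-topics.sh also supports --describe against the same ZooKeeper quorum:
./kafka-topics.sh --describe --zookeeper dip005:2181,dip006:2181,dip007:2181 --topic test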
2. Write the Flume configuration
# Component names
agent.sources = s1
agent.channels = c1
agent.sinks = r1

# Bind the source and the sink to the channel
agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

######## Source settings ########
# Source type
agent.sources.s1.type = TAILDIR
agent.sources.s1.positionFile = /flume/taildir_position.json
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1 = /flume/data/.*log
agent.sources.s1.fileHeader = true

######## Channel settings ########
# Channel type (durable file channel left commented out as an alternative)
#agent.channels.c1.type = file
#agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
#agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
#agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

######## Sink settings ########
# Sink type
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.brokerList = dip005:9092,dip006:9092,dip007:9092
agent.sinks.r1.topic = test
agent.sinks.r1.flumeBatchSize = 2000
agent.sinks.r1.kafka.producer.acks = 1
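One caveat on the memory channel used above: events buffered in it are lost if the agent dies, which is why the durable file channel is kept as a commented-out alternative. Also make sure the monitored directory exists before starting the agent (Flume creates the position file itself, but /flume must exist and be writable):
mkdir -p /flume/data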
3. Start Flume (the -n agent flag must match the "agent" name prefix used in the configuration)
./bin/flume-ng agent -n agent -c conf -f conf/taildir_conf -Dflume.root.logger=DEBUG,console
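This runs the agent in the foreground with DEBUG logging, which is handy for the first test. Once the setup works, one option is to background it and capture the log, e.g.:
nohup ./bin/flume-ng agent -n agent -c conf -f conf/taildir_conf > flume.log 2>&1 &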
4. Place *.log files into the monitored directory /flume/data, or append data to an existing *.log file there.
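For example, to append a test line (the file name app.log is only an illustration; any file matching /flume/data/.*log works):
echo "hello flume $(date)" >> /flume/data/app.log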
5. Start a Kafka console consumer; once it is consuming, the data from the *.log files should appear:
./kafka-console-consumer.sh --bootstrap-server dip005:9092 --from-beginning --topic test
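If the consumer shows nothing, one thing to check is the TAILDIR position file configured above, which records how far each tracked file has been read:
cat /flume/taildir_position.json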