cd /usr/local/flume/conf
vim flume-exec-total.conf
## Explanation: fan the same events out through two sinks, one to Kafka and one to HDFS

# Name the components on this agent
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1 c2

# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log
agent.sources.r1.shell = /bin/bash -c

## Kafka
# Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = kafkatest
agent.sinks.k1.brokerList = master:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 2

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1 c2
agent.sinks.k1.channel = c1

## HDFS
# Describe the sink
agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://master:9000/data/flume/tail
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.hdfs.writeFormat = Text
#agent.sinks.k2.hdfs.rollInterval = 0
#agent.sinks.k2.hdfs.rollSize = 134217728
#agent.sinks.k2.hdfs.rollCount = 1000000
agent.sinks.k2.hdfs.batchSize = 10

# Use a channel which buffers events in memory
agent.channels.c2.type = memory
#agent.channels.c2.capacity = 1000
#agent.channels.c2.transactionCapacity = 100

# Bind the sink to the channel (r1 is already bound to both c1 and c2 above)
agent.sinks.k2.channel = c2
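Because r1 is bound to two channels, Flume applies its default channel selector, replicating, which copies every event to both c1 and c2; that is what delivers the same log line to both Kafka and HDFS. If you prefer to make this explicit in the file, the equivalent line (optional, since it is the default) is:

agent.sources.r1.selector.type = replicating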
Verification:
1. First, start HDFS and Kafka
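A sketch of the start commands, assuming Hadoop lives under /usr/local/hadoop, Kafka under /usr/local/kafka, and the ZooKeeper ensemble on master/slave1/slave2 is already running (adjust the paths to your installation):

# On master: start HDFS
/usr/local/hadoop/sbin/start-dfs.sh

# On each Kafka broker: start the broker in the background
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties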
2. Create the topic
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest
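To confirm the topic was created, you can describe it (same assumed Kafka version, which still uses the --zookeeper flag):

kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest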
Next, start Flume and run the test.
3. Start Flume
Server:
/usr/local/flume/bin/flume-ng agent -f flume-exec-total.conf -n agent -Dflume.root.logger=INFO,console

Client (append to the file the exec source is tailing):
echo "wangzai doubi" >> /root/test.log
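Once the client has written a line, the HDFS sink should start writing files under the configured path; you can verify from the shell (assuming the hdfs client is on the PATH; FlumeData is the sink's default file prefix):

hdfs dfs -ls /data/flume/tail
hdfs dfs -cat /data/flume/tail/FlumeData.*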
4. Start the Kafka console consumer
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning
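The --zookeeper flag belongs to the old console consumer; on newer Kafka releases it was removed and the consumer talks to the brokers directly, so the equivalent invocation would be roughly:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic kafkatest --from-beginning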
The results are shown in the screenshots below.
Flume server:
HDFS:
Kafka client: