收集kafka消息到hdfs
启动HDFS（在NameNode节点执行）：
start-dfs.sh
1.说明
每天上报的日志可能会含有以前的日志数据。但是每天上报的日志在一个以日期分割的目录内。
目录结构：%Y%m/%d（年月/日），与下方flume配置中的hdfs.path一致。
2.umeng_kafka_to_hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source: consume raw umeng logs from the big12-umeng-raw-logs topic
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = s102:9092
a1.sources.r1.kafka.topics = big12-umeng-raw-logs
a1.sources.r1.kafka.consumer.group.id = g10

# Memory channel.
# FIX: the Flume memory channel defaults to capacity=100 and
# transactionCapacity=100, which is smaller than the source batchSize (5000);
# every batch put would fail with a ChannelException. Size the channel so a
# full source batch fits in one transaction.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 5000

# HDFS sink
a1.sinks.k1.type = hdfs
# Storage path on HDFS, partitioned by year-month / day
a1.sinks.k1.hdfs.path = /user/centos/umeng_big12/raw-logs/%Y%m/%d
# File name prefix
a1.sinks.k1.hdfs.filePrefix = events-
# Round the timestamp used by the %Y%m/%d escapes down to 1-day buckets
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = day
# File rolling: every 30 s, or at 10240 bytes (10 KB — rollSize is in bytes),
# or every 500 events, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollCount = 500
# Use the agent's local time for the path escapes
# (no timestamp interceptor / header required)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text files instead of SequenceFiles
a1.sinks.k1.hdfs.fileType = DataStream

# Wiring: source -> channel -> sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.准备hdfs
hdfs dfs -mkdir -p /user/centos/umeng_big12/raw-logs
原始日志存储根目录：/user/centos/umeng_big12/raw-logs（%Y%m/%d子目录由flume自动创建）
4.启动flume进程
# Start the Flume agent in the background.
# -c: conf directory (needed so flume-env.sh and the log4j config are picked up)
# -f: agent configuration file
# -n: agent name — must match the "a1" prefix used inside the config file
flume-ng agent -c /soft/flume/conf -f /soft/flume/conf/umeng_kafka_to_hdfs.conf -n a1 &