Flume线上日志采集【模板】
预装软件
Java
HDFS
Lzo/Lzop
系统版本
Flume 1.5.0-cdh5.4.0
系统流程图
flume-env.sh配置文件
export JAVA_HOME=/usr/local/jdk1.7.0_55
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
单机版写HDFS配置文件
[root@bs038 conf]# more flume_directHDFS.conf
#Define agent
agent1.channels=ch1
agent1.sources=avro-source1
agent1.sinks=log-sink1
#Defineamemorychannelcalledch1onagent1
agent1.channels.ch1.type=memory
agent1.channels.ch1.capacity=100000
agent1.channels.ch1.transactionCapacity=100000
agent1.channels.ch1.keep-alive=30
agent1.sources.avro-source1.interceptors = i1
agent1.sources.avro-source1.interceptors.i1.type = timestamp
#definesourcemonitorafile
agent1.sources.avro-source1.type=exec
agent1.sources.avro-source1.shell=/bin/bash -c
#agent1.sources.avro-source1.command=tail-n+0-F/home/storm/tmp/id.txt
agent1.sources.avro-source1.command=tail -F test.log
agent1.sources.avro-source1.channels=ch1
agent1.sources.avro-source1.threads=5
#Definealoggersinkthatsimplylogsalleventsitreceives
#andconnectittotheotherendofthesamechannel.
agent1.sinks.log-sink1.channel=ch1
agent1.sinks.log-sink1.type=hdfs
agent1.sinks.log-sink1.hdfs.path=/user/hadoop/cndns/flume/%Y%m%d%M
agent1.sinks.log-sink1.hdfs.writeFormat=Text
agent1.sinks.log-sink1.hdfs.filePrefix=cdns
agent1.sinks.log-sink1.hdfs.inUseSuffix=.tmp
#agent1.sinks.log-sink1.hdfs.fileType=DataStream
agent1.sinks.log-sink1.hdfs.fileType=CompressedStream
agent1.sinks.log-sink1.hdfs.codeC=lzop
agent1.sinks.log-sink1.hdfs.rollInterval=0
agent1.sinks.log-sink1.hdfs.rollSize=67108864
agent1.sinks.log-sink1.hdfs.rollCount=0
agent1.sinks.log-sink1.hdfs.batchSize=1000
agent1.sinks.log-sink1.hdfs.txnEventMax=1000
agent1.sinks.log-sink1.hdfs.callTimeout=60000
agent1.sinks.log-sink1.hdfs.appendTimeout=60000
终端配置文件
[root@bs038 conf]# more agent038.conf
agent1.sources = avro-source1
agent1.sinks = k1
agent1.channels = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bs022.zx.nicx.cn
agent1.sinks.k1.port = 44444
#Defineamemorychannelcalledch1onagent1
agent1.channels.ch1.type=memory
agent1.channels.ch1.capacity=100000
agent1.channels.ch1.transactionCapacity=100000
agent1.channels.ch1.keep-alive=30
agent1.sources.avro-source1.interceptors = i1
agent1.sources.avro-source1.interceptors.i1.type = timestamp
agent1.sources.avro-source1.type=exec
agent1.sources.avro-source1.shell=/bin/bash -c
agent1.sources.avro-source1.command=tail -F test.log
agent1.sources.avro-source1.channels=ch1
agent1.sources.avro-source1.threads=5
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = flume.checkpoint
agent1.channels.c1.dataDirs = flume.data
agent1.channels.c1.capacity = 200000000
agent1.channels.c1.keep-alive = 30
agent1.channels.c1.write-timeout = 30
agent1.channels.c1.checkpoint-timeout=600
agent1.sources.avro-source1.channels = c1
agent1.sinks.k1.channel = c1
中间传输流配置文件
agent2.sources = r2
agent2.sinks = k2
agent2.channels = c2
agent2.sources.r2.type = avro
agent2.sources.r2.bind = bs022.zx.nicx.cn
agent2.sources.r2.port = 44444
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = bs042.zx.nicx.cn
agent2.sinks.k2.port = 55555
agent2.channels.c2.type = file
agent2.channels.c2.checkpointDir = flume.checkpoint
agent2.channels.c2.dataDirs = flume.data
agent2.channels.c2.capacity = 200000000
agent2.channels.c2.keep-alive = 30
agent2.channels.c2.write-timeout = 30
agent2.channels.c2.checkpoint-timeout=600
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2
写HDFS配置文件
[root@bs042 flume-ng-1.5.0-cdh5.4.0]# more conf/agent042.conf
#Define agent
agent1.channels=ch1
agent1.sources=avro-source1
agent1.sinks=log-sink1
#definesourcemonitorafile
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = bs042.zx.nicx.cn
agent1.sources.avro-source1.port = 55555
agent1.sources.avro-source1.interceptors = i1
agent1.sources.avro-source1.interceptors.i1.type = timestamp
#Definealoggersinkthatsimplylogsalleventsitreceives
#andconnectittotheotherendofthesamechannel.
agent1.sinks.log-sink1.channel=ch1
agent1.sinks.log-sink1.type=hdfs
agent1.sinks.log-sink1.hdfs.path=/user/hadoop/cndns/flume/%Y%m%d
agent1.sinks.log-sink1.hdfs.writeFormat=Text
agent1.sinks.log-sink1.hdfs.filePrefix=cdns
agent1.sinks.log-sink1.hdfs.inUseSuffix=.tmp
#agent1.sinks.log-sink1.hdfs.fileType=DataStream
agent1.sinks.log-sink1.hdfs.fileType=CompressedStream
agent1.sinks.log-sink1.hdfs.codeC=lzop
agent1.sinks.log-sink1.hdfs.rollInterval=0
agent1.sinks.log-sink1.hdfs.rollSize=67108864
agent1.sinks.log-sink1.hdfs.rollCount=0
agent1.sinks.log-sink1.hdfs.batchSize=1000
agent1.sinks.log-sink1.hdfs.txnEventMax=1000
agent1.sinks.log-sink1.hdfs.callTimeout=60000
agent1.sinks.log-sink1.hdfs.appendTimeout=60000
#Defineamemorychannelcalledch1onagent1
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = flume.checkpoint
agent1.channels.ch1.dataDirs = flume.data
agent1.channels.ch1.capacity = 200000000
agent1.channels.ch1.keep-alive = 30
agent1.channels.ch1.write-timeout = 30
agent1.channels.ch1.checkpoint-timeout=600
agent1.sources.avro-source1.channels = ch1
agent1.sinks.log-sink1.channel = ch1
tail -F断点续传问题
tail -n +$(tail -n1 num) -F test.log 2>&1 | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "num";fflush("")}' num –
【注】