Standard Flume startup:
nohup bin/flume-ng agent --conf-file conf/kafka_flume_hdfs.conf --name a2 -Dflume.pirate.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &
Startup with JSON (HTTP) monitoring:
nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job2/kafka_flume_hdfs.conf --name a2 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &
Note: -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 enables the built-in HTTP monitoring, which reports metrics as a JSON string. Watch out for user-permission issues (root vs. pirate).
Web UI: http://192.168.2.21:34545/metrics
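Querying that endpoint with curl returns per-component counters as JSON; the sketch below is illustrative (values made up, actual components depend on the agent config):
curl http://192.168.2.21:34545/metrics
{"CHANNEL.c2":{"ChannelCapacity":"1000000","ChannelFillPercentage":"0.5",
 "ChannelSize":"5000","EventPutAttemptCount":"120000","EventPutSuccessCount":"120000",
 "EventTakeAttemptCount":"119500","EventTakeSuccessCount":"115000",
 "StartTime":"1597876537632","StopTime":"0","Type":"CHANNEL"}}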
Flume start/stop script:
#! /bin/bash
case $1 in
"start"){
for i in 192.168.x.xx
do
echo " --------启动 $i flume-------"
nohup /home/pirate/programs/flume/bin/flume-ng agent
--conf-file /home/pirate/programs/flume/conf/flume_udplog_hdfs.conf
--name a3 -Dflume.monitoring.type=http -Dflume.monitoring.port=34546
-Dflume.pirate.logger=INFO,LOGFILE >'/home/pirate/programs/flume/udp_log.txt' 2>&1 &
done
};;
"stop"){
for i in 192.168.x.xx
do
echo " --------停止 $i flume-------"
kill -15 `ps -ef | grep flume_udplog_hdfs | grep -v grep | awk '{print $2}'`
done
};;
esac
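To use it (assuming the script is saved as flume.sh; the file name is arbitrary):
chmod +x flume.sh
./flume.sh start
./flume.sh stop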
------------------------------------------
Notes on several configuration parameters
#source2
a3.sources.r2.batchSize = 800000
a3.sources.r2.batchDurationMillis = 30000
## channel2
a3.channels.c2.maxFileSize = 2146435071
a3.channels.c2.capacity = 1000000
a3.channels.c2.transactionCapacity = 800000
a3.channels.c2.keep-alive = 60
## sink2
a3.sinks.k2.type = hdfs
a3.sinks.k2.hdfs.path = /kafka_flume/udplog/%Y%m%d/%H%M/Flow.log
#a3.sinks.k2.hdfs.filePrefix = Flow
#a3.sinks.k2.hdfs.fileSuffix = .log
a3.sinks.k2.hdfs.batchSize = 800000
a3.sinks.k2.hdfs.round = true
a3.sinks.k2.hdfs.roundValue = 10
a3.sinks.k2.hdfs.roundUnit = minute
a3.sinks.k2.hdfs.fileType = DataStream
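One gotcha with the time escapes in hdfs.path above: %Y%m%d/%H%M is resolved from each event's timestamp header, and with round = true, roundValue = 10, roundUnit = minute the %H%M part is rounded down to 10-minute buckets (e.g. .../20200820/1730/). If events carry no timestamp header (no timestamp interceptor upstream), the sink must use its local clock instead; a one-line sketch:
# use the agent's local time to resolve %Y%m%d/%H%M (only needed when events lack a timestamp header)
a3.sinks.k2.hdfs.useLocalTimeStamp = true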
[FLUME-3106] - When a sink's batchSize is greater than the Memory Channel's transactionCapacity, Flume produces endless data
[FLUME-3107] - When a sink's batchSize is greater than the File Channel's transactionCapacity, Flume produces endless data
[FLUME-2989] - Kafka Channel metrics are missing eventTakeAttemptCount (the total number of times a sink attempted to take events from the channel; not every attempt returns an event, since the channel may be empty when the sink polls) and eventPutAttemptCount (the total number of events the source attempted to put into the channel)
https://blog.csdn.net/u013128262/article/details/89666626
----------------------------------------
Ganglia is used here for monitoring:
-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.1.102:8649
nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job3/kafka_flume_hdfs.conf --name a3 \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.2.26:8649 \
-Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &
bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-telnet-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.1.102:8649
Ganglia web UI: http://192.168.2.26/ganglia
------------------------------------------
Flume troubleshooting
Problem 1:
Fixing OOM:
In flume/bin/flume-ng, JAVA_OPTS="-Xmx20m" can be increased (e.g. to 2048m).
Alternatively, edit flume/conf/flume-env.sh and set: export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
https://blog.csdn.net/qq_21383435/article/details/77880129?locationNum=10&fps=1
Problem 2:
File channel issues:
Too much data piling up in the file channel; workaround:
http://www.voidcn.com/article/p-kpecnhck-xb.html
Partition handling when Flume consumes a Kafka topic (within a consumer group, each partition is consumed by at most one consumer, so consumers beyond the partition count sit idle)
nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job1/kafka_flume_hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &
Problem 3:
Small-file rolling (take HDFS block replication into account; see the config sketch after the link)
https://blog.csdn.net/qq_39192827/article/details/99065612
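A sketch of the usual roll controls (values are illustrative, not from this deployment); hdfs.minBlockReplicas = 1 is the common fix, because HDFS replication-pipeline events otherwise make the sink roll files much earlier than rollInterval/rollSize would suggest:
# roll every 10 minutes (0 disables time-based rolling)
a3.sinks.k2.hdfs.rollInterval = 600
# roll at 128 MB (0 disables size-based rolling)
a3.sinks.k2.hdfs.rollSize = 134217728
# 0 disables event-count-based rolling
a3.sinks.k2.hdfs.rollCount = 0
# stop replication-pipeline block events from forcing early rolls
a3.sinks.k2.hdfs.minBlockReplicas = 1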
Single-node Flume issues and architecture changes (tuning):
https://yq.aliyun.com/articles/663675?spm=a2c4e.11153940.0.0.68f1601fPjA1Vm
Problem 4:
Note: when changing the following configuration parameters,
a2.channels.c2.capacity = 10000000
a2.channels.c2.transactionCapacity = 8000000
you must first clear the old checkpoint and data directories (i.e. the configured checkpoint directory and the file channel's on-disk data directory), otherwise the stale state conflicts with the new settings; a shell sketch follows.
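A minimal shell sketch (the directory paths are assumptions for illustration; stop the agent before clearing):
# assuming the channel is configured as:
#   a2.channels.c2.checkpointDir = /home/pirate/programs/flume/filechannel/checkpoint
#   a2.channels.c2.dataDirs = /home/pirate/programs/flume/filechannel/data
rm -rf /home/pirate/programs/flume/filechannel/checkpoint/*
rm -rf /home/pirate/programs/flume/filechannel/data/*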
Problem 5:
When the file channel fills up, errors are logged (data safety should not be affected, though events may be duplicated).
Channel deep dive (includes source-code analysis):
https://blog.csdn.net/u010670689/article/details/73477859
File channel deep dive:
https://www.iteye.com/blog/manzhizhen-2298394
Problem 6:
Pointing the sink at a specific file:
https://blog.csdn.net/weixin_42615385/article/details/89739795
Flume monitoring?
------------------------------------------------------------------
Notes:
During tuning we found that at a data throughput of 80-90 MB/s, the JVM needs roughly 15 GB of memory.
File Channel optimization
# Optimizations
# useDualCheckpoints false - back up the checkpoint; if set to true, backupCheckpointDir must also be set
# backupCheckpointDir - directory for the checkpoint backup; must not be the same as the data directories or the checkpoint directory
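A sketch of enabling the checkpoint backup on the channel above (the backup path is an assumption):
a3.channels.c2.useDualCheckpoints = true
# must differ from both checkpointDir and dataDirs
a3.channels.c2.backupCheckpointDir = /home/pirate/programs/flume/filechannel/backupCheckpoint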
# File compression
## Make the output files compressed streams (CompressedStream requires hdfs.codeC; DataStream writes uncompressed output).
# a1.sinks.k1.hdfs.fileType = CompressedStream
# a1.sinks.k2.hdfs.fileType = CompressedStream
#
# a1.sinks.k1.hdfs.codeC = lzop
# a1.sinks.k2.hdfs.codeC = lzop
## HdfsSink's default serializer appends a newline after every event; our logs already end with a newline, so each record would be followed by a blank line. Disable the automatic newline:
## a1.sinks.k1.sink.serializer.appendNewline = false
a3.sinks.k3.hdfs.fileType = CompressedStream
a3.sinks.k3.hdfs.codeC = lzop
a3.sinks.k3.sink.serializer.appendNewline = false
# Number of IO worker threads per HDFS sink instance for HDFS operations (open, write, etc.)
agent.sinks.hdfsSink_1.hdfs.threadsPoolSize = 100
a2.sinks.k3.hdfs.threadsPoolSize = 100
If this host is not a Hadoop node, uploads to HDFS will fail.
Copy the following jars into Flume's lib directory (a copy command sketch follows the list):
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
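For example, assuming the standard Hadoop 2.7.2 tarball layout under $HADOOP_HOME (adjust paths to your installation):
H=$HADOOP_HOME/share/hadoop
cp $H/common/lib/commons-configuration-1.6.jar \
   $H/common/lib/hadoop-auth-2.7.2.jar \
   $H/common/hadoop-common-2.7.2.jar \
   $H/hdfs/hadoop-hdfs-2.7.2.jar \
   $H/common/lib/commons-io-2.4.jar \
   $H/common/lib/htrace-core-3.1.0-incubating.jar \
   /home/pirate/programs/flume/lib/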
Problem 1:
ERROR [PollableSourceRunner-KafkaSource-r2] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
The message is fairly self-explanatory: the consumer polled data but did not commit within the allowed interval, so Kafka considered it dead and rebalanced the consumer group.
Increase the consumer timeout.
The session timeout is set via session.timeout.ms; heartbeat.interval.ms controls the heartbeat frequency and must be lower than session.timeout.ms (typically no more than a third of it).
session.timeout.ms must fall within the broker-side range [group.min.session.timeout.ms, group.max.session.timeout.ms].
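The Flume Kafka source forwards any property under the kafka.consumer. prefix to the underlying Kafka consumer, so these timeouts can be raised in the agent config; a sketch with illustrative values:
a2.sources.r2.kafka.consumer.session.timeout.ms = 60000
a2.sources.r2.kafka.consumer.heartbeat.interval.ms = 20000
(On newer Kafka clients, processing time between polls is governed separately by max.poll.interval.ms, and lowering kafka.consumer.max.poll.records can also help.)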
Problem 2:
ERROR [PollableSourceRunner-KafkaSource-r2] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c3]
Problem 3:
ERROR [hdfs-k3-call-runner-1] (org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync:268) - Error while trying to hflushOrSync!
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:443) - HDFS IO error
java.io.IOException: Callable timed out after 10000 ms on file: hdfs://192.168.10.249:8020/user/pirate/flume_test/sg3/sg3-udplog/MoneyFlow/20200820//FlumeData.1597876537632.tmp
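The 10000 ms here matches the HDFS sink's default hdfs.callTimeout, so one mitigation (value illustrative) is to raise it:
a3.sinks.k3.hdfs.callTimeout = 60000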
Problem 4:
Disk full: too much data backlogged in the channel's data directory.
Mind the relationship between these parameters:
batchSize <= transactionCapacity <= capacity
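For example, the values used earlier in this document already respect that ordering:
a3.sinks.k2.hdfs.batchSize = 800000
a3.channels.c2.transactionCapacity = 800000
a3.channels.c2.capacity = 1000000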
Problem 5:
22 Jan 2021 17:50:28,691 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:443) - HDFS IO error
java.io.IOException: Mkdirs failed to create
Copy core-site.xml and hdfs-site.xml into the flume/conf/ directory.
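A minimal sketch, assuming the Hadoop client config lives under $HADOOP_HOME/etc/hadoop:
cp $HADOOP_HOME/etc/hadoop/core-site.xml \
   $HADOOP_HOME/etc/hadoop/hdfs-site.xml \
   /home/pirate/programs/flume/conf/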
Problem 6:
29 Jan 2021 17:37:24,564 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c11]. Due to java.lang.IllegalStateException: Unable to add FlumeEventPointer [fileID=5, offset=0]. Queue depth = 1000000, Capacity = 1000000
at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:348)
at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Unable to add FlumeEventPointer [fileID=5, offset=0]. Queue depth = 1000000, Capacity = 1000000
at org.apache.flume.channel.file.ReplayHandler.processCommit(ReplayHandler.java:394)
at org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:334)
at org.apache.flume.channel.file.Log.doReplay(Log.java:530)
at org.apache.flume.channel.file.Log.replay(Log.java:456)
at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:298)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
(the identical exception then repeats every five seconds)
Fix:
Clear the checkpointDir and dataDir configured for the channel (delete both the checkpoint directory and the data directory for that channel; both must be removed). Note that this discards any events still buffered in the channel.