  • Flume parameter notes and miscellany

    Start Flume normally:
    nohup bin/flume-ng agent --conf-file conf/kafka_flume_hdfs.conf --name a2 -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &


    Start with JSON (HTTP) monitoring:
    nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job2/kafka_flume_hdfs.conf --name a2 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &

    Note: -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 enables Flume's built-in HTTP monitoring, which reports metrics as a JSON document. Watch out for user permission issues (root vs. pirate).

    Access the web UI: http://192.168.2.21:34545/metrics
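
    A quick sanity check with curl; the JSON shape below is illustrative only (component names and counter values depend on your agent):

    # Query the built-in HTTP monitoring endpoint
    curl -s http://192.168.2.21:34545/metrics

    # Illustrative response shape:
    # {
    #   "CHANNEL.c2": {
    #     "ChannelCapacity": "1000000",
    #     "ChannelFillPercentage": "0.5",
    #     "EventPutAttemptCount": "800000",
    #     "EventTakeAttemptCount": "800000"
    #   }
    # }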


    Flume start/stop script:
    #!/bin/bash

    case $1 in
    "start"){
        for i in 192.168.x.xx
        do
            echo " -------- starting flume on $i --------"
            nohup /home/pirate/programs/flume/bin/flume-ng agent \
                --conf-file /home/pirate/programs/flume/conf/flume_udplog_hdfs.conf \
                --name a3 -Dflume.monitoring.type=http -Dflume.monitoring.port=34546 \
                -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/udp_log.txt 2>&1 &
        done
    };;
    "stop"){
        for i in 192.168.x.xx
        do
            echo " -------- stopping flume on $i --------"
            kill -15 `ps -ef | grep flume_udplog_hdfs | grep -v grep | awk '{print $2}'`
        done
    };;
    esac
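
    A minimal usage sketch, assuming the script is saved as f1.sh (the file name is hypothetical):

    chmod +x f1.sh
    ./f1.sh start    # launch the agent(s)
    ./f1.sh stop     # SIGTERM the matching flume-ng process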

    ------------------------------------------
    A few notes on configuration parameters

    #source2
    a3.sources.r2.batchSize = 800000
    a3.sources.r2.batchDurationMillis = 30000


    ## channel2
    a3.channels.c2.maxFileSize = 2146435071
    a3.channels.c2.capacity = 1000000
    a3.channels.c2.transactionCapacity = 800000
    a3.channels.c2.keep-alive = 60

    ## sink2
    a3.sinks.k2.type = hdfs
    a3.sinks.k2.hdfs.path = /kafka_flume/udplog/%Y%m%d/%H%M/Flow.log
    #a3.sinks.k2.hdfs.filePrefix = Flow
    #a3.sinks.k2.hdfs.fileSuffix = .log
    a3.sinks.k2.hdfs.batchSize = 800000
    a3.sinks.k2.hdfs.round = true
    a3.sinks.k2.hdfs.roundValue = 10
    a3.sinks.k2.hdfs.roundUnit = minute
    a3.sinks.k2.hdfs.fileType = DataStream
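
    For reference, the c2 block above shows only the tuning knobs; the properties that actually put a file channel on disk are omitted. A minimal sketch for completeness (the type line and both paths are assumptions, not taken from the original config):

    a3.channels.c2.type = file
    # Where the channel persists its checkpoint
    a3.channels.c2.checkpointDir = /home/pirate/programs/flume/checkpoint/c2
    # Comma-separated directories for the channel's data files
    a3.channels.c2.dataDirs = /home/pirate/programs/flume/data/c2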


    [FLUME-3106] - When a sink's batchSize is larger than the Memory Channel's transactionCapacity, Flume produces an endless stream of duplicate data
    [FLUME-3107] - When a sink's batchSize is larger than the File Channel's transactionCapacity, Flume produces an endless stream of duplicate data
    [FLUME-2989] - Kafka Channel metrics are missing eventTakeAttemptCount (the total number of times the sink attempted to take events from the channel; an attempt does not always return an event, because the channel may be empty) and eventPutAttemptCount (the total number of events the source attempted to put into the channel)

    https://blog.csdn.net/u013128262/article/details/89666626

    ----------------------------------------
    Monitoring with Ganglia instead:
    -Dflume.monitoring.type=ganglia
    -Dflume.monitoring.hosts=192.168.1.102:8649

    nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job3/kafka_flume_hdfs.conf --name a3 \
        -Dflume.monitoring.type=ganglia \
        -Dflume.monitoring.hosts=192.168.2.26:8649 \
        -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &

    bin/flume-ng agent \
        --conf conf/ \
        --name a1 \
        --conf-file job/flume-telnet-logger.conf \
        -Dflume.root.logger=INFO,console \
        -Dflume.monitoring.type=ganglia \
        -Dflume.monitoring.hosts=192.168.1.102:8649


    Ganglia web UI: http://192.168.2.26/ganglia
    ------------------------------------------

    Flume
    Problem 1:
    Fixing OOM:
    In flume/bin/flume-ng, JAVA_OPTS="-Xmx20m" can be raised (2048m is a reasonable reference value).
    Alternatively, edit flume/conf/flume-env.sh and set export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote".
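
    A minimal sketch of the flume-env.sh change described above:

    # flume/conf/flume-env.sh
    # Raise the heap well beyond the 20m default; expose JMX for monitoring
    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"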

    https://blog.csdn.net/qq_21383435/article/details/77880129?locationNum=10&fps=1

    Problem 2:
    File channel issues:
    What to do when too much data piles up in the file channel:
    http://www.voidcn.com/article/p-kpecnhck-xb.html

    Partition handling when Flume consumes a Kafka topic (within a consumer group, each partition is assigned to at most one consumer); see the scaling sketch after the launch command below.

    nohup /home/pirate/programs/flume/bin/flume-ng agent --conf-file /home/pirate/programs/flume/conf/job1/kafka_flume_hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/home/pirate/programs/flume/log.txt 2>&1 &
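
    A hedged sketch of scaling across partitions: run several Kafka sources (or agents) sharing one consumer group, and Kafka divides the topic's partitions among them. The source names, broker address, topic, and group id below are assumptions:

    # Two Kafka sources in the same consumer group; Kafka balances
    # the topic's partitions between them.
    a1.sources = r1 r2
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.kafka.bootstrap.servers = 192.168.2.21:9092
    a1.sources.r1.kafka.topics = udplog
    a1.sources.r1.kafka.consumer.group.id = flume_udplog
    a1.sources.r1.channels = c1
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r2.kafka.bootstrap.servers = 192.168.2.21:9092
    a1.sources.r2.kafka.topics = udplog
    a1.sources.r2.kafka.consumer.group.id = flume_udplog
    a1.sources.r2.channels = c1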

    Problem 3:
    Small-file rolling (take block replication into account):
    https://blog.csdn.net/qq_39192827/article/details/99065612
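
    A hedged sketch of the roll settings that govern file size (values are illustrative). hdfs.minBlockReplicas = 1 is the usual cure for surprise small files: without it, Flume rolls a new file whenever it sees block-replication activity:

    # Roll every 30 minutes or at ~128 MB, never by event count
    a3.sinks.k3.hdfs.rollInterval = 1800
    a3.sinks.k3.hdfs.rollSize = 134217728
    a3.sinks.k3.hdfs.rollCount = 0
    # Ignore replication-triggered rolls
    a3.sinks.k3.hdfs.minBlockReplicas = 1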

    Single-node Flume problem-solving and architecture changes (tuning):
    https://yq.aliyun.com/articles/663675?spm=a2c4e.11153940.0.0.68f1601fPjA1Vm

    Problem 4:
    Note that when you change the following configuration parameters:
    a2.channels.c2.capacity = 10000000
    a2.channels.c2.transactionCapacity = 8000000
    you must clear the old checkpoint and data directories (the configured checkpoint directory and the file channel's temporary-data directories), otherwise the replayed state conflicts with the new settings. A cleanup sketch follows.
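
    A minimal cleanup sketch, assuming the directory layout from the file-channel sketch earlier (stop the agent first; note this discards any events still buffered in the channel):

    # Stop the agent, then wipe the channel's on-disk state
    rm -rf /home/pirate/programs/flume/checkpoint/c2/*
    rm -rf /home/pirate/programs/flume/data/c2/*
    # Restart the agent with the new capacity settings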

    Problem 5:
    When the file channel fills up, Flume logs errors (this should not compromise data safety, but duplicate events can appear).

    Channel deep dive (includes source-code analysis):
    https://blog.csdn.net/u010670689/article/details/73477859

    File channel deep dive:
    https://www.iteye.com/blog/manzhizhen-2298394

    Problem 6:
    Pointing the sink at a specific file:
    https://blog.csdn.net/weixin_42615385/article/details/89739795


    Flume monitoring?
    ------------------------------------------------------------------
    Notes:
    During debugging we found that at a data throughput of 80-90 MB/s, the JVM needs roughly 15 GB of memory.

    File Channel optimization
    # Tuning
    # useDualCheckpoints (default false): back up the checkpoint. If set to true, backupCheckpointDir must also be set
    # backupCheckpointDir: directory for the checkpoint backup; it must not be the same as the data directories or the checkpoint directory
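
    A minimal sketch enabling the checkpoint backup (the backup path is an assumption):

    a3.channels.c2.useDualCheckpoints = true
    # Must differ from both checkpointDir and dataDirs
    a3.channels.c2.backupCheckpointDir = /home/pirate/programs/flume/checkpoint_backup/c2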


    # Compression
    ## hdfs.fileType controls whether the output file is written as a raw stream; CompressedStream compresses it (and then hdfs.codeC must name a codec).
    # a1.sinks.k1.hdfs.fileType = CompressedStream
    # a1.sinks.k2.hdfs.fileType = CompressedStream
    #
    # a1.sinks.k1.hdfs.codeC = lzop
    # a1.sinks.k2.hdfs.codeC = lzop

    ## The default serializer in HdfsSink appends a newline after every line it writes. Our logs already carry a newline, so each record ends up followed by a blank line; change the config so no extra newline is added:
    ## a1.sinks.k1.sink.serializer.appendNewline = false


    a3.sinks.k3.hdfs.fileType = CompressedStream
    a3.sinks.k3.hdfs.codeC = lzop

    a3.sinks.k3.sink.serializer.appendNewline = false


    # Number of threads per HDFS sink for HDFS IO operations (open, write, etc.)
    agent.sinks.hdfsSink_1.hdfs.threadsPoolSize = 100

    a2.sinks.k3.hdfs.threadsPoolSize = 100
    If the Flume host is not a Hadoop node, it cannot upload to HDFS; copy the following jars into Flume's lib directory:
    commons-configuration-1.6.jar、
    hadoop-auth-2.7.2.jar、
    hadoop-common-2.7.2.jar、
    hadoop-hdfs-2.7.2.jar、
    commons-io-2.4.jar、
    htrace-core-3.1.0-incubating.jar


    Problem 1:
    ERROR [PollableSourceRunner-KafkaSource-r2] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
    The message is straightforward: the consumer read data but did not commit within the allotted time, so Kafka assumed the consumer had died and rebalanced its group.

    Increase the consumer timeouts; see the sketch below.
    The heartbeat is sent every heartbeat.interval.ms, which must be smaller than session.timeout.ms (how long the broker waits for heartbeats before declaring the consumer dead);
    session.timeout.ms must in turn fall within the broker's [group.min.session.timeout.ms, group.max.session.timeout.ms] range.
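
    A hedged sketch of passing these settings through the Flume Kafka source, which forwards kafka.consumer.* properties to the underlying consumer (the source name and values are assumptions):

    # Give the consumer more time before the broker declares it dead
    a2.sources.r2.kafka.consumer.session.timeout.ms = 60000
    # Heartbeat must stay well below the session timeout (1/3 is typical)
    a2.sources.r2.kafka.consumer.heartbeat.interval.ms = 20000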

    Problem 2:

    ERROR [PollableSourceRunner-KafkaSource-r2] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
    org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c3]
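
    A hedged mitigation sketch along the lines the message itself suggests (values are illustrative): enlarge the channel, let puts wait longer for free space, and drain in large batches:

    a3.channels.c3.capacity = 2000000
    # Seconds a put/take will wait for space/events before failing
    a3.channels.c3.keep-alive = 60
    a3.sinks.k3.hdfs.batchSize = 800000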


    Problem 3:

    ERROR [hdfs-k3-call-runner-1] (org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync:268) - Error while trying to hflushOrSync!
    [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:443) - HDFS IO error
    java.io.IOException: Callable timed out after 10000 ms on file: hdfs://192.168.10.249:8020/user/pirate/flume_test/sg3/sg3-udplog/MoneyFlow/20200820//FlumeData.1597876537632.tmp
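
    The 10000 ms in the message matches the HDFS sink's hdfs.callTimeout default. A hedged sketch raising it (the value is an assumption; slow DataNodes or long GC pauses may need more):

    # Milliseconds allowed for HDFS calls such as open, write, flush, close
    a3.sinks.k3.hdfs.callTimeout = 60000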

    Problem 4:
    Disk full: too much data backlogged in the channel's data directory.
    Mind the relationship between the relevant parameters:
    batchSize <= transactionCapacity <= capacity

    Problem 5:
    22 Jan 2021 17:50:28,691 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:443) - HDFS IO error
    java.io.IOException: Mkdirs failed to create

    Copy core-site.xml and hdfs-site.xml into the flume/conf/ directory; see the sketch below.
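
    A minimal sketch, assuming a Hadoop install at /opt/hadoop (the source path is an assumption):

    cp /opt/hadoop/etc/hadoop/core-site.xml /home/pirate/programs/flume/conf/
    cp /opt/hadoop/etc/hadoop/hdfs-site.xml /home/pirate/programs/flume/conf/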

    Problem 6:
    29 Jan 2021 17:37:24,564 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
    java.lang.IllegalStateException: Channel closed [channel=c11]. Due to java.lang.IllegalStateException: Unable to add FlumeEventPointer [fileID=5, offset=0]. Queue depth = 1000000, Capacity = 1000000
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:348)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Unable to add FlumeEventPointer [fileID=5, offset=0]. Queue depth = 1000000, Capacity = 1000000
    at org.apache.flume.channel.file.ReplayHandler.processCommit(ReplayHandler.java:394)
    at org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:334)
    at org.apache.flume.channel.file.Log.doReplay(Log.java:530)
    at org.apache.flume.channel.file.Log.replay(Log.java:456)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:298)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
    (The identical error and stack trace are logged again at 29 Jan 2021 17:37:29,565 and repeat verbatim every few seconds.)

    Fix:
    Clear the directories configured as the channel's checkpointDir and dataDirs (delete both the checkpoint and data directories for that channel; both must be removed).
