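At work I maintain a small module that writes RocketMQ data to HDFS. I figured a ready-made tool for this must already exist, and a Baidu search turned up Flume-ng.
The official documentation covers the Flume basics in detail: http://flume.apache.org/FlumeUserGuide.html
Flume itself does not support RocketMQ, but fortunately an implementation already exists on GitHub: https://github.com/rocketmq/rocketmq-flume
The rocketmq-flume project provides both a source and a sink: the source reads data from RocketMQ, and the sink writes data to RocketMQ. I only used the source.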
1. flume.conf
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1

agent1.sources.source1.type=com.handu.flume.source.rocketmq.RocketMQSource
agent1.sources.source1.namesrvAddr=10.1.234.205:9876
agent1.sources.source1.consumerGroup=FlumeGroup
agent1.sources.source1.topic=FlumeTopic
agent1.sources.source1.tags=*
agent1.sources.source1.messageModel=BROADCASTING
agent1.sources.source1.maxNums=32
agent1.sources.source1.channels=channel1

#agent1.sinks.sink1.type=logger
#agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://rti9:9000/flume/
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.rollInterval = 0
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.batchSize = 1000
agent1.sinks.sink1.hdfs.txnEventMax = 1000
agent1.sinks.sink1.hdfs.callTimeout = 60000
agent1.sinks.sink1.hdfs.appendTimeout = 60000

agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1001
agent1.channels.channel1.transactionCapacity=1001
agent1.channels.channel1.keep-alive=3
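To make the roll settings in this config concrete, here is a minimal Python sketch of how the three thresholds combine. It is a simplified model based on the documented meaning of hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount (a value of 0 disables that trigger), not Flume's actual implementation. The function name should_roll and its arguments are hypothetical.

```python
def should_roll(seconds_open, bytes_written, events_written,
                roll_interval=0, roll_size=1024, roll_count=0):
    """Return True if any enabled roll threshold has been reached.

    Simplified model: a threshold of 0 means that trigger is disabled.
    The defaults mirror the values used in the flume.conf of this post.
    """
    by_time = roll_interval > 0 and seconds_open >= roll_interval
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_count = roll_count > 0 and events_written >= roll_count
    return by_time or by_size or by_count


# Time and event count are disabled (0), so only the size trigger matters.
print(should_roll(seconds_open=3600, bytes_written=500, events_written=10000))
print(should_roll(seconds_open=1, bytes_written=1024, events_written=1))
```

Under this model, the config above rolls a file once about 1 KB has been written, regardless of how long the file has been open or how many events it holds.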
2. Parameter notes:
hdfs.rollInterval: Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
hdfs.rollSize: File size to trigger a roll, in bytes (0 = never roll based on file size)
hdfs.rollCount: Number of events written to the file before it is rolled (0 = never roll based on number of events)
hdfs.batchSize: Number of events written to the file before it is flushed to HDFS
hdfs.batchSize must never be greater than the channel's transactionCapacity, because the sink takes each batch from the channel within a single transaction.
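The batchSize rule can be checked before the agent is started. The Python sketch below is only an illustration: parse_properties and check_batch_size are hypothetical helpers, the parser handles just plain key=value lines, and the extra check that transactionCapacity does not exceed capacity is my own addition based on how I understand the memory channel to behave.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_batch_size(props, agent="agent1", sink="sink1"):
    """Return a list of problems for one HDFS sink and its channel."""
    problems = []
    channel = props[f"{agent}.sinks.{sink}.channel"]
    batch = int(props[f"{agent}.sinks.{sink}.hdfs.batchSize"])
    txn_cap = int(props[f"{agent}.channels.{channel}.transactionCapacity"])
    capacity = int(props[f"{agent}.channels.{channel}.capacity"])
    if batch > txn_cap:
        problems.append(f"hdfs.batchSize {batch} > transactionCapacity {txn_cap}")
    if txn_cap > capacity:  # assumed extra check, not stated in this post
        problems.append(f"transactionCapacity {txn_cap} > capacity {capacity}")
    return problems


# Values copied from the flume.conf in this post.
SAMPLE = """
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hdfs.batchSize = 1000
agent1.channels.channel1.capacity=1001
agent1.channels.channel1.transactionCapacity=1001
"""

print(check_batch_size(parse_properties(SAMPLE)))  # prints []
```

To check a real file, pass the contents of conf/flume.conf to parse_properties instead of SAMPLE.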
3. Start the agent:
flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console
Be sure to pass the -c conf option; otherwise the environment variables configured in flume-env.sh will not be loaded.
4. Problems encountered:
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at com.handu.flume.source.rocketmq.RocketMQSource.process(RocketMQSource.java:106)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 3 more
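The exception means the memory channel filled up because the sink could not drain it as fast as the source was writing. I resolved it by increasing the JVM heap size: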
vi bin/flume-ng
JAVA_OPTS="-Xmx2048m"
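
The ChannelFullException above can be illustrated with a bounded queue. The Python sketch below is an analogy, not Flume code: a queue.Queue with a fixed maxsize stands in for the memory channel's capacity, and the put timeout stands in for keep-alive. The function name fill_channel is hypothetical. With no consumer draining the queue, the first put beyond capacity times out.

```python
import queue


def fill_channel(capacity, events, keep_alive=0.01):
    """Put `events` items into a bounded queue that nobody consumes.

    Returns how many items were accepted before the queue was full.
    """
    channel = queue.Queue(maxsize=capacity)
    accepted = 0
    for i in range(events):
        try:
            # Blocks for up to keep_alive seconds waiting for free space.
            channel.put(i, timeout=keep_alive)
        except queue.Full:
            break
        accepted += 1
    return accepted


print(fill_channel(capacity=5, events=8))  # prints 5
```

In the real agent, the equivalent remedies are presumably a sink that drains faster or a larger channel; a larger memory channel in turn needs more heap, which is one reason the heap setting matters here.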