  • Flume-ng: Syncing Data from RocketMQ to HDFS

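    At work, a small module is responsible for writing data from RocketMQ into HDFS. I figured a ready-made tool for this job must already exist, and a Baidu search turned up Flume-ng.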

    Flume basics are covered in detail in the official documentation: http://flume.apache.org/FlumeUserGuide.html

    Flume does not support RocketMQ out of the box, but fortunately an implementation is already available on GitHub: https://github.com/rocketmq/rocketmq-flume

    The rocketmq-flume project provides both a source and a sink: the source reads data from RocketMQ, and the sink writes data to RocketMQ. I only used the source.

    1. flume.conf


    agent1.sources=source1
    agent1.channels=channel1
    agent1.sinks=sink1
    agent1.sources.source1.type=com.handu.flume.source.rocketmq.RocketMQSource
    agent1.sources.source1.namesrvAddr=10.1.234.205:9876
    agent1.sources.source1.consumerGroup=FlumeGroup
    agent1.sources.source1.topic=FlumeTopic
    agent1.sources.source1.tags=*
    agent1.sources.source1.messageModel=BROADCASTING
    agent1.sources.source1.maxNums=32
    agent1.sources.source1.channels=channel1
    #agent1.sinks.sink1.type=logger
    #agent1.sinks.sink1.channel=channel1
    agent1.sinks.sink1.channel = channel1
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = hdfs://rti9:9000/flume/
    agent1.sinks.sink1.hdfs.writeFormat = Text
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.rollInterval = 0
    agent1.sinks.sink1.hdfs.rollSize = 1024
    agent1.sinks.sink1.hdfs.rollCount = 0
    agent1.sinks.sink1.hdfs.batchSize = 1000
    agent1.sinks.sink1.hdfs.txnEventMax = 1000
    agent1.sinks.sink1.hdfs.callTimeout = 60000
    agent1.sinks.sink1.hdfs.appendTimeout = 60000
    agent1.channels.channel1.type=memory
    agent1.channels.channel1.capacity=1001
    agent1.channels.channel1.transactionCapacity=1001
    agent1.channels.channel1.keep-alive=3
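    In this config every key is prefixed with the agent name (agent1); a source lists its target channels under the plural key "channels", while a sink names exactly one channel under the singular key "channel". The sketch below is a hypothetical helper, not part of Flume or rocketmq-flume: it parses flume.conf-style text with java.util.Properties and reports any source or sink that points at an undeclared channel. The class and method names are illustrative assumptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not part of Flume): checks an agent's source -> channel -> sink wiring. */
public class FlumeWiringCheck {

    /** Parses flume.conf-style "key = value" text with java.util.Properties. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Reads a space-separated name list such as "source1 source2". */
    static Set<String> names(Properties p, String key) {
        Set<String> out = new TreeSet<>();
        String value = p.getProperty(key, "").trim();
        if (!value.isEmpty()) {
            for (String n : value.split("\\s+")) {
                out.add(n);
            }
        }
        return out;
    }

    /** Returns wiring problems for the given agent; an empty list means none were found. */
    static List<String> check(Properties p, String agent) {
        List<String> problems = new ArrayList<>();
        Set<String> channels = names(p, agent + ".channels");
        // A source lists its target channels under the plural key "channels".
        for (String src : names(p, agent + ".sources")) {
            Set<String> targets = names(p, agent + ".sources." + src + ".channels");
            if (targets.isEmpty()) {
                problems.add("source " + src + " has no channels");
            }
            for (String ch : targets) {
                if (!channels.contains(ch)) {
                    problems.add("source " + src + " -> undeclared channel " + ch);
                }
            }
        }
        // A sink names exactly one channel under the singular key "channel".
        for (String sink : names(p, agent + ".sinks")) {
            String ch = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(ch)) {
                problems.add("sink " + sink + " -> undeclared channel '" + ch + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Only the wiring lines of the flume.conf above; other keys are omitted.
        Properties p = parse(String.join("\n",
                "agent1.sources=source1",
                "agent1.channels=channel1",
                "agent1.sinks=sink1",
                "agent1.sources.source1.channels=channel1",
                "agent1.sinks.sink1.channel = channel1"));
        System.out.println(check(p, "agent1")); // prints []
    }
}
```

    Properties is used because it already handles the "key = value" and "#comment" line syntax that flume.conf shares with ordinary Java properties files.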

    2. Parameter notes:

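    hdfs.rollInterval: Number of seconds to wait before rolling the current file (0 = never roll based on time interval)

    hdfs.rollSize: File size that triggers a roll, in bytes (0 = never roll based on file size)

    hdfs.rollCount: Number of events written to the file before it is rolled (0 = never roll based on number of events)

    hdfs.batchSize: Number of events written to the file before it is flushed to HDFS

    hdfs.batchSize must never be greater than the channel's transactionCapacity: each sink transaction takes up to batchSize events from the channel, and a memory channel transaction can hold at most transactionCapacity events.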
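    The three roll triggers are independent: the current file is rolled as soon as any enabled trigger is reached, and 0 switches a trigger off. With the settings above only rollSize is active, so a new HDFS file is started about every 1024 bytes. The sketch below is a hypothetical model of those rules plus the batchSize/transactionCapacity constraint (and the channel's own requirement that transactionCapacity not exceed capacity); it is not Flume's source code, and the class and method names are illustrative.

```java
/**
 * Hypothetical model (not Flume source code) of the roll and batch settings used in
 * the flume.conf above. A value of 0 disables the corresponding roll trigger.
 */
public final class HdfsSinkSettings {
    final long rollInterval;       // seconds a file stays open before rolling (0 = off)
    final long rollSize;           // bytes written before rolling (0 = off)
    final long rollCount;          // events written before rolling (0 = off)
    final int batchSize;           // events the sink takes from the channel per transaction
    final int transactionCapacity; // channel: max events in one transaction
    final int capacity;            // channel: max events held in total

    HdfsSinkSettings(long rollInterval, long rollSize, long rollCount,
                     int batchSize, int transactionCapacity, int capacity) {
        this.rollInterval = rollInterval;
        this.rollSize = rollSize;
        this.rollCount = rollCount;
        this.batchSize = batchSize;
        this.transactionCapacity = transactionCapacity;
        this.capacity = capacity;
    }

    /** True if any enabled trigger has been reached for the file currently being written. */
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        return (rollInterval > 0 && secondsOpen >= rollInterval)
                || (rollSize > 0 && bytesWritten >= rollSize)
                || (rollCount > 0 && eventsWritten >= rollCount);
    }

    /** A sink batch must fit in one channel transaction, which must fit in the channel. */
    boolean isValid() {
        return batchSize <= transactionCapacity && transactionCapacity <= capacity;
    }

    public static void main(String[] args) {
        // rollInterval=0, rollSize=1024, rollCount=0, batchSize=1000,
        // transactionCapacity=1001, capacity=1001, as in the flume.conf above.
        HdfsSinkSettings s = new HdfsSinkSettings(0, 1024, 0, 1000, 1001, 1001);
        System.out.println(s.isValid());                         // prints true
        System.out.println(s.shouldRoll(3600, 1000, 1_000_000)); // prints false: only size can trigger
        System.out.println(s.shouldRoll(5, 1024, 3));            // prints true: 1024 bytes reached
    }
}
```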

    3. Start the agent:

    flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console

    The -c conf option is required; otherwise the environment variables set in flume-env.sh are not loaded.

    4. Problem encountered:

    org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
      at com.handu.flume.source.rocketmq.RocketMQSource.process(RocketMQSource.java:106)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
      at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
      at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
      ... 3 more

    The fix was to increase the JVM heap size in the flume-ng launcher script:

    vi bin/flume-ng
    JAVA_OPTS="-Xmx2048m"
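    Some background on why heap size matters here: according to the Flume User Guide, a memory channel is bounded both by capacity (an event count, 1001 in the config above) and by byteCapacity, whose default is computed as 80% of the JVM's maximum heap, so raising -Xmx raises the byte limit. When the sink drains events more slowly than the source commits them, a commit eventually cannot find room and ChannelFullException is thrown. The toy model below is a hypothetical illustration, not Flume code: it tracks only the event count, ignores the keep-alive wait, and the sink rate of 10 events per tick is an assumed value chosen to be slower than the source's batches of 32 (maxNums).

```java
/**
 * Toy model (not Flume code) of a bounded channel: the source commits whole batches,
 * the sink drains a fixed number of events per tick. A batch that does not fit is
 * rejected as a whole, which is the situation ChannelFullException reports.
 */
public class ChannelBackpressure {
    private final int capacity; // max events the channel may hold
    private int size = 0;       // events currently held

    ChannelBackpressure(int capacity) {
        this.capacity = capacity;
    }

    /** All-or-nothing put, like a transaction commit: rejects the batch if it does not fit. */
    boolean putBatch(int events) {
        if (size + events > capacity) {
            return false;
        }
        size += events;
        return true;
    }

    /** The sink takes up to maxEvents; returns how many were actually taken. */
    int take(int maxEvents) {
        int n = Math.min(maxEvents, size);
        size -= n;
        return n;
    }

    /** First tick on which the source's batch is rejected, or -1 if none within maxTicks. */
    static int firstRejectedTick(int capacity, int sourceBatch, int sinkPerTick, int maxTicks) {
        ChannelBackpressure ch = new ChannelBackpressure(capacity);
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (!ch.putBatch(sourceBatch)) {
                return tick;
            }
            ch.take(sinkPerTick);
        }
        return -1;
    }

    public static void main(String[] args) {
        // capacity=1001 and batches of 32 come from the flume.conf above;
        // 10 events per tick is an assumed, artificially slow sink.
        System.out.println(firstRejectedTick(1001, 32, 10, 1000)); // prints 46
        // A sink that keeps pace with the source never fills the channel.
        System.out.println(firstRejectedTick(1001, 32, 32, 1000)); // prints -1
    }
}
```

    In this model a larger buffer only postpones the rejection while the sink stays slower than the source; the channel stops filling only when the drain rate catches up.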

  • Original article: https://www.cnblogs.com/machong/p/5630373.html