  • Flume-ng: Syncing Data from RocketMQ to HDFS

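    At work, one small module is responsible for writing RocketMQ data into HDFS. I figured this wheel must already exist, and a quick Baidu search turned up Flume-ng.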

    The Flume basics are covered in detail in the official documentation: http://flume.apache.org/FlumeUserGuide.html

    Flume does not support RocketMQ out of the box, but fortunately there is an implementation on GitHub: https://github.com/rocketmq/rocketmq-flume

    The rocketmq-flume project provides both a source and a sink: the source reads data from RocketMQ, and the sink writes data to RocketMQ. I only used the source.

    1. flume.conf


    agent1.sources=source1
    agent1.channels=channel1
    agent1.sinks=sink1

    agent1.sources.source1.type=com.handu.flume.source.rocketmq.RocketMQSource
    agent1.sources.source1.namesrvAddr=10.1.234.205:9876
    agent1.sources.source1.consumerGroup=FlumeGroup
    agent1.sources.source1.topic=FlumeTopic
    agent1.sources.source1.tags=*
    agent1.sources.source1.messageModel=BROADCASTING
    agent1.sources.source1.maxNums=32
    agent1.sources.source1.channels=channel1

    #agent1.sinks.sink1.type=logger
    #agent1.sinks.sink1.channel=channel1

    agent1.sinks.sink1.channel = channel1
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = hdfs://rti9:9000/flume/
    agent1.sinks.sink1.hdfs.writeFormat = Text
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.rollInterval = 0
    agent1.sinks.sink1.hdfs.rollSize = 1024
    agent1.sinks.sink1.hdfs.rollCount = 0
    agent1.sinks.sink1.hdfs.batchSize = 1000
    agent1.sinks.sink1.hdfs.txnEventMax = 1000
    agent1.sinks.sink1.hdfs.callTimeout = 60000
    agent1.sinks.sink1.hdfs.appendTimeout = 60000

    agent1.channels.channel1.type=memory
    agent1.channels.channel1.capacity=1001
    agent1.channels.channel1.transactionCapacity=1001
    agent1.channels.channel1.keep-alive=3

    2. Parameter notes:

    hdfs.rollInterval: number of seconds to wait before rolling the current file (0 = never roll based on time interval)

    hdfs.rollSize: file size that triggers a roll, in bytes (0 = never roll based on file size)

    hdfs.rollCount: number of events written to the file before it is rolled (0 = never roll based on number of events)

    hdfs.batchSize: number of events written to the file before it is flushed to HDFS
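    The three roll settings act as independent triggers, and each one is switched off by setting it to 0. The sketch below is a simplified Java model of that documented behavior, not the HDFS sink's actual code; the class RollPolicy and its method shouldRoll are hypothetical names used only for illustration. With the values in the config above (rollInterval = 0, rollSize = 1024, rollCount = 0), only file size can trigger a roll.

```java
/**
 * Simplified model of the HDFS sink roll triggers as described in the Flume User Guide.
 * Hypothetical class for illustration only; this is not Flume's implementation.
 */
public class RollPolicy {
    private final long rollIntervalSec; // hdfs.rollInterval, 0 disables time-based rolling
    private final long rollSizeBytes;   // hdfs.rollSize, 0 disables size-based rolling
    private final long rollCount;       // hdfs.rollCount, 0 disables count-based rolling

    public RollPolicy(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** Returns true if any enabled trigger has been reached for the currently open file. */
    public boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        if (rollIntervalSec > 0 && secondsOpen >= rollIntervalSec) return true;
        if (rollSizeBytes > 0 && bytesWritten >= rollSizeBytes) return true;
        return rollCount > 0 && eventsWritten >= rollCount;
    }

    public static void main(String[] args) {
        // Values taken from flume.conf above: rollInterval=0, rollSize=1024, rollCount=0
        RollPolicy policy = new RollPolicy(0, 1024, 0);
        System.out.println(policy.shouldRoll(3600, 512, 100_000)); // false: only size counts, 512 < 1024
        System.out.println(policy.shouldRoll(1, 1024, 1));         // true: size threshold reached
    }
}
```

    Under this model, an hour-old file with 100,000 events still does not roll, because the time and count triggers are both disabled.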

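    hdfs.batchSize must never be larger than the channel's transactionCapacity, because the sink takes up to batchSize events from the channel within a single transaction. In the config above, 1000 stays below 1001.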
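    The batchSize rule can be checked mechanically before starting the agent. Below is a minimal sketch that assumes flume.conf uses plain Java properties syntax and that the sink's channel is a memory channel. It falls back to 100 when a value is unset, which is the documented default for both hdfs.batchSize and a memory channel's transactionCapacity. BatchSizeCheck, parse and batchFits are hypothetical names, not part of Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: checks one HDFS sink's batchSize against its channel's transactionCapacity. */
public class BatchSizeCheck {

    /** Parses flume.conf-style text using java.util.Properties syntax. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** True if hdfs.batchSize <= transactionCapacity; both fall back to 100 when unset. */
    static boolean batchFits(Properties p, String agent, String sink) {
        String prefix = agent + ".sinks." + sink + ".";
        // Properties.load keeps trailing spaces in values, so trim before using them
        String channel = p.getProperty(prefix + "channel", "").trim();
        int batchSize = Integer.parseInt(p.getProperty(prefix + "hdfs.batchSize", "100").trim());
        int txnCapacity = Integer.parseInt(
                p.getProperty(agent + ".channels." + channel + ".transactionCapacity", "100").trim());
        return batchSize <= txnCapacity;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "agent1.sinks.sink1.channel = channel1",
                "agent1.sinks.sink1.hdfs.batchSize = 1000",
                "agent1.channels.channel1.transactionCapacity=1001");
        System.out.println(batchFits(parse(conf), "agent1", "sink1")); // true: 1000 <= 1001
    }
}
```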

    3. Start the agent:

    flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console

    Keep the -c conf option. Without it, the environment variables configured in flume-env.sh are not loaded.

    4. Problems encountered:

    org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
      at com.handu.flume.source.rocketmq.RocketMQSource.process(RocketMQSource.java:106)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
      at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
      at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
      ... 3 more

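    The exception points at the memory channel. Either the sink is not keeping up with the source, or the channel buffer is too small. In my case, the fix was to give the JVM more heap memory: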

    vi bin/flume-ng
    JAVA_OPTS="-Xmx2048m"
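    The exception message describes a bounded buffer that fills up when the producer outpaces the consumer. The toy simulation below illustrates that condition. It is a hypothetical model, not Flume's MemoryChannel code. The capacity of 1001 comes from channel1.capacity. The source batch of 32 assumes that maxNums is the number of messages the RocketMQ source pulls per batch. The sink rates are made-up numbers.

```java
/**
 * Toy simulation of the "Space for commit to queue couldn't be acquired" condition:
 * a bounded channel fills up when the source commits events faster than the sink takes them.
 * Hypothetical code for illustration; it does not reproduce Flume's MemoryChannel.
 */
public class ChannelFullSim {

    /**
     * Each tick, the source commits one batch of sourceBatch events, then the sink
     * takes up to sinkBatch events. Returns the first tick at which the source batch
     * no longer fits into the remaining capacity, or -1 if that never happens within maxTicks.
     */
    static int firstFullTick(int capacity, int sourceBatch, int sinkBatch, int maxTicks) {
        int queued = 0;
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (capacity - queued < sourceBatch) {
                return tick; // the point where a real channel would report it is full
            }
            queued += sourceBatch;                // source commits its batch
            queued -= Math.min(sinkBatch, queued); // sink drains what it can
        }
        return -1;
    }

    public static void main(String[] args) {
        // capacity=1001 and a source batch of 32 mirror flume.conf; sink rates are made up.
        System.out.println(firstFullTick(1001, 32, 32, 10_000)); // -1: sink keeps up
        System.out.println(firstFullTick(1001, 32, 16, 10_000)); // 62: sink drains half as fast
    }
}
```

    In this model, raising the capacity to 10001 only delays the failure from tick 62 to tick 625 when the sink is consistently slower. A larger buffer helps when the imbalance is a temporary burst.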

  • Original post: https://www.cnblogs.com/machong/p/5630373.html