  • A Flume pitfall: channel.transactionCapacity and HdfsSink.batchSize

      I'll skip the backstory and go straight to the result. A connected channel and HdfsSink pair had, by accident, been configured like this:
    ...
    agent.channels.common-channel.transactionCapacity=10
    ...
    agent.sinks.hdfs-sink.hdfs.batchSize=20

      After a quick test, Flume reported the exception below, which is fair enough given that configuration...

    [2015-12-17 11:42:09:694 ERROR][org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:467)]process failed
    org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
            at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
            at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
            at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:745)
    [2015-12-17 11:42:09:696 ERROR][org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
            at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
            at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
            at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
            ... 3 more

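      But... then I noticed that after I stopped sending events, channelSize never went down and the data in HDFS kept growing! It never stops: the same data gets replayed forever!
      Here is the relevant HdfsSink code: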

      public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<BucketWriter> writers = Lists.newArrayList();
        transaction.begin();
        try {
          int txnEventCount = 0;
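          for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
            // Throws ChannelException once the take list already holds transactionCapacity events
            Event event = channel.take();
            if (event == null) {
              break;
            }
            // ...... (elided in this excerpt)

              // The event is handed to the HDFS writer before the transaction commits
              bucketWriter.append(event);
            // ...... (elided in this excerpt, including the end of the for loop)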
    
          transaction.commit();
    
          if (txnEventCount < 1) {
            return Status.BACKOFF;
          } else {
            sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            return Status.READY;
          }
        } catch (IOException eIO) {
          transaction.rollback();
          LOG.warn("HDFS IO error", eIO);
          return Status.BACKOFF;
        } catch (Throwable th) {
          transaction.rollback();
          LOG.error("process failed", th);
          if (th instanceof Error) {
            throw (Error) th;
          } else {
            throw new EventDeliveryException(th);
          }
        } finally {
          transaction.close();
        }
      }

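      See it? The exception is thrown by channel.take() while fetching the 11th event, and the transaction is then rolled back. But the 10 events taken before that have already gone through bucketWriter.append(event), which means they have already been written to HDFS.
    That explains the behavior above: the transaction keeps getting rolled back, so the events return to the channel, yet the events taken in each attempt are written to HDFS every time. What kind of transaction is that...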
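      To make the loop easier to see, here is a toy model of it. This is a sketch, not Flume code: ToyChannel, processOnce and run are hypothetical names, the take list is modeled as a plain deque bounded by transactionCapacity, and append is assumed to be a write that a rollback cannot undo, as observed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the replay loop. Not Flume code; every name here is hypothetical. */
class ReplaySimulation {

    /** Stand-in for a memory channel whose take list is bounded by transactionCapacity. */
    static class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> takeList = new ArrayDeque<>();
        private final int transactionCapacity;

        ToyChannel(int transactionCapacity) {
            this.transactionCapacity = transactionCapacity;
        }

        void put(String event) {
            queue.addLast(event);
        }

        /** Returns null when the channel is empty; throws when the take list is full. */
        String take() {
            if (takeList.size() >= transactionCapacity) {
                throw new IllegalStateException("take list full, capacity " + transactionCapacity);
            }
            String event = queue.pollFirst();
            if (event != null) {
                takeList.addLast(event);
            }
            return event;
        }

        void commit() {
            takeList.clear();
        }

        /** Puts the taken events back at the head of the queue in their original order. */
        void rollback() {
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.pollLast());
            }
        }

        int size() {
            return queue.size();
        }
    }

    /** One sink pass shaped like the process() excerpt: append while taking, commit at the end. */
    static boolean processOnce(ToyChannel channel, int batchSize, List<String> hdfs) {
        try {
            for (int i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) {
                    break;
                }
                hdfs.add(event); // stand-in for bucketWriter.append(event); assumed not undoable
            }
            channel.commit();
            return true;
        } catch (IllegalStateException e) {
            channel.rollback(); // events return to the channel, but hdfs keeps its copies
            return false;
        }
    }

    /** Returns {events left in the channel, records written} after the given number of passes. */
    static int[] run(int transactionCapacity, int batchSize, int events, int passes) {
        ToyChannel channel = new ToyChannel(transactionCapacity);
        for (int i = 0; i < events; i++) {
            channel.put("event-" + i);
        }
        List<String> hdfs = new ArrayList<>();
        for (int p = 0; p < passes; p++) {
            processOnce(channel, batchSize, hdfs);
        }
        return new int[] {channel.size(), hdfs.size()};
    }

    public static void main(String[] args) {
        int[] bad = run(10, 20, 15, 5);
        System.out.println("capacity=10 batch=20: channel=" + bad[0] + " written=" + bad[1]);
        int[] ok = run(20, 20, 15, 5);
        System.out.println("capacity=20 batch=20: channel=" + ok[0] + " written=" + ok[1]);
    }
}
```

      With 15 events queued and 5 passes, the mismatched settings leave all 15 events in the channel while 50 records have been written; with transactionCapacity raised to 20, the channel drains to 0 and exactly 15 records are written.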
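      Sure, the configuration is unreasonable, but Flume runs a big pile of parameter-checking code at startup and none of it catches this. It could at least report an error or a WARN!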
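      Until something like that exists, a small pre-flight check over the properties file can catch the mismatch. The sketch below is my own illustration, not part of Flume: BatchSizeCheck and check are hypothetical names, it only reads the two keys shown in the configuration above, and it skips the comparison when either key is missing rather than guessing at defaults.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical pre-flight check for an agent properties file. Not part of Flume. */
class BatchSizeCheck {

    /**
     * Returns a warning when the sink's hdfs.batchSize exceeds the channel's
     * transactionCapacity. Returns empty when either key is absent, because this
     * sketch makes no assumption about default values.
     */
    static Optional<String> check(Properties conf, String agent, String channel, String sink) {
        String capKey = agent + ".channels." + channel + ".transactionCapacity";
        String batchKey = agent + ".sinks." + sink + ".hdfs.batchSize";
        String capValue = conf.getProperty(capKey);
        String batchValue = conf.getProperty(batchKey);
        if (capValue == null || batchValue == null) {
            return Optional.empty();
        }
        int capacity = Integer.parseInt(capValue.trim());
        int batchSize = Integer.parseInt(batchValue.trim());
        if (batchSize > capacity) {
            return Optional.of(batchKey + "=" + batchSize + " exceeds "
                    + capKey + "=" + capacity);
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.load(new StringReader(
                "agent.channels.common-channel.transactionCapacity=10\n"
                        + "agent.sinks.hdfs-sink.hdfs.batchSize=20\n"));
        check(conf, "agent", "common-channel", "hdfs-sink")
                .ifPresent(warning -> System.out.println("WARN " + warning));
    }
}
```

      Run against the two settings from this post, it prints a WARN line; once transactionCapacity is at least as large as hdfs.batchSize, it prints nothing.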

  • Original post: https://www.cnblogs.com/logicbaby/p/5053685.html