zoukankan      html  css  js  c++  java
  • flume MemoryChannel 源代码解析

    1.先分析三个LinkedBlockingDeque<Event>类型的takeList,putList,queue

    putList:  存放的是来自source生产的数据,通过调用doPut(Event event)方法,它是怎样到queue的,在每次运行doCommit的时候,会循环放到queue,事实上doCommit()放法仅仅做了putlist交给queue,

          synchronized(queueLock) {
            if(puts > 0 ) {
              while(!putList.isEmpty()) {
                if(!queue.offer(putList.removeFirst())) {
                  throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                }
              }
            }
            putList.clear();
            takeList.clear();
          }

    takelist:    每次sink消费。都会加到takelist,一般不起什么作用,可是操作失败。rollback就起作用了

    protected void doRollback() {

         int takes = takeList.size();
          synchronized(queueLock) {
            Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
                "queue to rollback takes. This should never happen, please report");
            while(!takeList.isEmpty()) {
              queue.addFirst(takeList.removeLast());
            }
            putList.clear();
          }
          bytesRemaining.release(putByteCounter);
          putByteCounter = 0;
          takeByteCounter = 0;
          queueStored.release(takes);
          channelCounter.setChannelSize(queue.size());
        }
      }

    queue:存放的即将传递给sink的全部数据.


    2.參数相应关系

    transactionCapacity:设置takelist,putlist最大容量

    capacity:           设置queue的最大容量

    keep-alive:          Semaphore的tryAcquire的timeout的參数,添加到takelist,putlist,queue超时时间

  • 相关阅读:
    SQL中的全局变量和局部变量(@@/@)
    C# 委托Delegate(一) 基础介绍&用法
    internal in C#
    用代码块在new对象时set属性
    MySql与对应的Java的时间类型
    快速获取当天0点0分0秒(00:00:00)
    IsNullOrWhiteSpace与IsNullOrEmpty
    svn服务器配置 for mac
    CornerStone配置SVN,HTTP及SVN简单使用说明
    svn配置
  • 原文地址:https://www.cnblogs.com/mqxnongmin/p/10571106.html
Copyright © 2011-2022 走看看