zoukankan      html  css  js  c++  java
  • DelayedOperationPurgatory之DelayedOperation pool

    purgatory就是炼狱的意思。

    当一个DelayedOperation需要被delay时,它就被放到DelayedOperationPurgatory,相当于进行一个等待池。上一篇blog提到过,DelayedOperation想要摆脱delay状态,需要由事件来触发对它状态的检查,或者是超时时间到了。

    这个逻辑看起来挺简单,但高效实现却挺复杂。0.9.0版本的Kafka重新设计了这个purgatory(相比0.8版本),设计思路在Kafka的这篇文档Purgatory Redesign Proposa。我翻译并注释了一下,放在Kafka之Purgatory Redesign Proposal (翻译)

     purgatory的实现使用了两个缓存,这里先讲第一个。

    直观的,我们需要一个key绑定到DelayedOperation上,来说明这个DelayedOperation会由哪些事件触发,而且一个DelayedOperation可以绑定到多个key, 一个key也可能跟多个DelayedOperation有关。所以这是一个多对多的映射,一边是事件,一边是DelayedOperation。

    DelayedOperation pool

    DelayedOperationPurgatory就提供了这种功能。它把这种映射保存到一个Pool里。这个Pool实际上是对一个ConcurrentHashMap的封装.

     private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

    Pool构造器的参数是一个value factory,即当这个pool里一个key没有value时,就用这个函数生成一个value。所以,对于value为空的key,这个pool就会构造一个watch这个key的Watchers.

    Watchers

    这个Pool的key是Any类型,也就是Scala里所有对象的基类,value是Watchers。注释里这么描述Watchers的

    A linked list of watched delayed operations based on some key

    Watchers把这些DelayedOperation保存在自己的一个instance field里

     private[this] val operations = new LinkedList[T]()

    由于Watchers是DelayedOperationPurgatory的内部类,T就源于DelayedOperationPurgatory的签名

    class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
            extends Logging with KafkaMetricsGroup 

    所以operations这个域里保存的就是一个DelayedOperation的列表。

    所以,DelayedOperationPurgatory用来保存DelayedOperation的数据结构是一个Map[Any, List[DelayedOpertion]]的结构。

    对于Watchers来说,有两件事要做

    • 当它关注的key有事件发生时,需要调用它的方法来遍历operations,找出其中可以被complete(即不再被delay)的operation。这个方法就是tryCompleteWatched
    • 由于一个DelayedOperation可以对应多个key,所以当这个Watchers对应的key没有被触发,它保存的operations里的元素仍然可能由于其它的key触发而而被complete。所以外界需要能主动地检测这个Watchers里的哪些operation已经被complete了,并且移除这些元素。这个方法就是purgeCompleted

    做这两件事情的实机很重要,下面来分析一下

    1. 第一件事,在产生事件的地方进行检测就好。比如fetch线程处理fetch请求的过程,以及produce request的处理过程中,可以调用tryCompleteWatched。
    2. 第二件事的处理实机比较不好确定。因为当把一个request从某个key的watchers中移除以后,它可能还在另一个key的watchers里。而每次移除一个request,都要调用purgeCompleted显然不现实。但是0.9.0的实现中引用了新的数据结构来对request的超时进行检测,通过它可以准确获得某个时刻在purgatory中的请求数量(但并不是server中的DelayedOperation的数量,因为超时的DelayedOperation会被放入一个线程池执行它的回调,所以总的数量还需要加上线程池中的Operation数量, 而且这个线程池是一个FixedThreadPool,它使用一个无界的queue)。具体的作法请参照Kafka之Purgatory Redesign Proposal (翻译)
  • 相关阅读:
    网友心得 说说.NET中的反射(转帖)
    javascript的函数(转)
    asp.net基于窗体的身份验证
    创建ASP.NET WEB自定义控件(转)
    .net调用Oracle存储过程
    写字间里程序员
    世界四大杀毒软件调侃
    技巧/诀窍:在ASP.NET中重写URL(转)
    VS2008中JavaScript编辑调试器的秘密
    如何用C#语言构造蜘蛛程序
  • 原文地址:https://www.cnblogs.com/devos/p/5055656.html
Copyright © 2011-2022 走看看