一、disruptor 简介:
高效无锁内存队列,使用无锁方式实现一个环形队列 代替 线性队列。
相对于普通队列,环形队列不需要维护头尾两个指针,只需维护一个当前位置就可以完成入队操作。
环形队列大小不能扩展。整个业务逻辑处理器完全运行在内存中,架构单线程可处理每秒600W流水。非常适合哪种实时性高、延迟率低、业务流水量大的应用场景。
二、Disruptor 框架核心类
Disruptor : 用于控制整个消费者 - 生产者模型的处理器
RingBuffer : 用于存放数据 (环形缓冲区)
EventHandler : 一个用于处理事件的接口(可当生产者,也可当消费者)
EventFactory : 事件工厂类
WaitStrategy : 用于实现事件处理等待 RingBuffer 游标策略的接口
SequeueBarrier :队列屏障,用于处理访问 RingBuffer 的序列
用于运行Disruptor 的线程或者线程池
三、Disruptor 编写步骤
定义事件
定义事件工厂
消费者 - 定义事件处理的具体实现
定义用于事件处理(消费者)的线程池
指定等待策略
BlockingWaitStrategy 最低效策略
SleepingWaitStrategy 适用于异步日志场景
YieldingWaitStrategy 性能最好,适用于低延迟系统
生产(发布)消息
关闭 disruptor 业务逻辑处理器
四、ringbuffer 环形缓冲区 (ring : 环,戒指。 buffer :缓冲区)
没有尾指针,只维护一个指向下一个可用位置的序号。不删除buffer中的数据,知道新数据覆盖他们。
RingBuffer 优点: 它是数组,比链表块,而且有容易预测的访问模式(数组内元素内存地址是连续存储的)。另外可以为数组预先分配内存,数组对象一直存在,就不用花大量时间用于垃圾回收。(不像链表要为每一个添加到上面的对象创造节点,删除节点时还要执行内存清理)。
ringbuffer 是由一个大数组组成的
所有 ringbuffer 的 “指针(也称为序列或游标)”,是Java long类型的(64 位有符号数),指针采用往上计数自增的方式。(别担心溢出,100W/s,30W年 才溢出 )
对 RingBuffer 中的指针进行按 ringbuffer 的 size 取模 找出数组下标来定位入口(类似于HashMap的entry )。通常将 RingBuffer 的size 设置成实际使用的 2 倍,可通过位运算(bit-mask)方式计算出数组下标。
写入数据先后顺序由线程抢占位置决定,不是由提交先后决定。
五、从 RingBuffer 读取数据
原文学习地址:http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/
ConsumerBarrier ( 消费屏障 ) 与消费者
ConsumerBarrier : 一个由 RingBuffer 创建 并且代表消费者与 RingBuffer 进行交互的对象。
消费者调用 ConsumerBarrier 对象的 waitFor() 方法,传递它所需要的下一个序号:
代码 : final long availableSeq = consumerBarrier.waitFor(nextSequence) ;
ConsumerBarrier 会返回 RingBuffer 的最大可访问序号。
【ConsumerBarrier 有一个 WaitStrategy 方法来决定等待序号 - 最大访问序号】
六、写入 RingBuffer
原文学习地址:http://ifeve.com/disruptor-writing-ringbuffer/
RingBuffer 提供了 ProducterBuffer(生产整流器、生产者壁垒) 对象。让生产者通过它写入 RingBuffer。
写入过程涉及 两阶段提交(two-phase commit):
生产者需要申请 buffer 里下一个节点
生产者向节点写完数据,会调用 ProducerBarrier 的 commit 方法。
第一步 - 申请下个节点,就是简单调用 ProducerBarrier 中的 nextEntry() 方法,方法返回的 Entry 对象就是下一个节点。这里 ProducerBarrier 需要 防止 RingBuffer 重叠 。
ConsumerTrackingProducerBarrier 对象拥有所有正在访问 RingBuffer 的消费者列表。Disruptor中由消费者通知它处理到了那个序列号。
当 生产者申请的环形节点的下一个节点被其他消费者占用时,ProducerBuffer 停下 自旋(spins),等待,直到那个消费者离开。消费者离开后,ProducerBarrier 会发现该节点可用,抢占该节点的 Entry ,并更新节点 Entry 上的序号,再把 Entry 返回给生产者。生产者往 Entry 里写数据。
第二部 - 提交 。生产者往 Entry 写入数据后,会要求 ProducerBarrier 把数据提交到 RingBuffer。
单生产者时:
ProducerBarrier 先等待 RingBuffer 的游标追上当前位置后,更新 RingBuffer 游标到刚写入的Entry序号 并提交Entry。
然后 ProducerBarrier 调用 ConsumerBarrier 上的 WaitStrategy 对象,让消费者直到 buffer 中有新东西了。消费者就可以读取最新的 Entry 内容。
ProducerBarrier 的批处理,ProducerBarrier 知道 RingBuffer的大小,知道最慢消费者的位置,也知道当前那些节点可用。在最慢消费者的节点前,不需要再检查消费者位置,生产者就可以连续写入节点。
多个生产者时:
ProducerBarrier 拿到的下一个序号是生产者通过 ClaimStrategy 申请获取的。
多个生产者向ClaimStrategy 申请可用节点,如果一个生产者在写入 RingBuffer 时候暂停了,只有当它解除暂停,并写入RingBuffer,RingBuffer 游标移动到该节点 后,后面节点等待的提交才会立即执行。
七、Disruptor 依赖关系
原文学习地址:http://ifeve.com/dissecting-disruptor-wiring-up-cn/
用 Disruptor 实现菱形结构,
菱形结构创建代码:
ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier(); BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1); BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2); ConsumerBarrier consumerBarrier2 = ringBuffer.createConsumerBarrier(consumer1, consumer2); BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3); ProducerBarrier producerBarrier = ringBuffer.createProducerBarrier(consumer3);
RingBuffer 可以有多个 ConsumerBarrier。
节点(Entry)对象的每一个字段应该只允许一个消费者写入,避免产生并发写入冲突(write-contention)减慢整个处理过程。 实际中:有个例子 FizzBuzzEntry 有两个字段:fizz 和 buzz 。如果消费者是 Fizz Consumer ,只写入字段 fizz,如果是 Buzz Consumer 只写入字段 buzz ,第三个消费者 FizzBuzz 只读取这两个字段。【用于 生产者插入 Entry 后,消费者1获取到Entry 数据,对数据进行操作后传递给消费者2 再次对Entry 的数据进行处理的情景。最终 消费者1、消费者2修改后的Entry 被消费者 3读取。】
总结:关联Disruptor 与相互依赖(等待)的多个消费者。关键点:
使用多个ConsumerBarrier 来管理消费者之间的依赖(等待)关系
使用 ProducerBarrier 监视结构图中最后一个消费者
只允许一个消费者更新数据节点(Entry)的一个独立字段。
demo 示例地址:
https://github.com/wgy1109/disruptor
包含单用户和多用户情况。