Disruptor 消息队列
定义
- 内存消息队列
- 用于线程之间的消息传递
- 应用于Apache Storm、Camel、Log4j 2 等知名应用
基于循环队列的“生产者 - 消费者模型”
- 实现一个最简单的“生产者 - 消费者模型”
- 对于生产者和消费者之间操作的同步,并没有用到线程相关的操作。
- 当队列满了之后,生产者就轮训等待;
- 当队列空了之后,消费者就轮训等待。
-
public class Queue { private Long[] data; private int size = 0, head = 0, tail = 0; public Queue(int size) { this.data = new Long[size]; this.size = size; } public boolean add(Long element) { if ((tail + 1) % size == head) return false; data[tail] = element; tail = (tail + 1) % size; return true; } public Long poll() { if (head == tail) return null; long ret = data[head]; head = (head + 1) % size; return ret; } } public class Producer { private Queue queue; public Producer(Queue queue) { this.queue = queue; } public void produce(Long data) throws InterruptedException { while (!queue.add(data)) { Thread.sleep(100); } } } public class Consumer { private Queue queue; public Consumer(Queue queue) { this.queue = queue; } public void comsume() throws InterruptedException { while (true) { Long data = queue.poll(); if (data == null) { Thread.sleep(100); } else { // TODO:...消费数据的业务逻辑... } } } }
基于加锁的并发“生产者 - 消费者模型”
- 上面的“生产者 - 消费者模型”实现代码,是不完善的:
- 多个生产者写入的数据可能会互相覆盖;
- 多个消费者可能会读取重复的数据。
- 最简单的解决方法就是加锁,同一时间只允许一个线程执行 add() 函数。由并行改成了串行。
- 加锁将并行改成串行,必然导致多个生产者同时生产数据的时候,执行效率的下降。
- 继续优化代码,用CAS(compare and swap,比较并交换)操作等减少加锁的粒度。
基于无锁的并发“生产者 - 消费者模型”
- 基本思想:换一种队列和“生产者 - 消费者模型”的实现思路:
- 对于生产者来说,它往队列中添加数据之前,先申请可用空闲存储单元,并且是批量地申请连续的 n 个(n≥1)存储单元。
- 当申请到这组连续的存储单元之后,后续往队列中添加元素,就可以不用加锁了,因为这组存储单元是这个线程独享的。
- 不过,申请存储单元的过程是需要加锁的。
- 对于消费者来说,处理的过程跟生产者是类似的:它先去申请一批连续可读的存储单元(这个申请的过程也是需要加锁的),当申请到这批存储单元之后,后续的读取操作就可以不用加锁了。
- 弊端
- 如果生产者 A 申请到了一组连续的存储单元,假设是下标为 3 到 6 的存储单元,生产者 B 紧跟着申请到了下标是 7 到 9 的存储单元,那在 3 到 6 没有完全写入数据之前,7 到 9 的数据是无法读取的。
-
Disruptor 采用的是 RingBuffer 和 AvailableBuffer 这两个结构来实现上面的功能。
disruptor使用环的数据结构,内存连续,初始化时就申请并设置对象,将原本队列的头尾节点锁的争用转化为cas操作,并利用Java对象填充,解决cache line伪共享问题。