zoukankan      html  css  js  c++  java
  • 数据结构与算法简记--剖析高性能队列Disruptor背后的数据结构和算法

    Disruptor 消息队列


     定义

    • 内存消息队列
    • 用于线程之间的消息传递
    • 应用于Apache Storm、Camel、Log4j 2 等知名应用

    基于循环队列的“生产者 - 消费者模型”

    • 实现一个最简单的“生产者 - 消费者模型”
    • 对于生产者和消费者之间操作的同步,并没有用到线程相关的操作。
    • 当队列满了之后,生产者就轮训等待;
    • 当队列空了之后,消费者就轮训等待。
    • public class Queue {
        private Long[] data;
        private int size = 0, head = 0, tail = 0;
        public Queue(int size) {
          this.data = new Long[size];
          this.size = size;
        }
      
        public boolean add(Long element) {
          if ((tail + 1) % size == head) return false;
          data[tail] = element;
          tail = (tail + 1) % size;
          return true;
        }
      
        public Long poll() {
          if (head == tail) return null;
          long ret = data[head];
          head = (head + 1) % size;
          return ret;
        }
      }
      
      public class Producer {
        private Queue queue;
        public Producer(Queue queue) {
          this.queue = queue;
        }
      
        public void produce(Long data) throws InterruptedException {
          while (!queue.add(data)) {
            Thread.sleep(100);
          }
        }
      }
      
      public class Consumer {
        private Queue queue;
        public Consumer(Queue queue) {
          this.queue = queue;
        }
      
        public void comsume() throws InterruptedException {
          while (true) {
            Long data = queue.poll();
            if (data == null) {
              Thread.sleep(100);
            } else {
              // TODO:...消费数据的业务逻辑...
            }
          }
        }
      }

    基于加锁的并发“生产者 - 消费者模型”

    • 上面的“生产者 - 消费者模型”实现代码,是不完善的:
      • 多个生产者写入的数据可能会互相覆盖;
      • 多个消费者可能会读取重复的数据。
    • 最简单的解决方法就是加锁,同一时间只允许一个线程执行 add() 函数。由并行改成了串行。
    • 加锁将并行改成串行,必然导致多个生产者同时生产数据的时候,执行效率的下降。
    • 继续优化代码,用CAS(compare and swap,比较并交换)操作等减少加锁的粒度。

    基于无锁的并发“生产者 - 消费者模型”

    • 基本思想:换一种队列和“生产者 - 消费者模型”的实现思路:
      • 对于生产者来说,它往队列中添加数据之前,先申请可用空闲存储单元,并且是批量地申请连续的 n 个(n≥1)存储单元。
      • 当申请到这组连续的存储单元之后,后续往队列中添加元素,就可以不用加锁了,因为这组存储单元是这个线程独享的。
      • 不过,申请存储单元的过程是需要加锁的
      • 对于消费者来说,处理的过程跟生产者是类似的:它先去申请一批连续可读的存储单元(这个申请的过程也是需要加锁的),当申请到这批存储单元之后,后续的读取操作就可以不用加锁了。
    • 弊端
      • 如果生产者 A 申请到了一组连续的存储单元,假设是下标为 3 到 6 的存储单元,生产者 B 紧跟着申请到了下标是 7 到 9 的存储单元,那在 3 到 6 没有完全写入数据之前,7 到 9 的数据是无法读取的。
    • Disruptor 采用的是 RingBuffer 和 AvailableBuffer 这两个结构来实现上面的功能。 

    disruptor使用环的数据结构,内存连续,初始化时就申请并设置对象,将原本队列的头尾节点锁的争用转化为cas操作,并利用Java对象填充,解决cache line伪共享问题

  • 相关阅读:
    Leetcode: 1425
    Leetcode: 1508 Range Sum of Sorted Subarray Sums
    Leetcode: 1353. Maximum Number of Events That Can Be Attended
    Leetcode: 1424. Diagonal Traverse II
    Leetcode: 825. Friends Of Appropriate Ages
    非递归实现二叉树的前序,中序,后序遍历
    TCP协议详解
    Linux常见命令
    C++基础笔记
    指针和引用的区别
  • 原文地址:https://www.cnblogs.com/wod-Y/p/12215309.html
Copyright © 2011-2022 走看看