zoukankan      html  css  js  c++  java
  • 第四部分-并发编程案例分析3:高性能队列Disruptor

    1.为什么要用高性能队列Disruptor

    为什么要说Disruptor?java SDK提供了2个有界队列
    ArrayBlockQueue,LinkedBlockingQueue,基于ReentrantLock锁实现,在高并发情况下,锁的效率不高,更好的替代品有木有?Dosritpr

    2.Disruptor介绍

    性能更高的有界队列

    Log4j2,Spring Messageing,HBase,Storm都用了Disruptor

    为什么性能高?
    1.内存分配连续,使用RingBuffer数据结构,数组元素初始化时一次性创建,缓存命中率更高;对象循环引用,避免频繁GC
    2.避免伪共享,提升缓存利用率。
    3.无锁算法,避免频繁加锁,解锁的性能消耗。
    4.支持批量消费,消费者可以无锁消费多个消息

    3.Disruptor 使用

    • 生产者生产的对象称为Event,使用Disruptor必须定义Event,实例代码中为LongEvent
    • 构建Disruptor要指定度列大小(bufferSize),还需要传入EventFactory,示例代码为LongEvent::new
    • 消费Event需要通过handleEventsWith()方法注册一个事件处理器,发布Event则需要通过publishEvent()放阿飞

    伪代码

    
    //自定义Event
    class LongEvent {
      private long value;
      public void set(long value) {
        this.value = value;
      }
    }
    //指定RingBuffer大小,
    //必须是2的N次方
    int bufferSize = 1024;
    
    //构建Disruptor
    Disruptor<LongEvent> disruptor 
      = new Disruptor<>(
        LongEvent::new,
        bufferSize,
        DaemonThreadFactory.INSTANCE);
    
    //注册事件处理器
    disruptor.handleEventsWith(
      (event, sequence, endOfBatch) ->
        System.out.println("E: "+event));
    
    //启动Disruptor
    disruptor.start();
    
    //获取RingBuffer
    RingBuffer<LongEvent> ringBuffer 
      = disruptor.getRingBuffer();
    //生产Event
    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long l = 0; true; l++){
      bb.putLong(0, l);
      //生产者生产消息
      ringBuffer.publishEvent(
        (event, sequence, buffer) -> 
          event.set(buffer.getLong(0)), bb);
      Thread.sleep(1000);
    }
    

    测试代码

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = FamaMonitorApplication.class)
    public class DisruptorTest {
    
        class MessageEvent {
            public String message;
        }
    
        private static final int BUFFER_SIZE = 1024;
    
        @Test
        public void test(){
            // 构建disruptor
            Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(
                    MessageEvent::new,
                    BUFFER_SIZE,
                    DaemonThreadFactory.INSTANCE
            );
            // 注册消费者,并编写消费逻辑
            disruptor.handleEventsWith(((event, sequence, endOfBatch) -> {
                Thread.sleep(1000);
                System.out.println("Message : " + event.message);
            }));
    
            // 启动
            disruptor.start();
    
            // 获取队列,然后生产
            RingBuffer<MessageEvent> ringBuffer = disruptor.getRingBuffer();
            for (int i = 0; true; i++) {
                int finalI = i;
                ringBuffer.publishEvent(((event, sequence, args) -> {
                    event.message = finalI + "";
                }));
                Thread.yield();
            }
    
        }
    }
    

    4.Disruptor 提升性能的秘籍1-内存连续分配

    ArrayBlockQueue底层使用数组+ReentrantLock锁实现。
    RingBuffer本质也是数组,这一点性能没有什么区别,区别在于Disruptor对于RingBuffer基础上做了很多优化,其中重要一项就是内存连续分配

    5.程序的局部性原理

    一段时间内程序的执行会限定在一个局部范围内

    时间局限性:程序的某条指令一旦被执行,不久之后,这条指令可能再次被执行,某条数据被访问,不就之后这条数据可能再次被访问。
    空间局限期:某块内存一旦被访问,不久之后这块内存附近的内存也有可能被访问。

    6.cpu寄存器,cpu缓存

    cpu缓存说的就是cpu的寄存器,cpu从内存中加载数据x时,会将x缓存在寄存器里,实际上cpu缓存x的同时,还缓存了x周围的数据,(根据程序的空间局限期,周围数据也有可能被访问)。程序能够体现局部性原理,就可以更好的利用cpu寄存器,从而提升性能。
    cpu寄存器的读取速度和内存读取速度不是一个数量级。

    7.ArrayBlockQueue有没有利用局部性原理,提升cpu缓存命中率?

    并没有
    每次队列中添加元素,创建对象E,这个对象都是由生产者线程创建的,由于创建这些元素的时间是离散不连续的,这些元素的内存地址大概率不连续

    image

    8.Disruptor利用空间局限期原理,提升了cpu缓存命中率

    RingBuffer内部也是数组,但数组中的元素都是在初始化时一次性创建的,这些元素的内存地址大概率是连续的

    
    for (int i=0; i<bufferSize; i++){
      //entries[]就是RingBuffer内部的数组
      //eventFactory就是前面示例代码中传入的LongEvent::new
      entries[BUFFER_PAD + i] 
        = eventFactory.newInstance();
    }
    

    9.数组中的元素内存地址连续就能提升性能么?

    对,可以
    消费线程在消费队列时,遵循空间局限性原则,消费完第一个元素,很快就会消费第二个元素。消费第一个元素E1的时候,cpu会把内存E1后面的数据也加载进cpu寄存器里。如果第二个元素E2和E1的内存地址连续的。E2也会被加进Cache中。消费E2的时候,E2已经在cpu寄存器中了,不需要从内存再加载一遍,大大提升了性能了就
    image

    10.生产者生产时,不是往队列中add,而是修改队列中的元素对象

    publishEvent()发布Event时,并不创建新Event,而是event.set()修改event
    RingBuffer创建的event是可以循环利用的。这样还能避免频繁创建,删除Event导致频繁GC问题

    11.避免伪共享,避免缓存不可用也同样重要

    伪共享的存在,使得cache失效

    cpu内存的缓存是按照缓存行(Cache Line)管理的,缓存行的大小通常是64字节;cpu从内存加载数据x,会同时加载x后面的(64-size(x))的数据。

    Ar'rayBlockQueue的内部结构

    
    /** 队列数组 */
    final Object[] items;
    /** 出队索引 */
    int takeIndex;
    /** 入队索引 */
    int putIndex;
    /** 队列中元素总数 */
    int count;
    

    cpu从内存加载takeIndex时,会连带将putIndex和count都加载进cache。
    image

    线程A运行在cpu1核上,入队操作,入队会修改putIndex,修改putIndex会导致所有核的缓存均实效,此时线程B在cpu2核上执行出队,需要获取takIndex,takeIndex缓存已实效,所以必须重新从内存里再拉取一遍,性能就下来了。。

    12.taskIndex缓存为什么会失效?

    入队操作不会修改takeIndex,但是takeIndex和putIndex共享一个缓存行,导致出队操作利用不了缓存,这就是伪共享带来的问题,性能并没有带来替身。即使数组中的对象内存地址是连续的。

    13.如何避免伪共享带来的缓存不可用情况出现?

    每个变量独占一个缓存行,不共享缓存行就可以
    putIndex是一个缓存行
    takeIndex是另一个缓存行

    如何保证putIndex是一个缓存行呢?
    在taskIndex的前后各填充56个字节,这样就能保证takeIndex独占一个缓存行

    
    //前:填充56字节
    class LhsPadding{
        long p1, p2, p3, p4, p5, p6, p7;
    }
    class Value extends LhsPadding{
        volatile long value;
    }
    //后:填充56字节
    class RhsPadding extends Value{
        long p9, p10, p11, p12, p13, p14, p15;
    }
    class Sequence extends RhsPadding{
      //省略实现
    }
    

    14.综上所述

    数组中连续的内存地址对象+排除伪共享情况,
    就可以充分的利用cpu的缓存提升性能
    以上代码设计上面的改动,说白了就是更极端的压榨硬件cpu缓存的性能

    15.无锁算法

    ArrayBlockQueue中使用了ReentrantLock锁,高并发情况下,性能不高
    而Disruptor中采用无锁算法,没有加锁,解锁的性能消耗影响

    看上去有点复杂,目前没看太明白,总之性能要优于使用ReentrantLock锁
    基本逻辑是:如果没有足够的空余位置,就出让 CPU 使用权,然后重新计算;反之则用 CAS 设置入队索引

    
    //生产者获取n个写入位置
    do {
      //cursor类似于入队索引,指的是上次生产到这里
      current = cursor.get();
      //目标是在生产n个
      next = current + n;
      //减掉一个循环
      long wrapPoint = next - bufferSize;
      //获取上一次的最小消费位置
      long cachedGatingSequence = gatingSequenceCache.get();
      //没有足够的空余位置
      if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
        //重新计算所有消费者里面的最小值位置
        long gatingSequence = Util.getMinimumSequence(
            gatingSequences, current);
        //仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
        if (wrapPoint > gatingSequence){
          LockSupport.parkNanos(1);
          continue;
        }
        //从新设置上一次的最小消费位置
        gatingSequenceCache.set(gatingSequence);
      } else if (cursor.compareAndSet(current, next)){
        //获取写入位置成功,跳出循环
        break;
      }
    } while (true);
    

    16.jvm对于伪代码共享的支持

    @sun.misc.Contended
    注解可以轻松避免伪共享(前提是jvm参数-XX:-RestrictContended),以牺牲内存为代价。

    原创:做时间的朋友
  • 相关阅读:
    navcat15 安装+激活
    页面调用百度地图但是使用了https证书之后不显示
    net core webapi 数据库连接
    asp.net core webAPI跨域问题
    本机端口(出入站)配置
    Vue页面跳转路由
    net core --- Swagger搭建(net core 3.1 版本)
    sqlserver数据库中生成随机数
    随机生成登录验证码(4位)
    定时器
  • 原文地址:https://www.cnblogs.com/PythonOrg/p/14899395.html
Copyright © 2011-2022 走看看