zoukankan      html  css  js  c++  java
  • Disruptor源码分析

    本文将介绍Disruptor的工作机制,并分析Disruptor的主要源码

    基于的版本是3.3.7(发布于2017.09.28)

    水平有限,如有谬误请留言指正

    0. 什么是Disruptor?

    Disruptor是一个开源的并发框架,提供了类似于Java中有界队列的功能,主要用于生产消费者场景。

    与Java原生并发队列不同的是,Disruptor高度优化,在单机上可以轻松跑到千万级别的tps与ns级别的延时

    1. Disruptor的关键想法

    a. 使用环形队列作为底层存储(存储空间连续,可以充分利用cache)

    b. 环形队列中存储的对象都是预先建立好的,减少了频繁创建/释放对象带来的开销

    c. 生产者使用两阶段提交的方式来发布事件(第一阶段是先在环形队列中预占一个空位,第二阶段是向这个空位中写入数据,竞争只发生在第一阶段),并使用CAS操作来解决冲突,而不是使用昂贵的Lock

    d. 用cache line padding(缓冲区填充)的思想来解决false sharing(伪共享)的问题

    e. 使用了Java底层的Unsafe操作

    2. Disruptor的核心组件

    RingBuffer

    环形缓冲区,本质是一个定长Object数组(后续称里面的格子为slot),为了避免伪共享,在这个数组的两端额外填充了若干空位(这也导致访问RingBuffer数据的方式比较崎岖,具体请自行参见源码)

    Sequence

    类似于AtomicLong,用于标记事件id

    所有生产者共用一个Sequence,用于不冲突的将事件放到RingBuffer上

    每个消费者自己维护一个Sequence,用于标记自己当前正在处理的事件的id

    Sequencer

    生产者访问RingBuffer时的控制器,主要实现有两种:SingleProducerSequencer与MultiProducerSequencer,分别用于单生产者和多生产者的场景

    SequenceBarrier

    只有一个实现类为ProcessingSequenceBarrier,用于协调生产者与消费者(如果某个slot中的事件还没有被所有消费者消费完毕,那么这个slot是不能被复用的,需要等待)

    WaitStrategy

    消费者等待下一个可用事件的策略,Disruptor自带了多种WaitStrategy的实现,可以根据场景自行选择。

    3. 生产者发布事件到RingBuffer

    示例代码如下:

            long sequence = ringBuffer.next();  // 第一阶段,获取RingBuffer上下一个可用的slot的序列号,这里可能会有争用
            try {
                Event event = ringBuffer.get(sequence); // 根据序列号直接去RingBuffer上获取对应的slot上存储的事件
                event.setData(data);  // 写入数据
            } finally {
                ringBuffer.publish(sequence); // 第二阶段,将这个事件正式发布到RingBuffer中
            }

    需要重点关注的是next()publish()方法

    next()方法

    RingBuffer的next方法直接调用关联的Sequencer的next方法,Sequencer的实现又分为SingleProducerSequencerMultiProducerSequencer这两种

    先从相对简单的单生产者SingleProducerSequencer看起:

    SingleProducerSequencer.next()
        @Override
        public long next()
        {
            return next(1);
        }
    
        @Override
        public long next(int n)
        {
            if (n < 1)//参数检验
            {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long nextValue = this.nextValue;//上一次返回的seq
    
            long nextSequence = nextValue + n;//这次应该返回的序列值,这个序列值还未被产生,对应的slot里的元素的seq需要减去RingBuffer的大小
            long wrapPoint = nextSequence - bufferSize;//这个序列值对应的slot上正在存储的元素的seq,这个slot可能已经被消费了,也可能没有
            long cachedGatingSequence = this.cachedValue;//获取消费者未消费的元素的seq最小值,这个值不是实时的
    
            //wrapPoint > cachedGatingSequence,检查将要被放入元素的slot是否已经没有消费者占用了
            //cachedGatingSequence > nextValue,用于来应对seq发生溢出的情况
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
            {
                cursor.setVolatile(nextValue);  // StoreLoad fence,更新RingBuffer的游标,用到了Unsafe方法
    
                long minSequence;
                //Util.getMinimumSequence可以获得所有消费者未消费事件的seq最小值,在比这个值更小的slot里发布元素是安全的
                //如果这个判断成立,说明生产者正在试图将元素放到消费者未消费完毕的slot里,这个操作是不安全的,生产者需要在这里被阻塞
                while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
                {
                    waitStrategy.signalAllWhenBlocking();//激活所有的消费者(避免有的消费者睡死过去了)
                    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 自旋等待
                }
    
                this.cachedValue = minSequence;//更新cachedValue
            }
    
            this.nextValue = nextSequence;
    
            return nextSequence;
        }

    逻辑比较难懂,关键之处如下:

    a. 返回的seq对应的slot必须已经被所有消费者消费完毕

    b. Util.getMinimumSequence会遍历所有消费者使用的Sequence,并获取其最小值,这是一个比较昂贵的操作,所以将其缓存在本地的cachedValue变量中

    c. 如果seq对应的slot还没被消费者消费完毕,说明生产速度快于消费速度,生产者需要原地自旋等待,同时向消费者发送信号,避免消费者睡死过去的情况

    再来看多生产者版本:

        @Override
        public long next(int n)
        {
            if (n < 1)//参数检验
            {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long current;
            long next;
    
            do
            {
                current = cursor.get();//获取最新返回的seq
                next = current + n;//本次返回的seq
    
                long wrapPoint = next - bufferSize;//本次返回的seq对应的slot里的元素的seq
                long cachedGatingSequence = gatingSequenceCache.get();//有多个生产者,gatingSequenceCache实际上是SingleProducerSequencer里的cachedValue的Atomic版本
    
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)//检查将要被放入元素的slot是否已经没有消费者占用了
                {
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);//获得所有消费者未消费事件的seq最小值
    
                    if (wrapPoint > gatingSequence)//slot仍被消费者占用,生产者自旋等待
                    {
                        waitStrategy.signalAllWhenBlocking();
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
    
                    gatingSequenceCache.set(gatingSequence);//更新gatingSequenceCache的值
                }
                else if (cursor.compareAndSet(current, next))//用CAS操作更新cursor的值,如果失败了说明有其他生产者在争用,进入下一轮循环
                {
                    break;
                }
            }
            while (true);
    
            return next;
        }

    与单生产者版本类似,但是当前序列值以及缓存住的安全序列值都使用了原子变量,以解决线程安全问题。可以说是非常精巧了。

    至于publish方法

    SingleProducerSequencer.publish
        @Override
        public void publish(long sequence)
        {
            cursor.set(sequence);//更新RingBuffer的游标
            waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
        }
    
    MultiProducerSequencer.publish
        @Override
        public void publish(final long sequence)
        {
            setAvailable(sequence);//这里用了一个额外的availableBuffer数组,来标记RingBuffer的某个slot是否已经被publish成功,后面生产者消费的时候会用到
            waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
        }

    单生产者/多生产者之间微妙的区别:

    单生产者publish一个seq,那么这个seq之前所有的seq都被publish了

    多生产者publish一个seq,那么只有这一个seq被publish

    4. 消费者从RingBuffer获取数据

    消费者是通过调用Disruptor的handlerEventsWith方法被添加到系统中的,其调用链如下:

    Disruptor.handleEventsWith()
        public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
        {
            return createEventProcessors(new Sequence[0], handlers);//添加事件处理器,
        }
    
    Disruptor.createEventProcessors()
        EventHandlerGroup<T> createEventProcessors(
            final Sequence[] barrierSequences,//默认情况下,barrierSequences是不存在的
            final EventHandler<? super T>[] eventHandlers)
        {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];//为每个消费者创建一个Sequence
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//创建一个与当前RingBuffer的生产者有关的Barrier
    
            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)//遍历消费者
            {
                final EventHandler<? super T> eventHandler = eventHandlers[i];
    
                final BatchEventProcessor<T> batchEventProcessor =
                    new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);//为每个消费者创建一个BatchEventProcessor
    
                if (exceptionHandler != null)
                {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);//异常处理器
                }
    
                consumerRepository.add(batchEventProcessor, eventHandler, barrier);//将EventHandler与BatchEventProcessor关联起来
                processorSequences[i] = batchEventProcessor.getSequence();
            }
    
            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);//将消费者的sequence添加为RingBuffer的gating sequence
    
            return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
        }

    然后调用Disruptor的start方法启动系统,其调用链如下:

    Disruptor.start()
        public RingBuffer<T> start()
        {
            checkOnlyStartedOnce();
            for (final ConsumerInfo consumerInfo : consumerRepository)//遍历注册的所有消费者
            {
                consumerInfo.start(executor);
            }
    
            return ringBuffer;
        }
    
    
    EventProcessorInfo.start()
        @Override
        public void start(final Executor executor)
        {
            executor.execute(eventprocessor);//这里是将之前在createEventProcessors中为消费者注册的BatchEventProcessor放到线程池里运行起来了
        }
    
    
    BatchEventProcessor.run()
        @Override
        public void run()
        {
            if (!running.compareAndSet(false, true))//避免重复运行
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();
    
            notifyStart();
    
            T event = null;
            long nextSequence = sequence.get() + 1L;
            try
            {
                while (true)//在死循环中处理事件
                {
                    try
                    {
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence);//从RingBuffer中获取一批可以处理的事件的seq,策略由之前设置的waitStrategy决定,返回的seq可能会大于nextSequence(批量,提高效率)
                        if (batchStartAware != null)
                        {
                            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                        }
    
                        while (nextSequence <= availableSequence)//在循环中消耗返回的availableSequence
                        {
                            event = dataProvider.get(nextSequence);//从RingBuffer中读取数据
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//调用用户传入的EventHandler中的onEvent方法来处理事件
                            nextSequence++;
                        }
    
                        sequence.set(availableSequence);//这一批seq消耗完了,更新当前消费者关联的sequence,让生产者可以知道,这里调用的是相对廉价的putOrderedLong方法,因为不需要很高的实时性
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
                    catch (final AlertException ex)
                    {
                        if (!running.get())
                        {
                            break;
                        }
                    }
                    catch (final Throwable ex)
                    {
                        exceptionHandler.handleEventException(ex, nextSequence, event);
                        sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            }
            finally
            {
                notifyShutdown();
                running.set(false);
            }
        }
    
    
    ProcessingSequenceBarrier.waitFor()
        public long waitFor()(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
            checkAlert();
    
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);//调用消费者的waitStrategy来等待sequence变得可用
    
            if (availableSequence < sequence)
            {
                return availableSequence;
            }
    
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);//从RingBuffer中找到最大的已经被publish事件的slot,寻找策略根据单生产者/多生产者有不同
        }
    
    
    SingleProducerSequencer.getHighestPublishedSequence()//单生产者的情况,简单
        public long getHighestPublishedSequence(long lowerBound, long availableSequence)
        {
            return availableSequence;
        }
    
    MultiProducerSequencer.getHighestPublishedSequence()//多生产者
        public long getHighestPublishedSequence(long lowerBound, long availableSequence)
        {
            for (long sequence = lowerBound; sequence <= availableSequence; sequence++)//从lowerBound开始遍历,在RingBuffer中找到最大的已经被publish事件的slot
            {
                if (!isAvailable(sequence))
                {
                    return sequence - 1;
                }
            }
    
            return availableSequence;
        }
    
        public boolean isAvailable(long sequence)
        {
            int index = calculateIndex(sequence);//在RingBuffer中定位
            int flag = calculateAvailabilityFlag(sequence);//后面的3行是在availableBuffer数组中寻找对应位置的元素是否被标记为available。在MultiProducerSequencer的publish方法中会做这一操作
            long bufferAddress = (index * SCALE) + BASE;
            return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
        }

    这一块的代码也比较难,做个小结:

    a. 每个消费者都与一个BatchEventProcessor关联,BatchEventProcessor初始化的时候会创建一个sequence,这个sequence记录的是消费者已经处理的事件的seq

    b. Disruptor初始化的时候会创建一个SequenceBarrier,这个barrier与生产者有关

    c. 消费者会无限的调用barrier的waitFor方法,以尝试获取最新publish的事件,一旦waitFor方法返回了可用的seq,就在循环中调用消费者的onEvent方法将这些事件消耗掉

    d. 在waitFor方法中,会根据设定的WaitStrategy来在RingBuffer上查找最新publish的事件的seq

    5. WaitStrategy

    WaitStrategy在消费速度快于生产速度,消费者等待生产者publish新的事件时会被用到。

    如果生产速度快于消费速度,生产者等待消费者时,用的是LockSupport.parkNanos(1),类似于自旋等待。

    WaitStrategy非常重要,不同的WaitStrategy会直接影响到响应事件和CPU占用,值得专门开一节来分析

    WaitStrategy是一个接口,其中只含有两个方法:

        //sequence:消费者等待这个sequence关联的事件产生
        //cursor:RingBuffer上,生产者关联的Sequence
        //dependentSequence:默认情况下与cursor相同
        //barrier:与生产者关联的barrier
        long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException, TimeoutException;
    
        void signalAllWhenBlocking();//如果有消费者阻塞等待,将其唤醒

    WaitStrategy有多种实现,我这里只分析两种最有代表性的:BlockingWaitStrategy与BusySpinWaitStrategy

    BlockingWaitWaitStrategy的实现如下:

        private final Lock lock = new ReentrantLock();//锁与关联的条件
        private final Condition processorNotifyCondition = lock.newCondition();
    
        @Override
        public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException
        {
            long availableSequence;
            if (cursorSequence.get() < sequence)//如果生产者还没有生产出足够的事件,那么在锁上等待
            {
                lock.lock();//尝试占有锁
                try
                {
                    while (cursorSequence.get() < sequence)
                    {
                        barrier.checkAlert();
                        processorNotifyCondition.await();//等待
                    }
                }
                finally
                {
                    lock.unlock();//释放锁
                }
            }
    
            while ((availableSequence = dependentSequence.get()) < sequence)
            {
                barrier.checkAlert();
            }
    
            return availableSequence;
        }
    
        @Override
        public void signalAllWhenBlocking()
        {
            lock.lock();
            try
            {
                processorNotifyCondition.signalAll();//唤醒所有在锁上等待的生产者
            }
            finally
            {
                lock.unlock();
            }
        }

    代码很简单,用JDK自带的ReentrantLock与Condition来完成消费者的等待控制,只要消费者拿不到可用的事件,就调用Condition.await方法等待

    好处:CPU占用少

    坏处:在生产的事件足够后,消费者无法在第一时间醒来,需要生产者调用signalAll才行,由于此时消费者线程可能已经被OS切走了,这会带来一定的延时

    BusySpinWaitStrategy的实现如下:

        @Override
        public long waitFor(
            final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
            throws AlertException, InterruptedException
        {
            long availableSequence;
    
            while ((availableSequence = dependentSequence.get()) < sequence)//只要生产者没有生产出足够的事件,消费者就一直自旋等待
            {
                barrier.checkAlert();
            }
    
            return availableSequence;
        }
    
        @Override
        public void signalAllWhenBlocking()//消费者根本不被阻塞,所以无需唤醒
        {
        }

    这个等待策略就更简单了,甚至可以说是丧心病狂,所有消费者都在无限自旋等待(Busy Spin),直到生产者生产了足够的事件为止。

    好处:延时极低(消费者线程一直是热的),生产出来的事件会第一时间被消费掉

    坏处:有多少个消费者,就会吃满多少个Core

    小结一下:

    WaitStrategy实际上是延时与CPU资源占用的权衡

    如果你追求最低的延时(ns级别),那就必须保证消费者一直是是热的,不能被系统调度走,因此你需要BusySpinWaitStrategy

    如果你不需要那么低的延时,那么基于锁的BlockingWaitStrategy可能更加适合你

     6. Disruptor与ArrayBlockingQueue的比较

    ArrayBlockingQueue的源码分析可以参见这篇博客,其核心想法如下:

    a. 内置一个ReentrantLock与两个Condition

    b. 任何对Queue的读写操作均用ReentrantLock加锁 -> 实现了线程安全的语义

    c. 在队列空/满的情况下如果继续取出/插入元素,则利用Condition将工作线程阻塞,在符合条件的时候再将被阻塞的线程唤醒 -> 实现了阻塞队列的语义

    很明显,这种实现在高并发的情况下存在一定的问题:

    a. 在任一时刻,只能有一个读/写线程在工作,其他的线程都被ReentrantLock所阻塞

    b. takeIndex与putIndex这两个被频繁访问的域在内存上距离很近,容易引起伪共享问题

    Disruptor则很好的解决了这些问题,具体请参加本文第一节

    7. 总结

    Disruptor是一个设计非常精巧的框架,为了追求极致性能,做了很多底层优化,值得学习参考

    参考资料

    并发编程网上关于Disruptor的系列文章

    高性能队列——Disruptor

  • 相关阅读:
    Mayan游戏 (codevs 1136)题解
    虫食算 (codevs 1064)题解
    靶形数独 (codevs 1174)题解
    黑白棋游戏 (codevs 2743)题解
    神经网络 (codevs 1088) 题解
    The Rotation Game (POJ 2286) 题解
    倒水问题 (codevs 1226) 题解
    银河英雄传说 (codevs 1540) 题解
    生日蛋糕 (codevs 1710) 题解
    第一章 1.11 高阶函数
  • 原文地址:https://www.cnblogs.com/stevenczp/p/7783977.html
Copyright © 2011-2022 走看看