zoukankan      html  css  js  c++  java
  • 【源码】RingBuffer(一)——生产者

    纯CAS为啥比加锁要快?

    同样是修改数据,一个采用加锁的方式保证原子性,一个采用CAS的方式保证原子性。

    都是能够达到目的的,但是常用的锁(例如显式的Lock和隐式的synchonized),都会把获取不到锁的线程挂起,相对于CAS的不挂起,多了挂起和唤醒的开销。

    题外话:CAS与锁的关系

    CAS只是在这个场景下,比使用锁来得更纯粹,因为只做数据更新,所以开销更少。但是业务上为了保证一系列操作的原子性,还是要使用锁的。而且锁的底层实现,也依赖于类似于CAS这样的原子性操作。

    尾指针是如何管理的,如何防止覆盖旧数据?

    别的帖子都说RingBuffer中不维护尾指针,尾指针由消费者维护(所谓维护指针,就是修改、移动指针)其实这一句话有点误导性,如果RingBuffer不知道尾部在哪里,那它的数据存储肯定就会出问题,例如把还没消费过的数据给覆盖了。

    确实,消费者会自行维护自己的消费指针(消费者指针是消费者消费过的最后一条数据的序号,下一篇中会详细讲到),RingBuffer也不会去干涉消费者指针的维护,但是它会引用所有消费者的指针,读取他们的值,以此作为“尾部”的判断依据。实际上就是最慢的那个消费者作为边界

    我们直接来看代码,这个是RingBuffer的publishEvent方法,我们看到,它首先取得一个可用的序列号,然后再将数据放入该序列号的对应位置中。

    @Override
    public void publishEvent(EventTranslator<E> translator)
    {
        //1.先通过原子操作,得到一个可用的序号
        final long sequence = sequencer.next();
        //2.将该序号对应位置的元素进行转换,接着发布
        translateAndPublish(translator, sequence);
    }

    我们来看看这个序列号是如何取得的。我们先看Sequencer的SingleProducerSequencer实现。这里就是判断如果生产者新指针的位置是否会超过尾部,如果超过尾部就挂起片刻,后续再尝试(生产者的等待方式是固定的,不像消费者有一个等待策略)

    这里附上几个图可能更好理解:(右边是后续补充的用“画图”画的,对单元格添加一些颜色进行区分)

    情况1:队列已满,生产者尝试使用新序号14,但由于(14 - 8 = 6),由于最慢的消费者目前消费的最后一条数据的序号是5,5号之后的数据还没被消费,6 > 5,所以序号14还不能用。生产者线程挂起,下次再次尝试。

     情况2:消费者1消费了序号6的数据。(14 - 8 = 6) 不大于 6,这时序号14可用,生产者得到可用的序号。

        @Override
        public long next()
        {
            return next(1);
        }
    
        /**
         * @see Sequencer#next(int)
         */
        @Override
        public long next(int n)
        {
            if (n < 1 || n > bufferSize)
            {
                throw new IllegalArgumentException("n must be > 0 and < bufferSize");
            }
    
            long nextValue = this.nextValue; //当前RingBuffer的游标,即生产者的位置指针
    
            long nextSequence = nextValue + n; 
            long wrapPoint = nextSequence - bufferSize; //减掉一圈
            long cachedGatingSequence = this.cachedValue; //上一次缓存的最小的消费者指针
    
            //条件1:生产者指针的位置超过当前消费最小的指针
            //条件2:为特殊情况,这里先不考虑,详见:
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
            {
                cursor.setVolatile(nextValue);  // StoreLoad fence
    
                long minSequence;
                //再次遍历所有消费者的指针,确认是否超过
                //如果超过,则等待
                while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
                {
                    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
                }
    
                this.cachedValue = minSequence;
            }
    
            this.nextValue = nextSequence;
    
            return nextSequence;
        }

    另外对于多生产者的情况,在不会越界的情况下,需要通过CAS来保证获取序号的原子性。具体可以查看MultiProducerSequencer的next方法。

     消费者指针是如何读取的?

    RingBuffer如何知道有哪些消费者?哪些gatingSequense是从哪里来的?

    在构建RingBuffer注册处理类的时候,就将消费者Sequense注册到RingBuffer中了。

    看代码的话,定位到gatingSequences在AbastractSequencer,对应的有个addGatingSequenses方法用于注入gatingSequence

    public abstract class AbstractSequencer implements Sequencer {
        //...
        protected volatile Sequence[] gatingSequences = new Sequence[0];
    
        @Override
        public final void addGatingSequences(Sequence... gatingSequences)
        {
            SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
        }
    
        //...
    }

    再查看addGatingSequences被调用的地方,即通过RingBuffer的方法,设置到Sequencer中,这个Sequencer是生产者使用的序号管理器

    public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
        //...
        protected final Sequencer sequencer;
        
        public void addGatingSequences(Sequence... gatingSequences) {
            sequencer.addGatingSequences(gatingSequences);
        }
        //...
    }

    而RingBuffer的addGatingSequence则在Disruptor配置处理器的时候被调用

    public class Disruptor<T> {
        //...
        private final RingBuffer<T> ringBuffer;
        private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
        
        
        public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
        {
            for (final EventProcessor processor : processors)
            {
                consumerRepository.add(processor);
            }
    
            final Sequence[] sequences = new Sequence[processors.length];
            for (int i = 0; i < processors.length; i++)
            {
                sequences[i] = processors[i].getSequence();
            }
    
            ringBuffer.addGatingSequences(sequences);
    
            return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
        }
        //...
    }

    缓存的意义是什么?

    我们看到在SiingleProducerSequencer的next方法中,会缓存上一次的消费者最小序列号,这有什么用呢?

    用途就是不需要每次都读取各消费者的序号,只要没超过上一次的最小值的地方都可以直接分配,如果超过了,则进行再次判断

    为啥读取最小值不需要保证原子性?

    看了这个获取最小消费序号的,可能会奇怪,为啥这个操作不需要上锁,这个不是会获取到旧值吗?

    确实,这个最小值获取到的时候,实际上数值已经变更。但是由于我们的目的是为了防止指针越位,所以用旧值是没有问题的。(旧值<=实际上的最小值)

    public static long getMinimumSequence(final Sequence[] sequences, long minimum)
        {
            for (int i = 0, n = sequences.length; i < n; i++)
            {
                long value = sequences[i].get();
                minimum = Math.min(minimum, value);
            }
    
            return minimum;
        }
  • 相关阅读:
    Mac上的USB存储设备使用痕迹在新版操作系统有所变化
    Beware of the encrypted VM
    A barrier for Mobile Forensics
    Second Space could let suspect play two different roles easily
    Take advantage of Checkra1n to Jailbreak iDevice for App analysis
    Find out "Who" and "Where"
    Where is the clone one and how to extract it?
    Downgrade extraction on phones running Android 7/8/9
    高版本安卓手机的取证未来
    How to extract WeChat chat messages from a smartphone running Android 7.x or above
  • 原文地址:https://www.cnblogs.com/longfurcat/p/12862602.html
Copyright © 2011-2022 走看看