zoukankan      html  css  js  c++  java
  • Disruptor 创建过程

    1 Disruptor disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, ringBufferSize, exec,
                            ProducerType.MULTI, waitStrategy);

        public Disruptor(final EventFactory<T> eventFactory,
                         final int ringBufferSize,
                         final Executor executor,
                         final ProducerType producerType,
                         final WaitStrategy waitStrategy)
        {
            this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
                 executor);
        }
      
        public static <E> RingBuffer<E> create(ProducerType    producerType,
                                               EventFactory<E> factory,
                                               int             bufferSize,
                                               WaitStrategy    waitStrategy)
        {
            switch (producerType)
            {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
            }
        }
    
    

      

     

     createMultiProducer:

        public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
                                                            int             bufferSize,
                                                            WaitStrategy    waitStrategy)
        {
            MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);//Sequencer
        //public final class MultiProducerSequencer extends AbstractSequencer
        //public abstract class AbstractSequencer implements Sequencer
    return new RingBuffer<E>(factory, sequencer); }
      
        RingBuffer(EventFactory<E> eventFactory,
                   Sequencer       sequencer)
        {
            this.sequencer    = sequencer;
            this.bufferSize   = sequencer.getBufferSize();
    
            if (bufferSize < 1)
            {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1)
            {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
    
            this.indexMask = bufferSize - 1;
            this.entries   = new Object[sequencer.getBufferSize()];
            fill(eventFactory);
        }
        
        private void fill(EventFactory<E> eventFactory)
        {
            for (int i = 0; i < entries.length; i++)
            {
                entries[i] = eventFactory.newInstance();
            }
        }
    
    
    
     
    
    
    
     

    2 disruptor.handleEventsWith(eventHandlers);

      

        public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
        {
            return createEventProcessors(new Sequence[0], handlers);
        }
      EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                                   final EventHandler<T>[] eventHandlers)
        {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//
    //ProcessingSequenceBarrier ProcessingSequenceBarrier implements SequenceBarrier
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0) { consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
    consumerRepository 与 batchEventProcessor eventHandler SequenceBarrier 关联起来
    EventHandlerGroup 与disruptor consumerRepository Sequence 关联起来
    
    
    

      

    3 disruptor.start();

        public RingBuffer<T> start()
        {
            Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
            ringBuffer.addGatingSequences(gatingSequences);
    
            checkOnlyStartedOnce();
            for (ConsumerInfo consumerInfo : consumerRepository)
            {
                consumerInfo.start(executor);//自定义的线程池执行EventProcessorInfo
            }
    
            return ringBuffer;
        }

    class EventProcessorInfo<T> implements ConsumerInfo
        @Override
        public void start(final Executor executor)
        {
            executor.execute(eventprocessor);//batchEventProcessor
        }

    BatchEventProcessor RUN方法
      @Override
        public void run()
        {
            if (!running.compareAndSet(false, true))
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();

            notifyStart();

            T event = null;
    //Concurrent sequence class used for tracking the progress of
    // * the ring buffer and event processors.  Support a number
     //* of concurrent operations including CAS and order writes
            long nextSequence = sequence.get() + 1L;//使用CAS取得下一个序列号
            try
            {
                while (true)
                {
                    try
                    {
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence);//等待策略取得可用序列号

                        if (nextSequence > availableSequence)//
                        {
                            Thread.yield();
                        }

                        while (nextSequence <= availableSequence)
                        {
                            event = dataProvider.get(nextSequence);//dataProvider 就是ringbuffer
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//触发自定义事件
                            nextSequence++;
                        }

                        sequence.set(availableSequence);
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
                    catch (final AlertException ex)
                    {
                        if (!running.get())
                        {
                            break;
                        }
                    }
                    catch (final Throwable ex)
                    {
                        exceptionHandler.handleEventException(ex, nextSequence, event);
                        sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            }
            finally
            {
                notifyShutdown();
                running.set(false);
            }
        }

     4 介绍 final long availableSequence = sequenceBarrier.waitFor(nextSequence);//等待可用序列号

      内容代码

        @Override
        public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
            checkAlert();
    
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
            if (availableSequence < sequence)
            {
                return availableSequence;
            }
    
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
        }

      6种等待策略

      

    //MultiProducerSequencer   Sequencer   
    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1;//实际上实现了循环等待 } } return availableSequence; }

        @Override
        public boolean isAvailable(long sequence)
        {
            int index = calculateIndex(sequence);
            int flag = calculateAvailabilityFlag(sequence);
            long bufferAddress = (index * SCALE) + BASE;
            return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
        }
    
    
    
     

    5 disruptor.publishEvent(eventTranslator, msg);

        public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, A arg)
        {
            ringBuffer.publishEvent(eventTranslator, arg);
        }
        
    RingBuffer.class
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); }

      final long sequence = sequencer.next(); 
    //MultiProducerSequencer
        public long next()
        {
            return next(1);
        }
      public long next(int n)
        {
            if (n < 1)
            {
                throw new IllegalArgumentException("n must be > 0");
            }

            long current;
            long next;

            do
            {
                current = cursor.get();
                next = current + n;

                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();

                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                    if (wrapPoint > gatingSequence)
                    {
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }

                    gatingSequenceCache.set(gatingSequence);
                }
                else if (cursor.compareAndSet(current, next))
                {
                    break;
                }
            }
            while (true);

            return next;
        }
     
        private void translateAndPublish(EventTranslator<E> translator, long sequence)
        {
            try
            {
                translator.translateTo(get(sequence), sequence);// 取位置 调用自定义类设置值 
            //上面是自定义设置值类    
        // EventTranslatorOneArg<ValueEvent, String> eventTranslator = new EventTranslatorOneArg<ValueEvent, String>() {
         //   @Override
          //  public void translateTo(ValueEvent event, long sequence, String msg) {
          //      event.setValue(msg);
          //  }
        }; }
    finally { sequencer.publish(sequence); } }
    //   MultiProducerSequencer
    public void publish(final long sequence)
        {
            setAvailable(sequence);//设置可用
            waitStrategy.signalAllWhenBlocking();
        }
    //SingleProducerSequencer
        public void publish(long sequence)
        {
            cursor.set(sequence);//cursor就是 Sequence
            waitStrategy.signalAllWhenBlocking();
        }
    
    
    
     
  • 相关阅读:
    亚马逊云储存器S3 BCUKET安全性学习笔记
    (web)Bugs_Bunny_CTF_writeup 部分简单web
    给windows右键添加快捷启动程序
    nmap学习笔记
    暴力美学
    Metasploit学习笔记
    钓鱼+DNS欺骗学习笔记
    第 5 章 if 语句
    第 4 章 操作列表
    3.3 组织列表
  • 原文地址:https://www.cnblogs.com/clds/p/5570137.html
Copyright © 2011-2022 走看看