zoukankan      html  css  js  c++  java
  • Disruptor 创建过程

    1 Disruptor disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, ringBufferSize, exec,
                            ProducerType.MULTI, waitStrategy);

        public Disruptor(final EventFactory<T> eventFactory,
                         final int ringBufferSize,
                         final Executor executor,
                         final ProducerType producerType,
                         final WaitStrategy waitStrategy)
        {
            this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
                 executor);
        }
      
        public static <E> RingBuffer<E> create(ProducerType    producerType,
                                               EventFactory<E> factory,
                                               int             bufferSize,
                                               WaitStrategy    waitStrategy)
        {
            switch (producerType)
            {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
            }
        }
    
    

      

     

     createMultiProducer:

        public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
                                                            int             bufferSize,
                                                            WaitStrategy    waitStrategy)
        {
            MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);//Sequencer
        //public final class MultiProducerSequencer extends AbstractSequencer
        //public abstract class AbstractSequencer implements Sequencer
    return new RingBuffer<E>(factory, sequencer); }
      
        RingBuffer(EventFactory<E> eventFactory,
                   Sequencer       sequencer)
        {
            this.sequencer    = sequencer;
            this.bufferSize   = sequencer.getBufferSize();
    
            if (bufferSize < 1)
            {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1)
            {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
    
            this.indexMask = bufferSize - 1;
            this.entries   = new Object[sequencer.getBufferSize()];
            fill(eventFactory);
        }
        
        private void fill(EventFactory<E> eventFactory)
        {
            for (int i = 0; i < entries.length; i++)
            {
                entries[i] = eventFactory.newInstance();
            }
        }
    
    
    
     
    
    
    
     

    2 disruptor.handleEventsWith(eventHandlers);

      

        public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
        {
            return createEventProcessors(new Sequence[0], handlers);
        }
      EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                                   final EventHandler<T>[] eventHandlers)
        {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);//
    //ProcessingSequenceBarrier ProcessingSequenceBarrier implements SequenceBarrier
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0) { consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
    consumerRepository 与 batchEventProcessor eventHandler SequenceBarrier 关联起来
    EventHandlerGroup 与disruptor consumerRepository Sequence 关联起来
    
    
    

      

    3 disruptor.start();

        public RingBuffer<T> start()
        {
            Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
            ringBuffer.addGatingSequences(gatingSequences);
    
            checkOnlyStartedOnce();
            for (ConsumerInfo consumerInfo : consumerRepository)
            {
                consumerInfo.start(executor);//自定义的线程池执行EventProcessorInfo
            }
    
            return ringBuffer;
        }

    class EventProcessorInfo<T> implements ConsumerInfo
        @Override
        public void start(final Executor executor)
        {
            executor.execute(eventprocessor);//batchEventProcessor
        }

    BatchEventProcessor RUN方法
      @Override
        public void run()
        {
            if (!running.compareAndSet(false, true))
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();

            notifyStart();

            T event = null;
    //Concurrent sequence class used for tracking the progress of
    // * the ring buffer and event processors.  Support a number
     //* of concurrent operations including CAS and order writes
            long nextSequence = sequence.get() + 1L;//使用CAS取得下一个序列号
            try
            {
                while (true)
                {
                    try
                    {
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence);//等待策略取得可用序列号

                        if (nextSequence > availableSequence)//
                        {
                            Thread.yield();
                        }

                        while (nextSequence <= availableSequence)
                        {
                            event = dataProvider.get(nextSequence);//dataProvider 就是ringbuffer
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//触发自定义事件
                            nextSequence++;
                        }

                        sequence.set(availableSequence);
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
                    catch (final AlertException ex)
                    {
                        if (!running.get())
                        {
                            break;
                        }
                    }
                    catch (final Throwable ex)
                    {
                        exceptionHandler.handleEventException(ex, nextSequence, event);
                        sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            }
            finally
            {
                notifyShutdown();
                running.set(false);
            }
        }

     4 介绍 final long availableSequence = sequenceBarrier.waitFor(nextSequence);//等待可用序列号

      内容代码

        @Override
        public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
            checkAlert();
    
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
            if (availableSequence < sequence)
            {
                return availableSequence;
            }
    
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
        }

      6种等待策略

      

    //MultiProducerSequencer   Sequencer   
    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1;//实际上实现了循环等待 } } return availableSequence; }

        @Override
        public boolean isAvailable(long sequence)
        {
            int index = calculateIndex(sequence);
            int flag = calculateAvailabilityFlag(sequence);
            long bufferAddress = (index * SCALE) + BASE;
            return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
        }
    
    
    
     

    5 disruptor.publishEvent(eventTranslator, msg);

        public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, A arg)
        {
            ringBuffer.publishEvent(eventTranslator, arg);
        }
        
    RingBuffer.class
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); }

      final long sequence = sequencer.next(); 
    //MultiProducerSequencer
        public long next()
        {
            return next(1);
        }
      public long next(int n)
        {
            if (n < 1)
            {
                throw new IllegalArgumentException("n must be > 0");
            }

            long current;
            long next;

            do
            {
                current = cursor.get();
                next = current + n;

                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();

                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                    if (wrapPoint > gatingSequence)
                    {
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }

                    gatingSequenceCache.set(gatingSequence);
                }
                else if (cursor.compareAndSet(current, next))
                {
                    break;
                }
            }
            while (true);

            return next;
        }
     
        private void translateAndPublish(EventTranslator<E> translator, long sequence)
        {
            try
            {
                translator.translateTo(get(sequence), sequence);// 取位置 调用自定义类设置值 
            //上面是自定义设置值类    
        // EventTranslatorOneArg<ValueEvent, String> eventTranslator = new EventTranslatorOneArg<ValueEvent, String>() {
         //   @Override
          //  public void translateTo(ValueEvent event, long sequence, String msg) {
          //      event.setValue(msg);
          //  }
        }; }
    finally { sequencer.publish(sequence); } }
    //   MultiProducerSequencer
    public void publish(final long sequence)
        {
            setAvailable(sequence);//设置可用
            waitStrategy.signalAllWhenBlocking();
        }
    //SingleProducerSequencer
        public void publish(long sequence)
        {
            cursor.set(sequence);//cursor就是 Sequence
            waitStrategy.signalAllWhenBlocking();
        }
    
    
    
     
  • 相关阅读:
    为图片指定区域添加链接
    数值取值范围问题
    【leetcode】柱状图中最大的矩形(第二遍)
    【leetcode 33】搜索旋转排序数组(第二遍)
    【Educational Codeforces Round 81 (Rated for Div. 2) C】Obtain The String
    【Educational Codeforces Round 81 (Rated for Div. 2) B】Infinite Prefixes
    【Educational Codeforces Round 81 (Rated for Div. 2) A】Display The Number
    【Codeforces 716B】Complete the Word
    一个简陋的留言板
    HTML,CSS,JavaScript,AJAX,JSP,Servlet,JDBC,Structs,Spring,Hibernate,Xml等概念
  • 原文地址:https://www.cnblogs.com/clds/p/5570137.html
Copyright © 2011-2022 走看看