zoukankan      html  css  js  c++  java
  • 【Java多线程】Disruptor源码分析(三十四)

    Disruptor类图

      

    Disruptor执行过程

      

    Disruptor源码分析

      1 // 环形缓冲区
      2 private final RingBuffer<T> ringBuffer;
      3 // 实现者
      4 private final Executor executor;
      5 // 消费仓库
      6 private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
      7 // 启动标识,原子boolean类,Disruptor只能启动一次
      8 private final AtomicBoolean started = new AtomicBoolean(false);
      9 // 异常处理器
     10 private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
     11 
     12 
     13 
     14 
     15 // 主要有3个未过时的构造方法
     16 public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
     17 {
     18     this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
     19 }
     20 
     21 
     22 // eventFactory 事件工厂
     23 // ringBufferSize 环形缓冲区大小
     24 // producerType 生产者类型
     25 // waitStrategy 阻塞策略
     26 public Disruptor(
     27         final EventFactory<T> eventFactory,
     28         final int ringBufferSize,
     29         final ThreadFactory threadFactory,
     30         final ProducerType producerType,
     31         final WaitStrategy waitStrategy)
     32 {
     33     this(
     34         RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
     35         new BasicExecutor(threadFactory));
     36 }
     37 
     38 
     39 private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
     40 {
     41     this.ringBuffer = ringBuffer;
     42     this.executor = executor;
     43 }
     44 
     45 
     46 
     47 
     48 
     49 // 创建环形缓冲区
     50 public static <E> RingBuffer<E> create(
     51     ProducerType producerType,
     52     EventFactory<E> factory,
     53     int bufferSize,
     54     WaitStrategy waitStrategy)
     55 {
     56     // 选择对应生产者类型的环形缓冲区创建
     57     switch (producerType)
     58     {
     59         case SINGLE:
     60             // 单生产者类型
     61             return createSingleProducer(factory, bufferSize, waitStrategy);
     62         case MULTI:
     63             return createMultiProducer(factory, bufferSize, waitStrategy);
     64         default:
     65             throw new IllegalStateException(producerType.toString());
     66     }
     67 }
     68 
     69 
     70 // 创建单生产者类型的环形缓冲区
     71 public static <E> RingBuffer<E> createSingleProducer(
     72     EventFactory<E> factory,
     73     int bufferSize,
     74     WaitStrategy waitStrategy)
     75 {
     76     // 
     77     SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
     78 
     79     return new RingBuffer<E>(factory, sequencer);
     80 }
     81 
     82 // 构造方法 SingleProducerSequencer
     83 public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
     84 {
     85     super(bufferSize, waitStrategy);
     86 }
     87 
     88 
     89 // 构造方法 RingBuffer
     90 RingBuffer(
     91     EventFactory<E> eventFactory,
     92     Sequencer sequencer)
     93 {
     94     super(eventFactory, sequencer);
     95 }
     96 
     97 
     98 
     99 
    100 // RingBuffer 的父中存在的静态块
    101 bstract class RingBufferFields<E> extends RingBufferPad
    102 {
    103     // 缓存填充数量,即空白数组长度(看静态代码块理解)
    104     private static final int BUFFER_PAD;
    105     // 真实存数据数组的内存偏移量(看静态代码块理解)
    106     private static final long REF_ARRAY_BASE;
    107     // 引用数据类型左移数(看静态代码块理解)
    108     private static final int REF_ELEMENT_SHIFT;
    109     private static final Unsafe UNSAFE = Util.getUnsafe();
    110 
    111     static
    112     {
    113         // UNSAFE.arrayIndexScale 是获取一个数组在内存中的scale,
    114         // 也就是每个数组元素在内存中的元素的内存偏移量
    115         // 因为我们的event是任意一个对象,所以在这里用一个Object的数组class来求scale
    116         final int scale = UNSAFE.arrayIndexScale(Object[].class);
    117         // 不同的JVM设置,它的指针大小是不一样的
    118         if (4 == scale)
    119         {
    120             // 计算内存偏移量的常量,数量 << REF_ELEMENT_SHIFT
    121             // 例如:int[2],第一个元素的偏移量就是 16 + 0 << 2,第二个 16 + 1 << 2
    122             REF_ELEMENT_SHIFT = 2;
    123         }
    124         else if (8 == scale)
    125         {
    126             // 例如:long[2],第一个元素的偏移量就是 16 + 0 << 3,第二个 16 + 1 << 3
    127             REF_ELEMENT_SHIFT = 3;
    128         }
    129         else
    130         {
    131             throw new IllegalStateException("Unknown pointer size");
    132         }
    133         // 缓存行是64个字节
    134         // 由于数据是保存在数组中的,所以要将数据与其他变量隔开,让数组真实数据与其他变量不在同一个缓存行中
    135         // 即:A变量 + 缓存隔离带(128个字节) + 数组真实数据 + 缓存隔离带(128个字节) + B变量
    136         // 缓存隔离带使用空数组来代替
    137         // 即:A变量 + 空白数组(128个字节) + 数组真实数据 + 空白数组(128个字节) + A变量
    138         // 计算:128 / 对象在内存的大小 scale = BUFFER_PAD (即是空白数组的长度)
    139         BUFFER_PAD = 128 / scale;
    140         // 获取数组在内存中的偏移量,也就是第一个真实数据元素的内存偏移量
    141         // 由于数组 是分三部分,空白数组(128个字节) + 数组真实数据 + 空白数组(128个字节) 
    142         // 那么数组真实数据起始偏移量就是:UNSAFE.arrayBaseOffset(Object[].class) + 空白数组(128个字节)
    143         // 空白数据的个数就是 BUFFER_PAD,它向左移 REF_ELEMENT_SHIFT ,得到的就是空白数组的内存偏移量
    144         // REF_ARRAY_BASE = (数组对象头 + 空白数组(128个字节))
    145         REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    146 
    147     }
    148 
    149     private final long indexMask; // 索引标识
    150     private final Object[] entries;//申明一个对象数组
    151     protected final int bufferSize;
    152     protected final Sequencer sequencer;
    153 
    154     RingBufferFields(
    155         EventFactory<E> eventFactory,
    156         Sequencer sequencer)
    157     {
    158         this.sequencer = sequencer;
    159         this.bufferSize = sequencer.getBufferSize();
    160 
    161         if (bufferSize < 1)
    162         {
    163             throw new IllegalArgumentException("bufferSize must not be less than 1");
    164         }
    165         if (Integer.bitCount(bufferSize) != 1)
    166         {
    167             throw new IllegalArgumentException("bufferSize must be a power of 2");
    168         }
    169         // 索引标识
    170         this.indexMask = bufferSize - 1; //indexMask 就是数组的最大下标
    171         // 有上静态代码块可知,整个数组是 空白数组(128个字节) + 数组真实数据 + 空白数组(128个字节)
    172         // 数据长度就是:BUFFER_PAD + sequencer.getBufferSize() + BUFFER_PAD
    173         this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    174         // 给正式数据数组,初始化值,使用eventFactory
    175         fill(eventFactory);
    176     }
    177 }
    178 
    179 // 获取sequence下标的元素
    180 protected final E elementAt(long sequence)
    181 {
    182     // 构造方法初始化时:indexMask = bufferSize - 1;
    183     // 整个数组是 空白数组(128个字节) + 数组真实数据 + 空白数组(128个字节)
    184     // REF_ARRAY_BASE = (数组对象头 + 空白数组(128个字节))
    185     // 根据内存偏移量取获取对应数组元素
    186     return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    187 }
    188 
    189 // 填入生产事件
    190 private void fill(EventFactory<E> eventFactory)
    191 {
    192     for (int i = 0; i < bufferSize; i++)
    193     {
    194         entries[BUFFER_PAD + i] = eventFactory.newInstance();
    195     }
    196 }
    197 
    198 
    199 // 构造方法 BasicExecutor
    200 public BasicExecutor(ThreadFactory factory)
    201 {
    202     this.factory = factory;
    203 }
    204 
    205 
    206 
    207 // Disruptor 类中 handleEventsWith()
    208 public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    209 {
    210     return createEventProcessors(new Sequence[0], handlers);
    211 }
    212 
    213 
    214 // Disruptor 类中 createEventProcessors()
    215 EventHandlerGroup<T> createEventProcessors(
    216         final Sequence[] barrierSequences,
    217         final EventHandler<? super T>[] eventHandlers)
    218 {
    219     // 检查没有启动
    220     checkNotStarted();
    221 
    222     // 创建处理序列器组
    223     final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    224     // 创建序列栅栏
    225     final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    226 
    227     // 遍历事件处理器,创建对应的批处理事件进程,每个事件一个
    228     for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    229     {
    230         final EventHandler<? super T> eventHandler = eventHandlers[i];
    231 
    232         // 创建对应的批处理事件进程
    233         final BatchEventProcessor<T> batchEventProcessor =
    234             new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
    235 
    236         if (exceptionHandler != null)
    237         {
    238             batchEventProcessor.setExceptionHandler(exceptionHandler);
    239         }
    240 
    241         // 增加消费仓库信息
    242         consumerRepository.add(batchEventProcessor, eventHandler, barrier);
    243         // 将sequence添加到组中
    244         processorSequences[i] = batchEventProcessor.getSequence();
    245     }
    246 
    247     // 
    248     updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    249 
    250     return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    251 }
    252 
    253 // 更新栅栏序列信息
    254 private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    255 {
    256     if (processorSequences.length > 0)
    257     {
    258         // 给 ringBuffer 添加 进程中的 栅栏序列信息
    259         ringBuffer.addGatingSequences(processorSequences);
    260         for (final Sequence barrierSequence : barrierSequences)
    261         {
    262             // 移除 barrierSequence
    263             ringBuffer.removeGatingSequence(barrierSequence);
    264         }
    265         consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    266     }
    267 }
    268 
    269 
    270 
    271 // 启动RingBuffer
    272 public RingBuffer<T> start()
    273 {
    274     // 启动,检查是否只启动一次
    275     checkOnlyStartedOnce();
    276     // 遍历消费者仓库
    277     for (final ConsumerInfo consumerInfo : consumerRepository)
    278     {
    279         // 消费
    280         consumerInfo.start(executor);
    281     }
    282 
    283     return ringBuffer;
    284 }
    285 
    286 
    287 
    288 // 启动,并检查是否只启动一次
    289 private void checkOnlyStartedOnce()
    290 {
    291     if (!started.compareAndSet(false, true))
    292     {
    293         throw new IllegalStateException("Disruptor.start() must only be called once.");
    294     }
    295 }
    296 
    297 
    298 // EventProcessorInfo 类中的 start()
    299 @Override
    300 public void start(final Executor executor)
    301 {
    302     // 执行事件处理器
    303     executor.execute(eventprocessor);
    304 }
    305 
    306 
    307 
    308 
    309 // 执行事件处理器
    310 @Override
    311 public void execute(Runnable command)
    312 {
    313     // 使用线程工厂创建了一个线程
    314     final Thread thread = factory.newThread(command);
    315     if (null == thread)
    316     {
    317         throw new RuntimeException("Failed to create thread to run: " + command);
    318     }
    319 
    320     // 启动线程
    321     thread.start();
    322 
    323     // 添加到线程队列中
    324     threads.add(thread);
    325 }
    326 
    327 
    328 // 获取 环形缓冲区
    329 public RingBuffer<T> getRingBuffer()
    330 {
    331     return ringBuffer;
    332 }
    333 
    334 
    335 // 获取序列号为 sequence 的元素
    336 public E get(long sequence)
    337 {
    338     // 获取序列号为 sequence 的元素
    339     return elementAt(sequence);
    340 }
    341 
    342 
    343 
    344 // RingBuffer 类中的 next()
    345 public long next()
    346 {
    347     return sequencer.next();
    348 }
    349 
    350 
    351 // SingleProducerSequencer 类中的 next()
    352 public long next()
    353 {
    354     return next(1);
    355 }
    356 
    357 
    358 // 获取下一个Sequencer 序列号
    359 public long next(int n)
    360 {
    361     if (n < 1)
    362     {
    363         throw new IllegalArgumentException("n must be > 0");
    364     }
    365 
    366     // 取出 Sequencer 记录的 上一次返回序列号nextValue
    367     long nextValue = this.nextValue;
    368     // 根据要取出数量n,计算下标nextSequence值
    369     long nextSequence = nextValue + n;
    370     // 超过缓存大小的差值
    371     long wrapPoint = nextSequence - bufferSize;
    372     // 缓存值大门值
    373     long cachedGatingSequence = this.cachedValue;
    374     // 判断超过缓存大小的差值 是否大于 大门值 或者 大门值 > nextValue
    375     if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    376     {
    377         // CAS设置了游标的值
    378         // 加入了StoreLoad屏障,保证写入都,其他线程可见
    379         cursor.setVolatile(nextValue);  // StoreLoad fence
    380         // 最小顺序值
    381         long minSequence;
    382         // 判断超过缓存大小的差值 是否大于 最小的序号
    383         while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
    384         {
    385             // 休眠1纳秒
    386             LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    387         }
    388         //  最小的序号 赋值给 缓存值
    389         this.cachedValue = minSequence;
    390     }
    391     // 记录返回序列号
    392     this.nextValue = nextSequence;
    393 
    394     return nextSequence;
    395 }
    396 
    397 
    398 
    399 
    400 // 发布
    401 public void publish(long sequence)
    402 {
    403     // 设置游标序号
    404     cursor.set(sequence);
    405     // 通知所有阻塞线程
    406     waitStrategy.signalAllWhenBlocking();
    407 }
    408 
    409 
    410 
    411 // 运行
    412 public void run()
    413 {
    414     // 判断线程运行状态
    415     if (!running.compareAndSet(IDLE, RUNNING))
    416     {
    417         if (running.get() == RUNNING)
    418         {
    419             throw new IllegalStateException("Thread is already running");
    420         }
    421     }
    422     // 序列号屏障情况告警
    423     sequenceBarrier.clearAlert();
    424 
    425     // 通知开启
    426     notifyStart();
    427 
    428     try
    429     {
    430         // 判断线程是否处于暂停状态
    431         if (running.get() == HALTED)
    432         {
    433             return;
    434         }
    435 
    436         T event = null;
    437         // 下一个序列号
    438         long nextSequence = sequence.get() + 1L;
    439 
    440         while (true)
    441         {
    442             try
    443             {
    444                 // 获取可用的序列号--无法获取到,在其中等待
    445                 final long availableSequence = sequenceBarrier.waitFor(nextSequence);
    446                 if (batchStartAware != null)
    447                 {
    448                     batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
    449                 }
    450 
    451                 // 下一个序列号 <= 获取可用的序列号
    452                 while (nextSequence <= availableSequence)
    453                 {
    454                     // 获取对应的元素
    455                     event = dataProvider.get(nextSequence);
    456                     // 调用onEvent() 处理事件元素
    457                     eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    458                     nextSequence++;
    459                 }
    460                 // 直到 下一个序列号 > 获取可用的序列号
    461                 // 记录 序列号到达的位置
    462                 sequence.set(availableSequence);
    463             }
    464             catch (final TimeoutException e)
    465             {
    466                 notifyTimeout(sequence.get());
    467             }
    468             catch (final AlertException ex)
    469             {
    470                 if (running.get() != RUNNING)
    471                 {
    472                     break;
    473                 }
    474             }
    475             catch (final Throwable ex)
    476             {
    477                 exceptionHandler.handleEventException(ex, nextSequence, event);
    478                 sequence.set(nextSequence);
    479                 nextSequence++;
    480             }
    481         }
    482     }
    483     finally
    484     {   
    485         // 通知关闭
    486         notifyShutdown();
    487         // 设置线程状态
    488         running.set(IDLE);
    489     }
    490 }
    491 
    492 
    493 
    494 
    495 public void clearAlert()
    496 {
    497     alerted = false;
    498 }
    499 
    500 
    501 
    502 
    503 // ProcessingSequenceBarrier 中的waitFor()
    504 public long waitFor(final long sequence)
    505     throws AlertException, InterruptedException, TimeoutException
    506 {
    507     // 检查告警,如果告警直接报异常
    508     // disruptor调用停止方法时,alert为true,则会告警
    509     checkAlert();
    510 
    511     // 获取可用的序列号
    512     long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    513 
    514     // 判断可用序列号 是否小于 请求序列号
    515     if (availableSequence < sequence)
    516     {
    517         return availableSequence;
    518     }
    519 
    520     // 返回最高可用的序列号,即是 availableSequence
    521     return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    522 }
    523 
    524 // BlockingWaitStrategy 中的 waitFor()
    525 public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    526     throws AlertException, InterruptedException
    527 {
    528     long availableSequence;
    529     // 判断游标中的序列号,是否小于 请求的序列号
    530     if (cursorSequence.get() < sequence)
    531     {
    532         // 加锁
    533         lock.lock();
    534         try
    535         {   
    536             // 判断游标中的序列号,是否小于 请求的序列号
    537             while (cursorSequence.get() < sequence)
    538             {
    539                 // 检查告警, disruptor调用停止方法时会告警
    540                 barrier.checkAlert();
    541                 // 线程条件等待
    542                 processorNotifyCondition.await();
    543             }
    544         }
    545         finally
    546         {
    547             // 解锁
    548             lock.unlock();
    549         }
    550     }
    551 
    552     // 从依赖的序列器中获取 可用的序列号 
    553     while ((availableSequence = dependentSequence.get()) < sequence)
    554     {
    555 
    556         barrier.checkAlert();
    557         ThreadHints.onSpinWait();
    558     }
    559 
    560     return availableSequence;
    561 }
    View Code
  • 相关阅读:
    LeetCode对撞指针汇总
    167. Two Sum II
    215. Kth Largest Element in an Array
    2018Action Recognition from Skeleton Data via Analogical Generalization over Qualitative Representations
    题解 Educational Codeforces Round 84 (Rated for Div. 2) (CF1327)
    题解 JZPKIL
    题解 八省联考2018 / 九省联考2018
    题解 六省联考2017
    题解 Codeforces Round #621 (Div. 1 + Div. 2) (CF1307)
    题解Codeforces Round #620 (Div. 2)
  • 原文地址:https://www.cnblogs.com/h--d/p/14617185.html
Copyright © 2011-2022 走看看