zoukankan      html  css  js  c++  java
  • 【源码】RingBuffer(二)——消费者

    消费者如何读取数据?

    前一篇是生产者的处理,这一篇讲消费者的处理

    我们都知道,消费者无非就是不停地从队列中读取数据,处理数据。但是与BlockedQueue不同的是,RingBuffer的消费者不会对队列进行上锁,那它是怎样实现的呢?

    概括地说,就是通过CAS原子性地得到一个可消费的序号,然后再根据序号取出数据进行处理。

    在看代码之前,我们先把能想到的东西先罗列一下:

    1.需要一个尾指针来追踪消费状态

    2.如何防止一个数据被多个消费者重复消费?

    3.消费速度不能超过生产者,如何限制?

    4.当没有可处理数据的时候消费者该做什么,自旋还是挂起等待生产者唤醒?

    5.如果4选择挂起,那么如果RingBuffer关闭,如何唤醒消费者以终结线程任务?

    6.RingBuffer构造的时候需要传入线程工厂,RingBuffer是如何使用线程的,多个任务使用一个线程调度?

    7.消费者何时启动?

    好,问题有了,现在我们来看代码,下面是EventProcessor的一个实现,WorkProcessor的部分代码。

    public final class WorkProcessor<T>
        implements EventProcessor
    {
        private final AtomicBoolean running = new AtomicBoolean(false); //当前处理器状态
        private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //当前已消费过的最新序号
        private final RingBuffer<T> ringBuffer; //保留此引用,方便取数据
        private final SequenceBarrier sequenceBarrier; //用于等待下一个最大可用序号,可与多个Processor共用
        private final WorkHandler<? super T> workHandler; //实际上的处理器
        private final ExceptionHandler<? super T> exceptionHandler;
        private final Sequence workSequence; //多个Processor共用的workSequence,可以得到下一个待处理的序号
    
        //....
        @Override
        public void run()
        {
            if (!running.compareAndSet(false, true))  //防止run方法重复调用造成的问题
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();
    
            notifyStart();
    
            boolean processedSequence = true;
            long cachedAvailableSequence = Long.MIN_VALUE;
            long nextSequence = sequence.get();
            T event = null;
            while (true)  //死循环
            {
                try
                {
                    if (processedSequence)
                    {
                        if (!running.get()) //如果检查到已关闭,则唤醒在同一个Barrier上的其他processor线程
                        {
                            sequenceBarrier.alert();  //唤醒其他线程
                            sequenceBarrier.checkAlert(); //抛出异常,终结此线程
                        }
                        processedSequence = false;
                        do
                        {
                            //workSequence可能和多个Processor共用
                            nextSequence = workSequence.get() + 1L;
                            //这个sequence才是当前处理器处理过的序号,生产者判断尾指针的时候就是按照这个来的,这个就是gatingSequence
                            //拿到下一个新序号的时候,说明workSequence前一个数据已经处理过了
                            sequence.set(nextSequence - 1L);
                        }
                        //由于workSequence可能由多个Processor共用,故存在竞争情况,需要使用CAS
                        while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                    }
    
                    //如果没有超过上一次缓存生产者的最大序号,则表明数据可取
                    if (cachedAvailableSequence >= nextSequence)
                    {
                        //取出序号对应位置的数据
                        event = ringBuffer.get(nextSequence);
                        //交给handler处理
                        workHandler.onEvent(event);
                        processedSequence = true;
                    }
                    else
                    {
                        //阻塞等待下一个可用的序号
                        //如果就是nextSequence,就返回nextSequence
                        //如果可用的大于nextSequence,则返回最新可用的sequence
                        cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                    }
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex) //checkAlert()抛出的
                {
                    if (!running.get()) //如果已经结束,则终结循环,线程任务结束
                    {
                        break;
                    }
                }
                catch (final Throwable ex) //其他异常,则交给异常处理器处理
                {
                    // handle, mark as processed, unless the exception handler threw an exception
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    processedSequence = true;
                }
            }
    
            notifyShutdown();
    
            running.set(false);
        }
        //...
    
        
    }

     针对问题一:需要一个尾指针来追踪消费状态

    你们注意到代码中有两个Sequence,workSequence和sequence。为啥需要两个呢?

    workSequence消费者使用的最新序号(该序号的数据未被处理过,只是被消费者标记成可消费);而sequence序号的数据则是被消费过的,这个序号正是前一篇中的gatingSequence。

    针对问题二:如何防止一个数据被多个消费者重复消费?

    问题二的解决方案就是WorkPool,即让多个WorkProcessor共用一个workSequence,这样它们就会竞争序号,一个序号只能被消费一次。

    public final class WorkerPool<T>
    {
        private final AtomicBoolean started = new AtomicBoolean(false);
        private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //从-1开始
        private final RingBuffer<T> ringBuffer; //RingBuffer引用,用于构造Processor,取数据
        private final WorkProcessor<?>[] workProcessors;
        //...
        public WorkerPool(
            final RingBuffer<T> ringBuffer,
            final SequenceBarrier sequenceBarrier,
            final ExceptionHandler<? super T> exceptionHandler,
            final WorkHandler<? super T>... workHandlers)
        {
            this.ringBuffer = ringBuffer;
            final int numWorkers = workHandlers.length;
            workProcessors = new WorkProcessor[numWorkers];
    
            //每个handler构造一个Processor
            for (int i = 0; i < numWorkers; i++)
            {
                workProcessors[i] = new WorkProcessor<>(
                    ringBuffer,
                    sequenceBarrier, //共用同一个sequenceBarrier
                    workHandlers[i],
                    exceptionHandler,
                    workSequence); //共用同一个workSequence
            }
        }
    
        //...
    }
    
    public class Disruptor<T>
    {
    
        //...
        //为每个WorkHandler构造一个WorkProcessor,再包装成一个WorkerPool
        public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
        {
            return createWorkerPool(new Sequence[0], workHandlers);
        }
        //....
    
    }

    针对问题三、四:消费速度不能超过生产者,如何限制?当没有可处理数据的时候消费者该做什么,自旋还是挂起等待生产者唤醒?

    使用SequenceBarrier,从WorkProcessor的代码中我们可以知道,消费者会缓存上次获取的最大可消费序号,然后在这序号范围内都可以直接竞争。每次获取最小可用序号的时候,则会触发waitStrategy等待策略进行等待。

     final class ProcessingSequenceBarrier implements SequenceBarrier
    {
        private final WaitStrategy waitStrategy; //等待策略
        private final Sequence dependentSequence; //依赖的序号,默认为RingBuffer的sequence
        private volatile boolean alerted = false; 
        private final Sequence cursorSequence; //RingBuffer的sequence
        private final Sequencer sequencer; 
    
        //...
        public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
            checkAlert(); //如果已shutdown,则抛出异常,终结任务
    
            //sequence为消费者想要的下一个序号
            //cursorSequence为RingBuffer的序号(生产者最新序号)
            //dependentSequence默认就是cursorSequence
            //特殊情况下,例如消费者B要求只能消费消费者A消费过的,则dependentSequence就会是消费者A的sequence
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
            if (availableSequence < sequence)
            {
                return availableSequence;
            }
    
            //得到的序号是生产者用过的序号,但是该序号对应的数据可能未发布,如果访问未发布的数据,就会影响正确性,因为可能该数据还处于translate阶段
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
        }
        //...
    }

     其中等待策略有很多中,常见的就是BlockingWaitStategy,该等待策略会挂起执行线程。当生产者publishEvent的时候,则会调用WaitStrategy#signalAllWhenBlocking()方法唤醒所有等待线程。

    public final class BlockingWaitStrategy implements WaitStrategy
    {
        private final Object mutex = new Object(); //使用对象内置的条件队列
    
        @Override
        public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException
        {
            long availableSequence;
            if (cursorSequence.get() < sequence) //当生产者序号小于消费者需要的序号时,挂起等待唤醒
            {
                synchronized (mutex)
                {
                    while (cursorSequence.get() < sequence) //使用while是为了防止被错误唤醒,所以被唤醒后还会再判断条件是否满足
                    {
                        barrier.checkAlert();
                        mutex.wait();
                    }
                }
            }
    
            //生产者序号满足后,查看依赖项是否满足
            //如果依赖的消费者的序号小于需求序号,即依赖的消费者还没消费过需求序号
            //则自旋等待
            while ((availableSequence = dependentSequence.get()) < sequence)
            {
                barrier.checkAlert();
                ThreadHints.onSpinWait();
            }
    
            return availableSequence;
        }
    
        @Override
        public void signalAllWhenBlocking() //开设接口,用于唤醒条件队列内的等待线程
        {
            synchronized (mutex)
            {
                mutex.notifyAll();
            }
        }
    
        @Override
        public String toString()
        {
            return "BlockingWaitStrategy{" +
                "mutex=" + mutex +
                '}';
        }
    }

    针对问题六、七:RingBuffer构造的时候需要传入线程工厂,RingBuffer是如何使用线程的,多个任务使用一个线程调度?消费者何时启动?

    消费者随Disruptor启动,Disruptor启动时会从ConsumerRepository中取出Consumer,提交给Executor执行。

    public RingBuffer<T> start()
        {
            checkOnlyStartedOnce();
            for (final ConsumerInfo consumerInfo : consumerRepository)
            {
                consumerInfo.start(executor);
            }
    
            return ringBuffer;
        }

    其中,在新版的Disruptor中,不建议使用外部传入的Executor,而是只传ThreadFactory,然后由内部构造一个Executor,就是BasicExecutor。它的实现就是每次提交的任务都创建一个新的线程负责。所以它的线程模型就是一个消费者一个线程。

    public class Disruptor<T>
    {
        //...
        public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
        {
            this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
        }
        //...
    }
    
    public class BasicExecutor implements Executor
    {
        private final ThreadFactory factory;
        private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
    
        public BasicExecutor(ThreadFactory factory)
        {
            this.factory = factory;
        }
    
        @Override
        public void execute(Runnable command)
        {
            //每提交一个任务就新建一个新的线程处理这个任务
            final Thread thread = factory.newThread(command);
            if (null == thread)
            {
                throw new RuntimeException("Failed to create thread to run: " + command);
            }
    
            thread.start();
    
            threads.add(thread);
        }
        //...
    }
  • 相关阅读:
    那海蓝蓝 微博
    林子雨老师团队《Architecture of a Database System》 中文版
    MySQL优化---DBA对MySQL优化的一些总结
    MySQL索引原理及慢查询优化 美团
    MySQL 调优基础:Linux内存管理 Linux文件系统 Linux 磁盘IO Linux网络
    pthread_mutex_t
    知数堂MYSQL优化课---CU论坛版主 DBA 博客
    MySQL通用优化 叶金荣!!!
    MySQL 之 Metadata Locking 研究
    MySQL 调优基础(三) Linux文件系统
  • 原文地址:https://www.cnblogs.com/longfurcat/p/12864491.html
Copyright © 2011-2022 走看看