zoukankan      html  css  js  c++  java
  • Disruptor中shutdown方法失效,及产生的不确定性源码分析

     版权声明:原创作品,谢绝转载!否则将追究法律责任。

    Disruptor框架是一个优秀的并发框架,利用RingBuffer中的预分配内存实现内存的可重复利用,降低了GC的频率。

    具体关于Disruptor的原理,参见:http://ifeve.com/disruptor/,本文不在赘述。

    在Disruptor的使用中,偶尔会出现调用了shutdown函数但程序并未终止的现象。在网上已有的文章中并没有对该问题的分析,本文对此现象进行总结和说明:

    例子:相关的Event、EventHandler、Producer及OrderFactory定义

    1.1 消费事件Event类Order

    Order用于生产者生产事件,消费者消费事件。是RingBuffer的槽中存储的数据类型。其定义如下:

    package liuqiang.instance;
    
    public class Order {
    
        private String id;
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
    }

    Order类中由一个id成员,用于生产者生产和消费者消费。

    1.2 事件处理器OrderHandler1

    定义一个事件处理器OrderHandler1。

    package liuqiang.instance;
    
    import com.lmax.disruptor.EventHandler;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class OrderHandler1 implements EventHandler<Order>{
        private String consumerId;
        private static AtomicInteger count = new AtomicInteger(0);
        public OrderHandler1(String consumerId){
            this.consumerId = consumerId;
        }
        public static int getCount(){
            return count.get();
        }
        @Override
        public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("OrderHandler1 " + this.consumerId + ",消费信息:" + order.getId());
            count.incrementAndGet();
        }
    }

    每次消费一个事件,count静态变量自增1次。

    1.3 生产者Producer

    生产者用于在ringBuffer放入生产的order信息。

    package liuqiang.instance;
    
    import com.lmax.disruptor.RingBuffer;
    
    public class Producer {
    
        private final RingBuffer<Order> ringBuffer;
        public Producer(RingBuffer<Order> ringBuffer){
            this.ringBuffer = ringBuffer;
        }
        public void onData(String data){
            long sequence = ringBuffer.next();
            try {
                Order order = ringBuffer.get(sequence);
                order.setId(data);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }

    onData方法首先获得Order的id属性值data,然后通过ringBuffer获取下一个sequence,并将data值填充进Order对象当中。

    ringBuffer中预先分配了order数组,生产过程仅仅是将数组中的对象属性更改,因此大大减少了gc过程。

    ringBuffer中预生成order对象数组,需要一个工厂方法。

     1.4 消费事件的工厂方法OrderFactory

    package liuqiang.instance;
    
    import com.lmax.disruptor.EventFactory;
    
    public class OrderFactory implements EventFactory<Order> {
        @Override
        public Order newInstance() {
            return new Order();
        }
    }

    该工厂方法重写了父类的newInstance方法,该方法仅仅是new一个尚未填充属性信息的Order对象,用于在RingBuffer中预先占据内存。

    场景一:disruptor.shutdown操作先于BatchEventProcessor的线程执行

    package liuqiang.instance;
    
    import com.lmax.disruptor.*;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.concurrent.Executors;
    
    public class Main1 {
    
        public static void main(String[] args) throws Exception {
            EventFactory<Order> factory = new OrderFactory();
            int ringBufferSize = 1024 * 1024;
            Disruptor<Order> disruptor =
                    new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2"));  //两个独立的消费者,各自对消费ringBuffer中的所有事件。
            disruptor.start();
            RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
            Producer producer = new Producer(ringBuffer);
         //producer生产1个对象,并存入ringBuffer数组当中
    for (long l = 0; l < 1; l++) { producer.onData(l + ""); } disruptor.shutdown();  //方法会阻塞,但该方法执行结束后,并不一定会使程序关闭,后面详细分析原因。 } }

    最终打印结果如下:

    OrderHandler1 1,消费信息:0
    OrderHandler1 2,消费信息:0

    两个消费者都已经消费掉了生产者生产的一个事件。然而,该程序多次运行,会发现存在某些次运行过程消费完事件后,程序并没有终止。调用jconsole查看线程相关信息,如下:

    BatchEventProcess的run方法中,调用消费者Handler方法进行消费。

    显然,shutdown关闭之后,该消费者依然在等待RingBuffer中新的event进行消费。这个问题是如何产生的呢?

    disruptor的start方法如下:

        public RingBuffer<T> start()
        {
            //开启Disruptor,保证只被开启一次
            checkOnlyStartedOnce();
            for (final ConsumerInfo consumerInfo : consumerRepository)
            {
                consumerInfo.start(executor);
            }
    
            return ringBuffer;
        }

    由此可见,start方法非阻塞的。将消费者线程放入一个线程池之后,即返回。start方法执行结束,此时的消费者线程未必会立刻运行!

    我们继续看,disruptor.shutdown方法。

        public void shutdown()
        {
            try
            {
                shutdown(-1, TimeUnit.MILLISECONDS);   //超时shutdown,此时超时时间设置为-1,表示一直阻塞,直到关闭
            }
            catch (final TimeoutException e)
            {
                exceptionHandler.handleOnShutdownException(e);
            }
        }
        public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
        {
            final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);// 此处可能积压事件处理完了,但生产者还没有生产完
            while (hasBacklog())
            {
                if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
                {
                    throw TimeoutException.INSTANCE;
                }
                // Busy spin
            }
            halt();
        }
    // 此处可能积压事件处理完了,但生产者还没有生产完
    private boolean hasBacklog()   { final long cursor = ringBuffer.getCursor(); //获取所有除去已经停止的customer之外的所有的sequence for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false))  //false参数。表明已经停止或未启动的消费者,不再考虑 { //说明有消费者还没有消费完cursor if (cursor > consumer.get()) { return true; } } //所有消费者都已经消费完所有事件,不存在积压的未处理事件 return false; } public void halt() { for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.halt(); } }

     此处关闭过程主要分为两部分:

    1. 判断RingBuffer当中是否还有未消费完的事件。

    2. 所有事件都消费完之后,调用halt终止各个Customer。

    consumerRepository.getLastSequenceInChain(false)

    false参数表示只取消费者线程集合中正在运行的消费者线程,并将对应消费者线程的alert标记置为true。尚未启动的消费者线程不受影响。

    此处为何要设置参数为false呢?个人理解为:消费者线程在运行中可能会抛出异常造成线程退出。如果shutdown方法考虑到这些消费者线程,则该消费者线程将永远不可能消费RingBuffer中的event,造成阻塞。

    另外解释一下getLastSequenceInChain的意义,我们在编写消费者的时候,常常会存在消费者之间的依赖关系。例如:消费者A先消费event,然后消费者B、C同时消费该event,最后消费者D在B、C之后最后消费该event。这样的结构,有利于编写出流水线式的处理。具体如下图所示:

    一个事件event,最后处理该事件的event一定是D,D处理过的事件,前驱消费者A、B、C一定都已经消费过了。在判断消费者集合中是否所有消费者都已经消费完某event,可以直接取依赖的末端消费者集合,进行判断即可。

    getLastSequenceInChain方法即是完成该操作。

        public Sequence[] getLastSequenceInChain(boolean includeStopped)  //返回集合是否要包含未启动或已停止的消费者线程
        {
            List<Sequence> lastSequence = new ArrayList<>();
            for (ConsumerInfo consumerInfo : consumerInfos)
            {
           //includeStopped为true,则无论消费者线程是否已经在执行,都返回。
    if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain()) { final Sequence[] sequences = consumerInfo.getSequences(); Collections.addAll(lastSequence, sequences); } } return lastSequence.toArray(new Sequence[lastSequence.size()]); }

    由于disruptor的shutdown方法中,最终调用的getLastSequenceInChain方法,inclueStopped为false。因此,如果消费者线程在调用shutown的时候尚未开启,此时就会导致返回的Sequence序列漏掉了这部分线程。导致shutdown方法失效。

    第二部分操作:halt,该方法最终调用BatchEventProcessor线程的halt方法。BatchEventProcessor线程负责对消费者OrderHandler1进行循环调用。其halt方法:

        public void halt()
        {
            running.set(false);  //运行状态设置为false
            sequenceBarrier.alert();  //Barrier调用alert方法,在后面消费者消费的时候,会查看该状态,以决定是否阻塞在ringBuffer上等待事件。
        }

    BatchEventProcessor线程的run方法如下。disruptor的shutdown方法最终的效果是,设置了正在运行的消费者线程的BatchEventProcessor的alert状态。而即便我们将shutdown调用过程中的getLastSequenceInChain方法的includeStopped设置为true,获取到未开启的消费者线程,在消费者线程执行的第一步也会将alert状态清除(sequenceBarrier.clearAlert();调用重置了alert的状态为false)。

    @Override
        public void run()
        {
            //确保一次只有一个线程执行这个方法
            if (!running.compareAndSet(false, true))
            {
                throw new IllegalStateException("Thread is already running");
            }
            //清除sequenceBarrier的通知状态!!!该代码是导致shutdown不起效果的原因
            sequenceBarrier.clearAlert();
            //通知生命周期eventHandler,事件处理开始
            notifyStart();
            T event = null;
            //获取下一个应当进行消费的序号
            long nextSequence = sequence.get() + 1L;
            try
            {
                while (true)
                {
                    try
                    {
                        //准备获取下一个可用于消费者线程的sequence。抛出AlertException,说明disruptor已经运行结束了。
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                        if (batchStartAware != null)
                        {
                            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                        }
                        //针对从nextSequence到availableSequence的每一个event,调用相应的事件处理器进行处理
                        while (nextSequence <= availableSequence)
                        {
                            event = dataProvider.get(nextSequence);
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                            nextSequence++;
                        }
                        //当前消费到的序号写入sequence的value变量
                        sequence.set(availableSequence);
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
                    catch (final AlertException ex)
                    {
                        if (!running.get())
                        {
                            break;
                        }
                    }
                    catch (final Throwable ex)
                    {
                        exceptionHandler.handleEventException(ex, nextSequence, event);
                        sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            }
            finally
            {
                //事件处理完毕,发送关闭通知。
                notifyShutdown();
                //线程运行完毕,设置运行状态为false
                running.set(false);
            }
        }

    线程中会调用SequenceBarrier的waitFor方法等待下一个可消费事件的序号。该方法如下:

        @Override
        public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
        //检查alert状态是否改变。如果alert了,则抛出异常,消费者线程执行结束 checkAlert();  //如果状态没有改变,继续根据后面的等待策略等待下一个可消费event的最大可用序号。
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } return sequencer.getHighestPublishedSequence(sequence, availableSequence); }

    在具体的WaitStrategy中,会继续检查alert状态。以YieldWaitStrategy为例,其waitFor方法如下:

        private int applyWaitMethod(final SequenceBarrier barrier, int counter)
            throws AlertException
        {
            //判断是否在这一步骤时disruptor已经停止
            barrier.checkAlert();
            if (0 == counter)
            {
                Thread.yield();
            }
            else
            {
                --counter;
            }
            return counter;
        }

    在BatchEventProcessor消费的过程中,会多次检查alert状态。如果alert状态为true,则说明shutdown方法已经改变了该状态,程序需要停止。但如果shutdown线程先于消费者线程执行,则alert永远为false,消费者线程永远阻塞。

    场景二、disruptor的shutdown过程先于生产者的生产过程执行。

    package liuqiang.instance;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.concurrent.Executors;
    
    public class Main2 {
    
        public static void main(String[] args) throws Exception {
            EventFactory<Order> factory = new OrderFactory();
            int ringBufferSize = 1024 * 1024;
            Disruptor<Order> disruptor =
                    new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2"));
            disruptor.start();
            RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
            //生产者单独通过一个线程进行生产
            new Thread(new ProducerTask(ringBuffer)).start();
            //要想disruptor可以正常的关闭,还需要消费者线程在执行该方法时,已经全部正常启动。
            disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
        }
    }
    
    class ProducerTask implements Runnable {
        Producer producer;
        public ProducerTask(RingBuffer<Order> ringBuffer) {
            producer = new Producer(ringBuffer);
        }
        @Override
        public void run() {
            producer.onData("1");
            System.out.println("producer event finished");
        }
    }

    在这个例子中,我们将生产者的生产过程单独作为一个线程运行。

    多次尝试之后,可能出现的结果如下:

    输出结果1:(输出后执行结束)

    producer event finished

    输出结果2:(输出后程序不停止运行,三条打印信息的顺序可能不同)

    producer event finished
    OrderHandler1 2,消费信息:1
    OrderHandler1 1,消费信息:1

    输出结果3:(输出后程序停止运行,三条打印信息的顺序可能不同)

    producer event finished
    OrderHandler1 1,消费信息:1
    OrderHandler1 2,消费信息:1

    为何会出现三种不同的打印结果呢?

    原因在于,这段程序的运行过程中,存在三个线程:生产者线程、shutdown线程、消费者线程。三个线程的执行顺序不同,会导致结果产生不同的效果。按照线程执行的顺序不同,如下我们分别进行分析:

    1. 消费者线程->shutdown线程->生产者线程,输出结果1

    原因在于,调用shutdown线程时,消费者线程已经在运行。而此时生产者线程尚未填充event数据,在执行hasBackLog判断时,会发现消费者无数据可消费,因此直接关掉消费者线程,结束shutdown调用。最终生产者线程执行,放入event数据到RingBuffer中,程序运行结束。

    2. shutdown线程->生产者线程->消费者线程(或者,shutdown线程->消费者线程->生产者线程),输出结果2

    调用shutdown线程会给正在运行的消费者线程设置alert,但由于消费者线程尚未开启,所以此步骤跳过。随后消费者线程启动,线程持续执行。无论生产者、消费者线程执行的顺序如何,消息都最终可以被消费(只是输出的信息顺序不同),但最终消费者阻塞在等待RingBuffer的过程上。

    3. 生产者线程->消费者线程->shutdown线程,输出结果3

    生产者先生产好数据放入ringBuffer,随后消费者线程开启,shutdown线程发现消费者线程尚未消费完所有数据时,会在hasBackLog方法上循环等待。最终,消费者线程消费完数据,shutdown关闭,程序正常结束。

    4. 生产者线程->shutdown线程->消费者线程,输出结果2

    生产者生产好数据放入ringbuffer,shutdown取出此时正在运行的依赖尾节点消费者集合。但由于消费者线程尚未启动,此操作无效。随后消费者线程启动,消费数据集并阻塞

    5. 消费者线程->生产者线程->shutdown线程,输出结果3

    消费者线程运行并等待数据,生产者线程生产数据,shutdown线程等待消费者线程消费完毕,并关闭消费者线程。程序正常结束。

    总结

    由上可见,disruptor.shutdown方法仅仅能关闭当前已经启动了的消费者线程,对于调用时尚未启动的消费者线程不起作用。在disruptor.shutdown如果能正确的关闭程序,需要满足两个条件:

    1. 生产者的生产线程必须执行在disruptor.shutdown方法之前。

    2. disruptor.shutdown方法必须执行在所有消费者线程启动之前。

    在实际使用中,第二个条件产生的disruptor.shutdown失效问题也许并不常见。原因在于:线上环境中,生产者线程往往已经运行了一段时间,这段时间内,足够线程池调用所有的消费者线程并运行。但如果生产者运行的时间过短,导致shutdown提前调用在消费者线程启动之前,则会产生问题。

  • 相关阅读:
    【图片垂直居中】解决方案
    【透明、半透明】3中解决方案
    HTML5 Canvas 2D 绘图
    Java 连接 timesten
    使用Oracle SQL Developer连接timesten
    Python核心数据类型——文件
    Windows 下 Dropbox + Git 构建分布式多人协作版本控制系统
    Python核心数据类型——字典
    Python核心数据类型——元组
    Linux + Apache + MySQL 环境下OSQA部署
  • 原文地址:https://www.cnblogs.com/pku-liuqiang/p/8532737.html
Copyright © 2011-2022 走看看