zoukankan      html  css  js  c++  java
  • Disruptor的使用

    ..................2015年的第一天...................

    本文代码托管在 https://github.com/hupengcool/disruptor-starter

    Intruduction

    关于吹牛逼的话就不说了。。。Disruptor是Java实现的用于线程间通信的消息组件。其核心是一个Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。与大部分并发队列使用的Lock相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不需要像Lock一样需要OS的支持,所以每次调用不需要kernel entry,也不需要context switch。当然,使用CAS的代价是Disruptor实现的复杂程度也相对提高了。

    Component

    Sequence

    Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

    RingBuffer

    RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

    SequenceBarrier

    SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

    SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

    WaitStrategy

    当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

    BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

    BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。

    SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

    YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。

    PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

    BatchEvenProcessor

    在Disruptor中,消费者是以EventProcessor的形式存在的。其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。

    WorkProcessor

    另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针。

    WorkerPool

    共享同一个Sequence的WorkProcessor可由一个WorkerPool管理,这时,共享的Sequence也由WorkerPool创建。

    Use Cases

    下面以Disruptor 3.3.0版本为例介绍Disruptor的初级使用,本文并没有用那些比较原始的API,如果想知道上面写的一些api如何使用,可以参考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 为了简化使用,框架提供Disruptor类来简化使用,下面主要是使用这个类来演示。
    首先定义一个Event:

    /**
     * Created by hupeng on 2015/1/1.
     */
    public class MyEvent {
    
        private long value;
    
        public void setValue(long value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "MyEvent{" +
                    "value=" + value +
                    '}';
        }
    }
    
    

    然后提供一个EventFactory,RingBuffer通过这factory来初始化在Event。

    import com.lmax.disruptor.EventFactory;
    
    /**
     * Created by hupeng on 2015/1/1.
     */
    public class MyEventFactory implements EventFactory<MyEvent> {
        @Override
        public MyEvent newInstance() {
            return new MyEvent();
        }
    }
    

    然后写一个Producer类,也就是消息的生产者。

    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    
    /**
     * Created by hupeng on 2015/1/1.
     */
    public class MyEventProducer {
    
        private RingBuffer<MyEvent> ringBuffer;
    
        public MyEventProducer(RingBuffer<MyEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() {
    
            @Override
            public void translateTo(MyEvent event, long sequence, Long value) {
                event.setValue(value);
            }
        };
        
        public void onData(final Long value) {
            ringBuffer.publishEvent(TRANSLATOR,value);
        }
    }
    

    然后写一个EventHandler。这个就是我们定义怎么处理消息的地方。

    import com.lmax.disruptor.EventHandler;
    
    /**
     * Created by hupeng on 2015/1/1.
     */
    public class MyEventHandler implements EventHandler<MyEvent> {
        @Override
        public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println(event);
        }
    }
    
    

    主程序:

    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import disruptor.starter.support.*;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    public class MyEventMain {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            int bufferSize = 1024;
    
            Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                    bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
            disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    
            disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler());
    //        disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler());  //Pipeline
            RingBuffer<MyEvent> ringBuffer = disruptor.start();
    
            MyEventProducer producer = new MyEventProducer(ringBuffer);
            for (long i = 0; i < 10; i++) {
                producer.onData(i);
                Thread.sleep(1000);// wait for task execute....
            }
    
            disruptor.shutdown();
    
            ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
        }
    }
    
    

    在这个例子中输出

    MyEvent{value=0}
    MyEvent{value=0}
    MyEvent{value=1}
    MyEvent{value=1}
    MyEvent{value=2}
    MyEvent{value=2}
    MyEvent{value=3}
    MyEvent{value=3}
    MyEvent{value=4}
    MyEvent{value=4}
    MyEvent{value=5}
    MyEvent{value=5}
    MyEvent{value=6}
    MyEvent{value=6}
    MyEvent{value=7}
    MyEvent{value=7}
    MyEvent{value=8}
    MyEvent{value=8}
    MyEvent{value=9}
    MyEvent{value=9}
    

    可以看出每个MyEventHandler(implements EventHandler)都会处理同一条消息。另外我们还可以使用类似:

    disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())
    

    这样的方法来定义依赖关系,比如先执行哪个handler再执行哪个handler。其他比如and()详情见api
    如果我们想定义多个handler,但是同时只有一个handler处理某一条消息。可以实现WorkHandler来定义handler:

    import com.lmax.disruptor.WorkHandler;
    
    /**
     * Created by hupeng on 2015/1/1.
     */
    public class MyEventWorkHandler implements WorkHandler<MyEvent> {
    
        private String workerName;
    
        public MyEventWorkHandler(String workerName) {
            this.workerName = workerName;
        }
    
        @Override
        public void onEvent(MyEvent event) throws Exception {
            System.out.println(workerName + " handle event:" + event);
        }
    }
    

    这时候我们改一下我们的主程序:

    public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            int bufferSize = 1024;
    
            Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(),
                    bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
            disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
            disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2"));
            RingBuffer<MyEvent> ringBuffer = disruptor.start();
    
            MyEventProducer producer = new MyEventProducer(ringBuffer);
            for (long i = 0; i < 10; i++) {
                producer.onData(i);
                Thread.sleep(1000);// wait for task execute....
            }
    
            disruptor.shutdown();
    
            ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS);
    
        }
    

    这时候我们可以看到输出是这样的:

    worker-1 handle event:MyEvent{value=0}
    worker-2 handle event:MyEvent{value=1}
    worker-1 handle event:MyEvent{value=2}
    worker-2 handle event:MyEvent{value=3}
    worker-1 handle event:MyEvent{value=4}
    worker-2 handle event:MyEvent{value=5}
    worker-1 handle event:MyEvent{value=6}
    worker-2 handle event:MyEvent{value=7}
    worker-1 handle event:MyEvent{value=8}
    worker-2 handle event:MyEvent{value=9}
    

    一条消息只被一个handler处理。

    这里的ExecutorsUtils就是写的一个关闭ExecutorService的方法

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class ExecutorsUtils {
    
        public static  void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) {
            pool.shutdown(); // Disable new tasks from being submitted
            try {
                // Wait a while for existing tasks to terminate
                if (!pool.awaitTermination(timeout/2, unit)) {
                    pool.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!pool.awaitTermination(timeout/2, unit))
                        System.err.println("Pool did not terminate");
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                pool.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    概念部分来自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,如果想对这个框架有更一步了解,可以点进去看看,可以参考源代码。

  • 相关阅读:
    Title
    Title
    Title
    Title
    Title
    Title
    Title
    get和post两种表单提交方式的区别
    计算机网络体系结构补充内容
    计算机网络体系结构作业题整理-第十章答案
  • 原文地址:https://www.cnblogs.com/hupengcool/p/4196965.html
Copyright © 2011-2022 走看看