zoukankan      html  css  js  c++  java
  • Disruptor

    九、 Disruptor

    数据的内存结构只有数组和链表,线程安全的非阻塞队列,链表实现有ConcurrentLinkedQueue,但是却没有数组的实现,因为数组的扩张需要创建新的数组并复制元素,效率非常低。

    Disruptor是使用数组实现的,内部使用的RingBuffer。特性有:高并发,无锁,直接覆盖旧的数据(降低GC频率),是基于事件的生产者消费者模式实现。

    Disruptor的使用

    • 事件:向disruptor数组中存放的元素。
    • 事件工厂:用来生成事件对象。disruptor为了提升效率,会在初始化时就向数组中提前初始化填充好每一个位置的元素,这样当生产者进来只需要覆盖当前位置元素的属性,避免创建对象,降低GC频率,也就是复用对象。
    • 事件处理器:消费事件对象。

    定义事件

    package com.xiazhi.disrupter;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class LongEvent {
    
        private long value;
    
        public LongEvent() {
        }
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "LongEvent{" +
                    "value=" + value +
                    '}';
        }
    }
    

    定义事件工厂

    package com.xiazhi.disrupter;
    
    import com.lmax.disruptor.EventFactory;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class LongEventFactory implements EventFactory<LongEvent> {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }
    

    定义消费者

    package com.xiazhi.disrupter;
    
    import com.lmax.disruptor.EventHandler;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class LongEventPolicy implements EventHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
            System.out.println(String.format("[%s]:value=%d:sequence=%d:b=%s", Thread.currentThread().getName(), longEvent.getValue(), sequence, endOfBatch));
        }
    }
    

    使用Disruptor

    package com.xiazhi.disrupter;
    
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    
    import java.util.concurrent.Executors;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class DisruptorDemo {
    
        public static void main(String[] args) {
            // 创建disruptor容器
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, Executors.defaultThreadFactory());
            // 设置消费者,当有多个消费者时传多个参数,每一个消费者就是一个线程
            // disruptor.handleEventsWith(new LongEventPolicy(), new LongEventPolicy());
            disruptor.handleEventsWith(new LongEventPolicy());
            // 开始工作
            disruptor.start();
    
            // 获取ringBuffer
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            for (int i = 0; i <= 10; i++) {
                // 获取下一个可用的序列号
                long sequence = ringBuffer.next();
                try {
                    // 获取当前位置元素
                    LongEvent longEvent = ringBuffer.get(sequence);
                    // 覆盖属性
                    longEvent.setValue(100L);
                } finally {
                    //发布此位置事件
                    ringBuffer.publish(sequence);
                }
            }
            // 关闭容器
            disruptor.shutdown();
        }
    }
    

    Disruptor的工作原理

    disruptor的是基于事件实现的,那么就有了生产者(provider)和消费者(consumer)存在,生产者生产元素放入数组中,消费者从数组中消费元素,这个数组就是RingBuffer。每一个生产者和消费者内部都会有一个私有指针pri-sequence,表示当前操作的元素序号,同时RingBuffer内部也会有一个全局指针global-sequence指向最后一个可以被消费的元素。这样当生产者需要放数据时,只需要获取global-sequence的下一个位置,下一个位置如果还未被消费,那么就会进入等待策略,如果下一个位置已经被消费,那么就会直接覆盖当前位置的属性值。

    当生产者需要向容器中存放数据时,只需要使用sequence%(数组长度-1)就可以得到要添加的元素应该放在哪儿个位置上,这样就实现了数组的首尾相连。

    disruptor初始化时需要指定容器大小,容器大小指定为2^n,计算时可以可以使用位运算:

    如果容器大小是8,要放12号元素。12%8 = 12 &(8-1)=1100&0111=0100=4。

    使用位运算可以提升效率。

    disruptor的8种等待策略:

    disruptor使用waitStrategy接口来实现等待策略,当ringbuffer满了的时候,就会调用等待策略。内置实现了8种等待策略:

    • (常用)BlockingWaitStrategy: 阻塞等待策略
    • BusySpinWaitStrategy: 线程一直自旋等待,可能比较耗费cpu。
    • LiteBlockingWaitStrategy: 线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.
    • LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
    • PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略
    • TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常
    • (常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu。
    • (常用)SleepingWaitStrategy : 睡眠等待。

    消费者处理异常

    当消费者处理出现异常时,可以通过设置异常处理器来处理异常信息。异常处理器可以通过实现:

    package com.xiazhi.disrupter;
    
    import com.lmax.disruptor.ExceptionHandler;
    
    /**
     * @author 赵帅
     * @date 2021/1/23
     */
    public class SimpleLongEventExceptionPolicy implements ExceptionHandler<LongEvent> {
        @Override
        public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
            throwable.printStackTrace();
            System.out.println(longEvent);
        }
    
        @Override
        public void handleOnStartException(Throwable throwable) {
            System.out.println("容器启动异常");
            throwable.printStackTrace();
        }
    
        @Override
        public void handleOnShutdownException(Throwable throwable) {
            System.out.println("关闭异常");
            throwable.printStackTrace();
        }
    }
    

    Disruptor容器异常处理器的设置有两种方式:

    • 设置默认异常处理器:disruptor.setDefaultExceptionHandler(exceptionPolicy)

      SimpleLongEventExceptionPolicy exceptionPolicy = new SimpleLongEventExceptionPolicy();
      disruptor.setDefaultExceptionHandler(exceptionPolicy);
      
    • 覆盖异常处理器:disruptor.handleExceptionsFor(eventPolicy).with(exceptionPolicy);

      disruptor.handleEventsWith(eventPolicy);
      SimpleLongEventExceptionPolicy exceptionPolicy = new SimpleLongEventExceptionPolicy();
      disruptor.handleExceptionsFor(eventPolicy).with(exceptionPolicy);
      

    生产者类型

    disruptor为了提升效率,还可以再初始化时配置生产者类型,如果生产者是单线程的,那么就再创建时指定生产者类型为单线程,那么就不用加锁操作,效率会再提升。

    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, 
            64, 
            Executors.defaultThreadFactory(), 
            ProducerType.SINGLE,
            new BlockingWaitStrategy());
    

    ProducerType是一个枚举,取值有:

    • SINGLE: 单线程
    • MULTI:多线程(默认)

    Disruptor对java8lambda的支持

    disruptor为了支持java8的lambda,新增了函数式接口:EventTranslator

    可以设置一个参数的:EventTranslatorOneArg,

    可以设置两个参数的:EventTranslatorTowArg

    可以设置三个参数的:EventTranslatorThreeArg

    以及可以设置多个参数的:EventTranslatorVararg

    因此可以使用如下方式配置disruptor:

    package com.xiazhi.disrupter;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.Arrays;
    import java.util.concurrent.Executors;
    
    /**
     * @author 赵帅
     * @date 2021/1/23
     */
    public class DisruptorLambdaDemo {
        public static void main(String[] args) {
    
            Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,
                    64,
                    Executors.defaultThreadFactory(),
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
    
            disruptor.handleEventsWith(new LongEventPolicy());
            disruptor.setDefaultExceptionHandler(new SimpleLongEventExceptionPolicy());
            disruptor.start();
    
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    
            // 使用lambda发布事件
            for (int i = 0; i < 10; i++) {
                // 不设置参数的
                ringBuffer.publishEvent((longEvent, sequence) -> longEvent.setValue(100L));
                // 设置一个参数的
                ringBuffer.publishEvent((longEvent, sequence, arg) -> longEvent.setValue(arg), i);
                // 设置两个参数的
                ringBuffer.publishEvent((longEvent, sequence, var1, var2) -> {
                    System.out.println(String.format("var1 = %s, var2 = %s", var1, var2));
                    longEvent.setValue(var1);
                }, i, "hello world");
                // 设置三个参数的
                ringBuffer.publishEvent((longEvent, sequence, var1, var2, var3) -> {
                    System.out.println(String.format("var1 = %s, var2 = %s, var3 = %s", var1, var2, var3));
                    longEvent.setValue(var1);
                }, i, "hello world", "arg3");
                // 设置多个参数的
                ringBuffer.publishEvent((longEvent, sequence, vars) -> {
                    System.out.println("args = " + Arrays.toString(vars));
                    longEvent.setValue((Integer) vars[0]);
                }, i, "hello world", "arg3");
            }
            disruptor.shutdown();
        }
    }
  • 相关阅读:
    网页加速的14条优化法则 网站开发与优化
    .NET在后置代码中输入JS提示语句(背景不会变白)
    C语言变量声明内存分配
    SQL Server Hosting Toolkit
    An established connection was aborted by the software in your host machine
    C语言程序设计 2009春季考试时间和地点
    C语言程序设计 函数递归调用示例
    让.Net 程序脱离.net framework框架运行
    C语言程序设计 答疑安排(2009春季 110周) 有变动
    软件测试技术,软件项目管理 实验时间安排 2009春季
  • 原文地址:https://www.cnblogs.com/Zs-book1/p/14318987.html
Copyright © 2011-2022 走看看