zoukankan      html  css  js  c++  java
  • Disruptor的简单介绍与应用

    前言

    最近工作比较忙,在工作项目中,看了很多人都自己实现了一套数据任务处理机制,个人感觉有点乱,且也方便他人的后续维护,所以想到了一种数据处理模式,即生产者、缓冲队列、消费者的模式来统一大家的实现逻辑。

    下面时是对Disruptor基本使用的演示。使用中需要引入依赖

    <dependency>
      <groupId>com.lmax</groupId>
      <artifactId>disruptor</artifactId>
      <version>3.4.2</version>
    </dependency>
    

    名称解释

    • Ring Buffer

      环境的缓存区,3.0版本以前被认为是Disruptor的主要成员。3.0版本以后,环形缓冲区只负责通过Disruptor的事件方式来对数据进行存储和更新。在一些高级的应用场景中,Ring Buffer可以由用户的自定义实现完全替代。

    • Sequence

      Disruptor使用Sequence作为一种方法来确定特定组件的位置。每个使用者(EventProcessor)与Disruptor本身一样维护一个序列。大多数并发代码依赖于这些序列值的移动,因此序列支持AtomicLong的许多当前特性。事实上,两者之间唯一真正的区别是序列包含额外的功能,以防止序列和其他值之间的错误共享。

    • Sequencer

      Sequencer是真正的核心,该接口的两个实现(单生产者, 多消费者)实现了所有用于在生产者和使用者之间的快速、正确的传递数据的并发算法。

    • Sequence Barrier

      序列屏障由Sequencer产生,包含对Sequencer和任何依赖消费者的序列的引用。它包含确定是否有任何事件可供使用者处理的逻辑。

    • Wait Strategy

      等待策略确定消费者将如何等待生产者产生的消息,Disruptor将消息放到事件(Event)中。

    • Event

      从生产者到消费者的数据单位。不存在完全由用户定义的事件的特定代码的表示形式。

    • EventProcessor

      EventProcessor持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环。

    • BatchEventProcessor

      BatchEventProcessor它包含事件循环的有效实现,并将回调到已使用的EventHandle接口实现。

    • EventHandler

      Disruptor定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现。

    • Producer

      生产者,只是泛指调用Disruptor发布事件的用户代码,Disruptor没有定义特定接口或类型。

    架构图

    简单实用Disruptor

    1 定义事件

    事件就是通过Disruptor进行交换的数据类型。

    package com.disruptor;
    
    public class Data {
    
        private long value;
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    }
    

    2 定义事件工厂

    事件工厂定义了如何实例化第一步中定义的事件。Disruptor通过EventFactory在RingBuffer中预创建Event的实例。

    一个Event实例被用作一个数据槽,发布者发布前,先从RingBuffer获得一个Event的实例,然后往Event实例中插入数据,然后再发布到RingBuffer中,最后由Consumer获得Event实例并从中读取数据。

    package com.disruptor;
    
    import com.lmax.disruptor.EventFactory;
    
    public class DataFactory implements EventFactory<Data> {
    
        @Override
        public Data newInstance() {
            return new Data();
        }
    }
    

    3 定义生产者

    package com.disruptor;
    
    import com.lmax.disruptor.RingBuffer;
    
    import java.nio.ByteBuffer;
    
    public class Producer {
    
        private final RingBuffer<Data> ringBuffer;
    
        public Producer(RingBuffer<Data> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void pushData(ByteBuffer byteBuffer) {
            long sequence = ringBuffer.next();
    
            try {
                Data even = ringBuffer.get(sequence);
                even.setValue(byteBuffer.getLong(0));
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
    
    

    4 定义消费者

    package com.disruptor;
    
    import com.lmax.disruptor.WorkHandler;
    
    import java.text.MessageFormat;
    
    
    public class Consumer implements WorkHandler<Data> {
    
        @Override
        public void onEvent(Data data) throws Exception {
            long result = data.getValue() + 1;
    
            System.out.println(MessageFormat.format("Data process : {0} + 1 = {1}", data.getValue(), result));
        }
    }
    
    

    5 启动Disruptor

    • 测试Demo
    package com.disruptor;
    
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.ThreadFactory;
    
    
    public class Main {
    
        private static final int NUMS = 10;
    
        private static final int SUM = 1000000;
    
        public static void main(String[] args) {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            long start = System.currentTimeMillis();
    
            DataFactory factory = new DataFactory();
    
            int buffersize = 1024;
    
            Disruptor<Data> disruptor = new Disruptor<Data>(factory, buffersize, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r);
                }
            });
    
            Consumer[] consumers = new Consumer[NUMS];
            for (int i = 0; i < NUMS; i++) {
                consumers[i] = new Consumer();
            }
    
            disruptor.handleEventsWithWorkerPool(consumers);
            disruptor.start();
    
            RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
            Producer producer = new Producer(ringBuffer);
    
            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long i = 0; i < SUM; i++) {
                bb.putLong(0, i);
                producer.pushData(bb);
                System.out.println("Success producer data : " + i);
            }
            long end = System.currentTimeMillis();
    
            disruptor.shutdown();
    
            System.out.println("Total time : " + (end - start));
        }
    }
    
    
    • 结果(部分结果展示)
    Data process : 999,987 + 1 = 999,988
    Success producer data : 999995
    Data process : 999,990 + 1 = 999,991
    Data process : 999,989 + 1 = 999,990
    Data process : 999,991 + 1 = 999,992
    Data process : 999,992 + 1 = 999,993
    Data process : 999,993 + 1 = 999,994
    Data process : 999,995 + 1 = 999,996
    Success producer data : 999996
    Success producer data : 999997
    Success producer data : 999998
    Success producer data : 999999
    Data process : 999,994 + 1 = 999,995
    Data process : 999,996 + 1 = 999,997
    Data process : 999,997 + 1 = 999,998
    Data process : 999,998 + 1 = 999,999
    Data process : 999,999 + 1 = 1,000,000
    Total time : 14202
    

    由结果展示可见,边生产、边消费。

    彩蛋

    1 事件转换类

    package com.mm.demo.disruptor.translator;
    
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.mm.demo.disruptor.entity.Data;
    
    public class DataEventTranslator implements EventTranslatorOneArg<Data, Long> {
    
        @Override
        public void translateTo(Data event, long sequence, Long arg0) {
            System.out.println(MessageFormat.format("DataEventTranslator arg0 = {0}, seq = {1}", arg0, sequence));
            event.setValue(arg0);
        }
    }
    

    2 消费者

    2.1 消费者Demo1

    消费者每次将event的结果加1。

    package com.mm.demo.disruptor.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.mm.demo.disruptor.entity.Data;
    
    import java.text.MessageFormat;
    
    public class D1DataEventHandler implements EventHandler<Data> {
    
        @Override
        public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
            long result = event.getValue() + 1;
            Thread t = new Thread();
            String name = t.getName();
            System.out.println(MessageFormat.format("consumer "+name+": {0} + 1 = {1}", event.getValue(), result));
        }
    
    }
    

    这里是使用的是EventHandler。也是使用WorkHandler,EventHandler和WorkHandler的区别是前者不需要池化,后者需要池化。

    2.2 消费者Demo2

    package com.mm.demo.disruptor.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.mm.demo.disruptor.entity.Data;
    
    import java.text.MessageFormat;
    
    
    public class D2DataEventHandler implements EventHandler<Data> {
    
        @Override
        public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
            long result = event.getValue() + 2;
            System.out.println(MessageFormat.format("consumer 2: {0} + 2 = {1}", event.getValue(), result));
        }
    }
    
    

    2.3 串行依次计算

    Consumer1执行完成再执行Consumer2。

    package com.mm.demo.disruptor.process;
    
    import com.lmax.disruptor.dsl.Disruptor;
    import com.mm.demo.disruptor.entity.Data;
    import com.mm.demo.disruptor.handler.D1DataEventHandler;
    import com.mm.demo.disruptor.handler.D2DataEventHandler;
    
    /**
     * 串行依次计算
     * @DateT: 2020-01-07
     */
    public class Serial {
    
        public static void serial(Disruptor<Data> disruptor) {
            disruptor.handleEventsWith(new D1DataEventHandler()).then(new D2DataEventHandler());
            disruptor.start();
        }
    }
    
    

    2.4 并行实时计算

    Consumer1和Consumer2同时执行。

    package com.mm.demo.disruptor.process;
    
    import com.lmax.disruptor.dsl.Disruptor;
    import com.mm.demo.disruptor.entity.Data;
    import com.mm.demo.disruptor.handler.D1DataEventHandler;
    import com.mm.demo.disruptor.handler.D2DataEventHandler;
    
    /**
     * 并行执行
     * @DateT: 2020-01-07
     */
    public class Parallel {
    
        public static void parallel(Disruptor<Data> dataDisruptor) {
            dataDisruptor.handleEventsWith(new D1DataEventHandler(), new D2DataEventHandler());
            dataDisruptor.start();
        }
    }
    

    2.5 测试类

    package com.mm.demo.disruptor;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.mm.demo.disruptor.entity.Data;
    import com.mm.demo.disruptor.handler.D1DataEventHandler;
    import com.mm.demo.disruptor.process.Parallel;
    import com.mm.demo.disruptor.process.Serial;
    import com.mm.demo.disruptor.translator.DataEventTranslator;
    
    import javax.swing.plaf.synth.SynthTextAreaUI;
    import java.nio.ByteBuffer;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    
    public class Main {
    
        private static final int BUFFER = 1024 * 1024;
    
        public static void main(String[] args) {
    
            DataFactory factory = new DataFactory();
    
            Disruptor<Data> disruptor = new Disruptor<Data>(factory, BUFFER, Executors.defaultThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
    
          
            Serial.serial(disruptor);
    //        Parallel.parallel(disruptor);
    
            RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
            for (int i = 0; i < 2; i++) {
                ringBuffer.publishEvent(new DataEventTranslator(), (long)i);
            }
            disruptor.shutdown();
        }
    }
    

    总结

    上边只演示了串行和并行的方式,其实还是通过组合的方式创建不的计算处理方式(需要创建多个事件处理器EventHandler)。

    补充等待策略

    • BlockingWaitStrategy:最低效的策略,但是对cpu的消耗是最小的,在各种不同部署环境中能提供更加一致的性能表现。
    • SleepingWaitStrategy:性能和BlockingWaitStrategy差不多少,cpu消耗也类似,但是其对生产者线程的影响最小,适合用于异步处理数据的场景。
    • YieldingWaitStrategy:性能是最好的,适用于低延迟的场景。在要求极高性能且事件处理线程数小于cpu处理核数时推荐使用此策略。
    • BusySpinWaitStrategy:低延迟,但是对cpu资源的占用较多。
    • PhasedBackoffWaitStrategy:上边几种策略的综合体,延迟大,但是占用cpu资源较少。

    参考

    本文参考了Disruptor源码以及github中的部分说明。

    Demo源码地址

    github


    • 写作不易,转载请注明出处,喜欢的小伙伴可以关注公众号查看更多喜欢的文章。
    • 联系方式:4272231@163.com
    • QQ:95472323
    • 微信:ffj2000
  • 相关阅读:
    请问,如何在windows系统下面同时使用中文和英文的cmd?_百度知道
    boost计算随机数和计算crc32简单示例
    使用 boost 进行 CRC32 校验
    常见的Hash算法
    省了一笔钱,关点被爱国者无线硬盘打动入手了
    夏新大V二代,外型不错,屏幕偏小,电池偏小,质感上还可以提升两个level,把整个品味提一提,再把价格降到999,内置软件上再下点功夫就不错了
    藏红花
    我来自台湾:普力600 消毒锭_普力-600 消毒片PL-600 SH-XT-K礼盒小套组_易迅发现_易迅网
    我来自台湾:普力600 消毒锭_普力-600 消毒片PL-600 SH-XT-K礼盒小套组_易迅发现_易迅网
    中继联通chinaunicom热点。求路由推荐。-无线路由器-中国无线论坛
  • 原文地址:https://www.cnblogs.com/fengfujie/p/12163895.html
Copyright © 2011-2022 走看看