zoukankan      html  css  js  c++  java
  • disruptor入门


    一、Disruptor的简介

      Disruptor是由LAMX(欧洲顶级金融公司)设计和开源的大规模、高并发、低延迟的异步处理框架,也可以说他
    是最快的消息框架(JMS)。整个业务逻辑处理器完全运行在内存中,其LMAX架构可以达到一个线程里每秒处理6百万
    流水,用1微秒的延迟可以获得100K+吞吐量的爆炸性能。非常适合那种实时性高、延迟率低、业务流水量大的应用场
    景,比如银行的实时交易处理、读写操作分离、数据缓存等。
      Disruptor是基于生产者-消费者模型,实现了"队列“功能的无锁高并发框架。他可以做到一个生产者对应多个消
    费者且消费者之间可以并行的处理,也可以形成先后顺序的处理。Disruptor本质上解决的就是在两个独立的处理过
    程之间交换数据。Disruptor框架的一些核心类有:
     1.Disruptor:用于控制整个消费者-生产者模型的处理器
       2.RingBuffer:用于存放数据
       3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。
       4.EventFactory:事件工厂类。
       5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。
       6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。
       7.用于运行disruptor的线程或者线程池。

    二、Disruptor的入门

      Disruptor的编写一般可以分为以下几步:
        (1)定义事件;
        (2)定义事件工厂;
        (3)消费者–定义事件处理的具体实现;
        (4)定义用于事件处理(消费者)的线程池;
        (5)指定等待策略:
          Disruptor 提供了多个WaitStrategy的实现,例如:BlockingWaitStrategy、SleepingWaitStrategy、
    YieldingWaitStrategy等:
          BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供
    更加一致的性能表现;
          SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其
    对生产者线程的影响最小,适合用于异步日志类似的场景;
          YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线
    数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
          WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
          WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
          WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        (6)生产(发布)消息;
        (7)关闭disruptor业务逻辑处理器;
      Disruptor的一些核心概念有:
        - Ring Buffer(环形缓冲区) :
        曾经RingBuffer是Disruptor中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过
    Disruptor进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定
    义实现来完全替代。
        - Sequence Disruptor :
        通过顺序递增的序号来编号管理。通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序
    号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一
    个AtomicLong也可以用于标识进度,但定义Sequence来负责该问题还有另一个目的,那就是防止不同的 Sequence之间
    的CPU缓存伪共享(Flase Sharing)问题。
        - Sequencer :
        Sequencer是Disruptor的真正核心。此接口有两个实现类SingleProducerSequencer、MultiProducerSequencer
    ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
        - Sequence Barrier
        用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。
    Sequence Barrier 还定义了决定Consumer是否还有可处理的事件的逻辑。
        - Wait Strategy
        定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor定义了多种不同的策略,针对不同的场
    景,提供了不一样的性能表现)
        - Event
      在Disruptor的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被Disruptor
    定义的特定类型,而是由 Disruptor 的使用者定义并指定。
        - EventProcessor
        EventProcessor持有特定消费者的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
        - EventHandler
        Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
        - Producer
        即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
      栗子:

      Event:

    /**
     * 事件(Event)就是通过 Disruptor 进行交换的数据类型。
     * @author lcy
     *
     */
    public class TransactionEvent {
        private long seq;
        private double amount;
        private long callNumber;
        
        public long getCallNumber() {
            return callNumber;
        }
        
        @Override
        public String toString() {
            return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]";
        }
        
        public void setCallNumber(long callNumber) {
            this.callNumber = callNumber;
        }
        public long getSeq() {
            return seq;
        }
        public void setSeq(long seq) {
            this.seq = seq;
        }
        public double getAmount() {
            return amount;
        }
        public void setAmount(double amount) {
            this.amount = amount;
        }
    }

      Factory:

    /**
     * Event Factory 定义了如何实例化前面第1步中定义的事件(Event)
     * Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。 
            一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,
            然后往 Event 实例中填充数据,之后再发布到 RingBuffer中,之后由 Consumer 获得该 Event 实例并从中读取数据。
     * @author lcy
     *
     */
    public class TransactionEventFactory implements EventFactory<TransactionEvent>{
    
        @Override
        public TransactionEvent newInstance() {
            // TODO Auto-generated method stub
            return new TransactionEvent();
        }
    }

      Customer:

    /**
     * 事件处理类-交易流水初始化
     * @author lcy
     *
     */
    public class AmountTrasfer implements EventTranslator<TransactionEvent>{
    
        @Override
        public void translateTo(TransactionEvent arg0, long arg1) {
            arg0.setAmount(Math.random()*99);
            arg0.setCallNumber(17088888888L);
            arg0.setSeq(System.currentTimeMillis());
             System.out.println("设置交易流水:"+arg0.getSeq());
        }
    }
    /**
     * 消费者–定义事件处理的具体实现
     * 拦截交易流水
     * @author lcy
     *
     */
    public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{
    
        @Override
        public void onEvent(TransactionEvent transactionEvent) throws Exception {
             System.out.println("交易流水号为:"+transactionEvent.getSeq()+"||交易金额为:"+transactionEvent.getAmount());
        }
    
        @Override
        public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
            // TODO Auto-generated method stub
            this.onEvent(arg0);
        }
    }
    /**
     * 发送验证短信
     * @author lcy
     *
     */
    public class SendMsgHandler implements EventHandler<TransactionEvent>{
    
        @Override
        public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
            // TODO Auto-generated method stub
             System.out.println("向手机号:"+arg0.getCallNumber()+"发送验证短信......");
            
        }
    }
    /**
     * 交易流水入库操作
     * @author lcy
     *
     */
    public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{
    
        @Override
        public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
            // TODO Auto-generated method stub
            this.onEvent(arg0);
        }
    
        @Override
        public void onEvent(TransactionEvent arg0) throws Exception {
            arg0.setSeq(arg0.getSeq()*10000);
            System.out.println("拦截入库流水号------------  "+arg0.getSeq());
        }
    }

      Producer:

    /**
     * 生产者、发布事件
     * @author lcy
     *
     */
    public class TransactionEventProducer implements Runnable {
        // 线程同步辅助类 - 允许一个或多个线程一直等待
        CountDownLatch cdl;
        Disruptor disruptor;
    
        public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) {
            super();
            this.cdl = cdl;
            this.disruptor = disruptor;
        }
    
        public TransactionEventProducer() {
            super();
            // TODO Auto-generated constructor stub
        }
    
    
        @Override
        public void run() {
            AmountTrasfer th;
            try {
                //Event对象初始化类
                th = new AmountTrasfer();
                //发布事件
                disruptor.publishEvent(th);
            } finally {
                // 递减锁存器的计数 -如果计数到达零,则释放所有等待的线程。
                cdl.countDown();
            }
        }
    
        // 定义环大小,2的倍数
        private static final int BUFFER_SIZE = 1024;
        // 定义处理事件的线程或线程池
        ExecutorService pool = Executors.newFixedThreadPool(7);
    
        /**
         * 批处理模式
         * @throws Exception
         */
        public void BatchDeal() throws Exception {
            //创建一个单生产者的ringBuffer
            final RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {
    
                @Override
                public TransactionEvent newInstance() {
                    return new TransactionEvent();
                }
                //设置等待策略,YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。
            }, BUFFER_SIZE,new YieldingWaitStrategy());
            //创建SequenceBarrier
            SequenceBarrier barrier = ringBuffer.newBarrier();
            //创建消息处理器
            BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler());
            //构造反向依赖,eventProcessor之间没有依赖关系则可以将Sequence直接加入
            ringBuffer.addGatingSequences(eventProcessor.getSequence());
            //提交消息处理器
            pool.submit(eventProcessor);
            //提交一个有返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
            Future<Void> submit = pool.submit(new Callable<Void>() {
                //计算结果,如果无法计算结果则抛出异常
                @Override
                public Void call() throws Exception {
                    long seq;
                    for (int i=0;i<7000;i++) {
                        System.out.println("生产者:"+i);
                        //环里一个可用的区块
                        seq=ringBuffer.next();
                        //为环里的对象赋值
                        ringBuffer.get(seq).setAmount(Math.random()*10);
                        System.out.println("TransactionEvent:   "+ringBuffer.get(seq).toString());
                        //发布这个区块的数据,
                        ringBuffer.publish(seq);
                    }
                    return null;
                }
            });
            //等待计算完成,然后获取其结果。
            submit.get();
            Thread.sleep(1000);
            //关闭消息处理器
            eventProcessor.halt();
            //关闭线程池
            pool.shutdown();
        }
        
         /**
          * 工作池模式
          * @throws Exception
          */
        @SuppressWarnings("unchecked")
        public void poolDeal() throws Exception {
            RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {
    
                @Override
                public TransactionEvent newInstance() {
                    return new TransactionEvent();
                }
            }, BUFFER_SIZE, new YieldingWaitStrategy());
            SequenceBarrier barrier = ringBuffer.newBarrier();
            //创建一个定长的线程池
            ExecutorService pool2 = Executors.newFixedThreadPool(5);
            //交易流水入库操作
            WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler();
            ExceptionHandler arg2;
            WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler);
            workerPool.start(pool2);
            long seq;
            for(int i =0;i<7;i++){
                seq = ringBuffer.next();
                ringBuffer.get(seq).setAmount(Math.random()*99);
                ringBuffer.publish(seq);
            }
            Thread.sleep(1000);
            workerPool.halt();
            pool2.shutdown();
        }
        
        /**
         * disruptor处理器用来组装生产者和消费者
         * @throws Exception 
         */
        @SuppressWarnings("unchecked")
        public void disruptorManage() throws Exception{
            //创建用于处理事件的线程池
            ExecutorService pool2 = Executors.newFixedThreadPool(7);
            //创建disruptor对象
            /**
             * 用来指定数据生成者有一个还是多个,有两个可选值ProducerType.SINGLE和ProducerType.MULTI
             * BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。通常用于消费线程数小于CPU数的场景
             */
            Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() {
    
                @Override
                public TransactionEvent newInstance() {
                    return new TransactionEvent();
                }                                  
            },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy());
            //创建消费者组,先执行拦截交易流水和入库操作
            EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler());
            //在进行风险交易的2次验证操作
            eventsWith.then(new SendMsgHandler());
            //启动disruptor
            disruptor2.start();
            //在线程能通过 await()之前,必须调用 countDown() 的次数
            CountDownLatch latch = new CountDownLatch(1);
            //将封装好的TransactionEventProducer类提交
            pool2.submit(new TransactionEventProducer(latch,disruptor2));
            //使当前线程在锁存器倒计数至零之前一直等待,以保证生产者任务完全消费掉
            latch.await();
            //关闭disruptor业务逻辑处理器
            disruptor2.shutdown();
            //销毁线程池
            pool2.shutdown();
        }
    }

      Test:

    /**
     * 测试类
     * @author lcy
     *
     */
    public class Test {
        public static void main(String[] args) throws Exception {
            TransactionEventProducer producer = new TransactionEventProducer();
            for (int i = 0; i < 100; i++)
                producer.disruptorManage();
            System.out.println("--------------------------------------------------");
        }
    }

    三、记一次生产上的BUG

      前段时间升级的时候出现了这样一个BUG,导致了近万用户的交易失败。首先确认了我们在生产上并没有部署拦
    截交易的规则,所有的交易流水都是放行的不会加入我们的风险名单库。那么内存库里的几万灰名单是怎么来的呢?
      我们在上线成功后需使用真实的用户进行一波生产上的测试,而在测试的过程中为了配合测试那边的需求,
    需将特定的几个测试账号配置成加入灰名单并进行2次认证的拦截规则。测试通过后便将那几条测试规则给撤回了。但
    是我们忽略了一个问题,因为Disruptor框架在初始化环的时候,只会new一次这个对象。这就导致了插入环里“槽”的对
    象始终都是第一次进入“灰名单”对象,等到环被塞满后下条流水进来的时候就会使用“槽”里的“灰名单”对象。即使这笔
    交易不是风险交易也会加入到灰名单中,导致了大量的用户交易失败。
      上线后的第二天,我们头儿就意识到了这个问题,想通过重启服务、清空环来暂时解决这个问题(服务器有负载均
    衡),因为环被清空后,之前在环里的“灰名单”对象也就不存在了,而且生产上没有部署将用户加入“灰名单”的规则,环
    里的对象就一定是“干净的”,这个问题也就得到了解决。但是、可是、可但是、万万没想到啊,当晚生产上还是出现了问题。
    灰名单里的用户数量已经逼近2万了,大量用户不能进行电子交易。
      为什么项目已经被重启了,环也被清空了,也没有规则会产生新的灰名单,那2万的灰名单用户是从哪来的?事后
    通过查看代码发现,虽然环被清空了,但是在清空之前已经有部分用户被存到了灰名单里。这些用户在某一时间再次
    进行交易时,会重新将这条交易的状态设置为“灰名单”(其他业务需要),这就导致了接待这条交易流水的“槽”会被重
    新赋值为“灰名单”的状态,然后环里的“灰名单”槽就会越滚越多。
      Event在使用的时候一定不要忘记将关键的属性进行初始化,这样才能保证从环里取出的对象是初始状态的,不会被上次处理的数据所影响。

  • 相关阅读:
    Codeforces Round #344 (Div. 2) C. Report 其他
    Codeforces Round #344 (Div. 2) B. Print Check 水题
    Codeforces Round #344 (Div. 2) A. Interview 水题
    8VC Venture Cup 2016
    CDOJ 1280 772002画马尾 每周一题 div1 矩阵快速幂 中二版
    CDOJ 1280 772002画马尾 每周一题 div1 矩阵快速幂
    CDOJ 1279 班委选举 每周一题 div2 暴力
    每周算法讲堂 快速幂
    8VC Venture Cup 2016
    Educational Codeforces Round 9 F. Magic Matrix 最小生成树
  • 原文地址:https://www.cnblogs.com/0813lichenyu/p/9244410.html
Copyright © 2011-2022 走看看