zoukankan      html  css  js  c++  java
  • disruptor的并行用法

    实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:

    public class DTaskFactory implements EventFactory<DTask> {
        @Override
        public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象
            return new DTask();
        }
    }

    生产消费的对象类型:

    public class DTask {
        public String getName1() {
            return name1;
        }
    
        public void setName1(String name1) {
            this.name1 = name1;
        }
    
        public String getName2() {
            return name2;
        }
    
        public void setName2(String name2) {
            this.name2 = name2;
        }
    
        public String getName3() {
            return name3;
        }
    
        public void setName3(String name3) {
            this.name3 = name3;
        }
    
        String name1;
        String name2;
        String name3;
    
    }

    disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):

    public class DTaskHandle implements EventHandler<DTask> {
        @Override
        public void onEvent(DTask dTask, long l, boolean b) throws Exception {
            System.out.println("开始最后消费");
            System.out.println(dTask.getName1());
    
            System.out.println(dTask.getName2());
            System.out.println(dTask.getName3());
            System.out.println("结束最后消费");
        }
    }
    
    public class DTaskHandle1 implements EventHandler<DTask> {
        @Override
        public void onEvent(DTask dTask, long l, boolean b) throws Exception {
            System.out.println("-----DTaskHandle1-----");
            dTask.setName1("name1");
        }
    }
    
    public class DTaskHandle2 implements EventHandler<DTask> {
        @Override
        public void onEvent(DTask dTask, long l, boolean b) throws Exception {
            System.out.println("-----DTaskHandle2-----");
            dTask.setName2("name2");
        }
    }
    
    public class DTaskHandle3 implements EventHandler<DTask> {
        @Override
        public void onEvent(DTask dTask, long l, boolean b) throws Exception {
            System.out.println("-----DTaskHandle3-----");
            dTask.setName3("name3");
        }
    }

    测试执行类:

    public class DisruptorTest {
    
        public void exec() throws Exception {
            ExecutorService executor = Executors.newCachedThreadPool();
            Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(),
                    1024 * 1024,
                    executor,
                    ProducerType.SINGLE, new BusySpinWaitStrategy());
    
            DTaskHandle dTaskHandle = new DTaskHandle();
            DTaskHandle1 dTaskHandle1 = new DTaskHandle1();
            DTaskHandle2 dTaskHandle2 = new DTaskHandle2();
            DTaskHandle3 dTaskHandle3 = new DTaskHandle3();
            disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行
    
            disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle
    
    //        disruptor.
    
            disruptor.start();
            CountDownLatch latch = new CountDownLatch(1);
            //生产者准备
            executor.submit(new TradePublisher(latch, disruptor));
    
            latch.await();//等待生产者完事.
    
            disruptor.shutdown();
            executor.shutdown();
        }
    
    }
  • 相关阅读:
    【leetcode】1230.Toss Strange Coins
    2018.12.25 SOW
    L203 词汇题
    L202
    L201
    L200
    2018
    2018.12.21 Cmos- RF
    L198
    L196 Hospital educations
  • 原文地址:https://www.cnblogs.com/zzq-include/p/6868874.html
Copyright © 2011-2022 走看看