zoukankan      html  css  js  c++  java
  • Disruptor快速入门

    JDK的多线程与并发库一文中, 提到了BlockingQueue实现了生产者-消费者模型

    BlockingQueue是基于锁实现的, 而锁的效率通常较低. 有没有使用CAS机制实现的生产者-消费者?

    Disruptor就是这样.

    disruptor使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue

    1.生产者-消费者

    1.1使用Disruptor类

    RingBuffer通过Disruptor实例获得

    public class Client {
    
        public static void main(String[] args) throws Exception {
            
            //1.配置并获得Disruptor
            ExecutorService  executor = Executors.newCachedThreadPool(); 
            LongEventFactory factory = new LongEventFactory();
            // 设置RingBuffer大小, 需为2的N次方(能将求模运算转为位运算提高效率 ), 否则影响性能
            int ringBufferSize = 1024 * 1024; 
    
            //创建disruptor, 泛型参数:传递的事件的类型
            // 第一个参数: 产生Event的工厂类, Event封装生成-消费的数据
            // 第二个参数: RingBuffer的缓冲区大小
            // 第三个参数: 线程池
            // 第四个参数: SINGLE单个生产者, MULTI多个生产者
            // 第五个参数: WaitStrategy 当消费者阻塞在SequenceBarrier上, 消费者如何等待的策略. 
                //BlockingWaitStrategy 使用锁和条件变量, 效率较低, 但CPU的消耗最小, 在不同部署环境下性能表现比较一致
                //SleepingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度; 多次调度后仍不成功, 睡眠纳秒级别的时间再尝试. 平衡了延迟和CPU资源占用, 但延迟不均匀.
                //YieldingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度. 平衡了延迟和CPU资源占用, 延迟也比较均匀.
                //BusySpinWaitStrategy 自旋等待,类似自旋锁. 低延迟但同时对CPU资源的占用也多.
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
            // 注册事件消费处理器, 也即消费者. 可传入多个EventHandler ...
            disruptor.handleEventsWith(new LongEventHandler());
            // 启动
            disruptor.start();
            
            //2.将数据装入RingBuffer
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            // 创建生产者, 以下方式一使用原始api, 方式二使用新API
            //LongEventProducer producer = new LongEventProducer(ringBuffer); 
            LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
            
            ByteBuffer byteBuffer = ByteBuffer.allocate(8); // 这里只是笔者实验, 不是必须要用ByteBuffer保存long数据
            for(int i = 0; i < 100; ++i){
                byteBuffer.putLong(0, i);
                producer.produceData(byteBuffer);
            }
    
            disruptor.shutdown(); //关闭 disruptor  阻塞直至所有事件都得到处理
            executor.shutdown(); // 需关闭 disruptor使用的线程池, 上一步disruptor关闭时不会连带关闭线程池        
        }
    }
    // Event封装要传递的数据 
    public class LongEvent { 
        private long value;
        public long getValue() { 
            return value; 
        } 
        public void setValue(long value) { 
            this.value = value; 
        } 
    } 
    // 产生Event的工厂
    public class LongEventFactory implements EventFactory { 
        @Override 
        public Object newInstance() { 
            return new LongEvent(); 
        } 
    } 
    public class LongEventHandler implements EventHandler<LongEvent>  {
        // 消费逻辑
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
            System.out.println(longEvent.getValue());         
        }
    }
    //生产者实现一
    public class LongEventProducer {
        // 生产者持有RingBuffer的引用
        private final RingBuffer<LongEvent> ringBuffer;
        
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
            this.ringBuffer = ringBuffer;
        }
        
        public void produceData(ByteBuffer bb){
            // 获得下一个Event槽的下标
            long sequence = ringBuffer.next();
            try {
                // 给Event填充数据
                LongEvent event = ringBuffer.get(sequence);
                event.setValue(bb.getLong(0));
            } finally {
                // 发布Event, 激活观察者去消费, 将sequence传递给该消费者
                //publish应该放在 finally块中以确保一定会被调用->如果某个事件槽被获取但未提交, 将会堵塞后续的publish动作。
                ringBuffer.publish(sequence);
            }
        }
    }
    //生产者实现二
    public class LongEventProducerWithTranslator {
    
        // 使用EventTranslator, 封装 获取Event的过程
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
            @Override
            public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
                event.setValue(buffer.getLong(0));
            }
        };
        
        private final RingBuffer<LongEvent> ringBuffer;
        
        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
        
        public void produceData(ByteBuffer buffer){
            // 发布
            ringBuffer.publishEvent(TRANSLATOR, buffer);
        }
    }

    1.2 直接使用RingBuffer

    给出了两种方式:EventProcessor与WorkPool(可处理多消费者)

    public class ClientForEventProcessor {  
       
        public static void main(String[] args) throws Exception {  
            int BUFFER_SIZE = 1024;
            int THREAD_NUMBERS = 4;
            
            // 这里直接获得RingBuffer. createSingleProducer创建一个单生产者的RingBuffer
            
            // 第一个参数为EventFactory,产生数据Trade的工厂类
            // 第二个参数是RingBuffer的大小,需为2的N次方     
            // 第三个参数是WaitStrategy, 消费者阻塞时如何等待生产者放入Event
            final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade(UUID.randomUUID().toString());
                }  
            }, BUFFER_SIZE, new YieldingWaitStrategy());  
            
            //SequenceBarrier, 协调消费者与生产者, 消费者链的先后顺序. 阻塞后面的消费者(没有Event可消费时)
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
              
            //创建事件处理器 (消费者): 处理ringBuffer, 用TradeHandler的方法处理(实现EventHandler), 用sequenceBarrier协调生成-消费
            //如果存在多个消费者(老api, 可用workpool解决) 那重复执行 创建事件处理器-注册进度-提交消费者的过程, 把其中TradeHandler换成其它消费者类  
            BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler());  
            //把消费者的消费进度情况注册给RingBuffer结构(生产者)    !如果只有一个消费者的情况可以省略 
            ringBuffer.addGatingSequences(transProcessor.getSequence());  
              
            //创建线程池  
            ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
            //把消费者提交到线程池, !说明EventProcessor实现了callable接口  
            executors.submit(transProcessor);  
            
            // 生产者, 这里新建线程不是必要的
            Future<?> future= executors.submit(new Callable<Void>() {  
                @Override  
                public Void call() throws Exception {  
                    long seq;  
                    for (int i = 0; i < 10; i++) {
                        seq = ringBuffer.next();
                        ringBuffer.get(seq).setPrice(Math.random() * 9999);
                        ringBuffer.publish(seq);
                    } 
                    return null;  
                }  
            }); 
    
            Thread.sleep(1000); //等上1秒,等待消费完成  
            transProcessor.halt(); //通知事件处理器  可以结束了(并不是马上结束!)  
            executors.shutdown(); 
        }  
    }  
    public class ClientForWorkPool {  
        public static void main(String[] args) throws InterruptedException {  
            int BUFFER_SIZE = 1024;
            int THREAD_NUMBERS = 4;
            
            RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
                public Trade newInstance() {  
                    return new Trade(UUID.randomUUID().toString());
                }  
            }, BUFFER_SIZE);  
           
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
            
            // 第三个参数: 异常处理器, 这里用ExceptionHandler; 第四个参数WorkHandler的实现类, 可为数组(即传入多个消费者)
            WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), new TradeHandler());  
              
            ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
            workerPool.start(executors);  
              
            // 生产10个数据
            for (int i = 0; i < 8; i++) {
                long seq = ringBuffer.next();
                ringBuffer.get(seq).setPrice(Math.random() * 9999);
                ringBuffer.publish(seq);
            }
              
            Thread.sleep(1000);  
            workerPool.halt();  
            executors.shutdown();  
        }  
    }  
    // 封装交易数据
    public class Trade {  
        private String id; // 订单ID  
        private String name;
        private double price; // 金额  
        
        public Trade(String id) {
            this.id = id;
        }
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
    } 
    // 消费者, 这里实现一个接口就行, 写两个是为了同时测试EventProcessor和WorkPool
    public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
        @Override  
        public void onEvent(Trade event) throws Exception {  
            //具体的消费逻辑  
            System.out.println(event.getId());  
        }  
    }  

    1.3 多生产者-多消费者

    一个Event只能被某一个消费者处理

    public static void main(String[] args) throws Exception {
            //创建RingBuffer
            RingBuffer<Order> ringBuffer = 
                    RingBuffer.create(ProducerType.MULTI, 
                            new EventFactory<Order>() {  
                                @Override  
                                public Order newInstance() {  
                                    return new Order();  
                                }  
                            }, 
                            1024 * 1024, new YieldingWaitStrategy());
            
            SequenceBarrier barriers = ringBuffer.newBarrier();
            
            Consumer[] consumers = new Consumer[3];
            for(int i = 0; i < consumers.length; i++){
                consumers[i] = new Consumer("ct" + i);
            }
            // 3个消费者
            WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new MyExceptionHandler(), consumers);
            
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
            ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            workerPool.start(executors);  
            // 10个生产者, 每个生成者生产20个数据
            for (int i = 0; i < 10; i++) {  
                final Producer p = new Producer(ringBuffer);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int j = 0; j < 2; j++){
                            p.produceData(UUID.randomUUID().toString());
                        }
                    }
                }).start();
            } 
            
            System.out.println("----开始生产----");
            Thread.sleep(1000);  // 等待消费完成
            System.out.println("总共消费数量:" + consumers[0].getCount() );
            
            workerPool.halt(); 
            executors.shutdown();
        }
        
        static class MyExceptionHandler implements ExceptionHandler {  
            public void handleEventException(Throwable ex, long sequence, Object event) {}  
            public void handleOnStartException(Throwable ex) {}  
            public void handleOnShutdownException(Throwable ex) {}  
        } 
    }
    public class Consumer implements WorkHandler<Order>{
        
        private String consumerId;
        // 消费计数器
        private static AtomicInteger count = new AtomicInteger(0);
        
        public Consumer(String consumerId){
            this.consumerId = consumerId;
        }
    
        @Override
        public void onEvent(Order order) throws Exception {
            System.out.println("当前消费者: " + this.consumerId + ", 消费信息: " + order.getId());
            count.incrementAndGet();
        }
        
        public int getCount(){
            return count.get();
        }
    }
    public class Producer {
    
        private final RingBuffer<Order> ringBuffer;
        public Producer(RingBuffer<Order> ringBuffer){
            this.ringBuffer = ringBuffer;
        }
        public void produceData(String data){
            long sequence = ringBuffer.next();
            try {
                Order order = ringBuffer.get(sequence);
                order.setId(data);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
    public class Order {  
        private String id;
        private String name;
        private double price;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
    }  

    2. 并行处理

    除了实现生产者-消费者模型, Disruptor还可以进行多路并行处理(一个Event可以进入多个路径同时进行处理, 因为不同路径操作的是同一个Event, 路径可以汇合)

    public class Client {  
        public static void main(String[] args) throws InterruptedException {  
           
            long beginTime=System.currentTimeMillis();  
            int bufferSize=1024;  
            ExecutorService executor=Executors.newFixedThreadPool(7);  // 注意: 线程数>=handler数+1
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(
                    new EventFactory<Trade>() {
                        @Override
                        public Trade newInstance() {
                            return new Trade(UUID.randomUUID().toString());
                        }
                    }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
            // 菱形操作   
            /*
            // 创建消费者组(含H1,H2)   H1,H2并行执行无先后顺序
            EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
            // C1,C2都完成后执行C3, 像JMS传递消息
            handlerGroup.then(new Handler3());
            */
            
            // 顺序操作
            /*
            disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());
            */
            
            // 六边形操作. H1, H4串行执行; H2, H5串行执行; 而H1,H4 与 H2,H5 并行执行
            Handler1 h1 = new Handler1();
            Handler2 h2 = new Handler2();
            Handler3 h3 = new Handler3();
            Handler4 h4 = new Handler4();
            Handler5 h5 = new Handler5();
            disruptor.handleEventsWith(h1, h2);
            disruptor.after(h1).handleEventsWith(h4);
            disruptor.after(h2).handleEventsWith(h5);
            disruptor.after(h4, h5).handleEventsWith(h3);
            
            disruptor.start(); 
    // 启动生产线程  
            executor.submit(new TradePublisher(disruptor));
            Thread.sleep(1000); // 等待消费完成
           
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  
    public class TradePublisher implements Runnable {
    
        private Disruptor<Trade> disruptor;
    private static final int LOOP = 100;// 模拟百次交易的发生 public TradePublisher(Disruptor<Trade> disruptor) { this.disruptor = disruptor; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for (int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } } } class TradeEventTranslator implements EventTranslator<Trade> { private Random random = new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade) { trade.setPrice(random.nextDouble() * 9999); return trade; } }
    public class Handler1 implements EventHandler<Trade> {
        @Override
        public void onEvent(Trade event, long sequence, boolean endOfBatch)
                throws Exception {
            System.out.println("handler1: set name");
            event.setName("h1");
        }
    }
    public class Handler2 implements EventHandler<Trade> {  
        @Override  
        public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
            System.out.println("handler2: set price");
            event.setPrice(17.0);
        }  
    }  
    public class Handler3 implements EventHandler<Trade> {
        @Override  
        public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
            System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.getId());
        }  
    }
    public class Handler4 implements EventHandler<Trade> {  
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            System.out.println("handler4: append name");
            event.setName(event.getName() + "h4");
        }  
    }  
    public class Handler5 implements EventHandler<Trade> {    
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            System.out.println("handler5: add price");
            event.setPrice(event.getPrice() + 3.0);
        }  
    }  
  • 相关阅读:
    python---基础部分
    自动化测试---Selenium IDE安装及操作
    自动化测试---Selenium IDE概念
    自动化测试----概念
    jmeter---后端监听器
    jmeter---分布式测试
    jmeter---runtime控制器
    什么是 MyBatis 的接口绑定,有什么好处?
    接口绑定有几种实现方式,分别是怎么实现的?
    Apache Shiro 的三大核心组件
  • 原文地址:https://www.cnblogs.com/myJavaEE/p/6790917.html
Copyright © 2011-2022 走看看