zoukankan      html  css  js  c++  java
  • Java深入学习(6):Disruptor

    Disruptor框架简介:

    并发框架,基于事件驱动,使用观察者模式

    底层采用环形数组,取模算法

    简单使用:

    /**
     * 声明一个Event:表示生产者和消费者之间传递的数据类型
     */
    public class LongEvent {
    
        private Long value;
    
        public Long getValue() {
            return value;
        }
    
        public void setValue(Long value) {
            this.value = value;
        }
    
    }

    工厂:

    /**
     * 实例化
     */
    public class LongEventFactory implements EventFactory<LongEvent> {
    
        public LongEvent newInstance() {
            return new LongEvent();
        }
    
    }

    消费者:

    /**
     * 消费者
     */
    public class LongEventHandler implements EventHandler<LongEvent> {
    
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("消费者获取数据:"+event.getValue());
        }
    
    }

    生产者:

    /**
     * 生产者
     */
    public class LongEventProducer {
    
        private RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(ByteBuffer byteBuffer) {
            //获取事件队列的下标位置
            long sequence = ringBuffer.next();
            try {
                //取出空队列
                LongEvent longEvent = ringBuffer.get(sequence);
                //给空队列赋值
                longEvent.setValue(byteBuffer.getLong(0));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("生产者发送数据");
                //发送数据
                ringBuffer.publish(sequence);
            }
        }
    
    }

    启动:

    public class Main {
    
        public static void main(String[] args) {
            //创建可缓存线程池
            ExecutorService executor = Executors.newCachedThreadPool();
            //创建工厂
            EventFactory<LongEvent> factory = new LongEventFactory();
            //创建RingBuffer(必须为2的N次方)
            int ringBuffer = 1024 * 1024;
            //创建Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
                    factory,
                    ringBuffer,
                    executor,
                    ProducerType.MULTI,
                    new YieldingWaitStrategy()
            );
            //注册消费者(如果注册多个消费者默认是重复消费)
            disruptor.handleEventsWith(new LongEventHandler());
            //启动
            disruptor.start();
            //创建RingBuffer容器
            RingBuffer<LongEvent> buffer = disruptor.getRingBuffer();
            //创建生产者
            LongEventProducer longEventProducer = new LongEventProducer(buffer);
            //定义大小为8的缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
            for (int i = 0; i < 100; i++) {
                byteBuffer.putLong(0, i);
                longEventProducer.onData(byteBuffer);
            }
            executor.shutdown();
            disruptor.shutdown();
        }
    
    }

    打印如下:

    ....................
    消费者获取数据:39
    生产者发送数据
    消费者获取数据:40
    生产者发送数据
    消费者获取数据:41
    生产者发送数据
    消费者获取数据:42
    ....................
  • 相关阅读:
    How to function call using 'this' inside forEach loop
    jquery.validate.unobtrusive not working with dynamic injected elements
    Difference between jQuery.extend and jQuery.fn.extend?
    Methods, Computed, and Watchers in Vue.js
    Caution using watchers for objects in Vue
    How to Watch Deep Data Structures in Vue (Arrays and Objects)
    Page: DOMContentLoaded, load, beforeunload, unload
    linux bridge
    linux bridge
    EVE-NG网卡桥接
  • 原文地址:https://www.cnblogs.com/xuyiqing/p/11666577.html
Copyright © 2011-2022 走看看