zoukankan      html  css  js  c++  java
  • Java深入学习(6):Disruptor

    Disruptor框架简介:

    并发框架,基于事件驱动,使用观察者模式

    底层采用环形数组,取模算法

    简单使用:

    /**
     * 声明一个Event:表示生产者和消费者之间传递的数据类型
     */
    public class LongEvent {
    
        private Long value;
    
        public Long getValue() {
            return value;
        }
    
        public void setValue(Long value) {
            this.value = value;
        }
    
    }

    工厂:

    /**
     * 实例化
     */
    public class LongEventFactory implements EventFactory<LongEvent> {
    
        public LongEvent newInstance() {
            return new LongEvent();
        }
    
    }

    消费者:

    /**
     * 消费者
     */
    public class LongEventHandler implements EventHandler<LongEvent> {
    
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("消费者获取数据:"+event.getValue());
        }
    
    }

    生产者:

    /**
     * 生产者
     */
    public class LongEventProducer {
    
        private RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(ByteBuffer byteBuffer) {
            //获取事件队列的下标位置
            long sequence = ringBuffer.next();
            try {
                //取出空队列
                LongEvent longEvent = ringBuffer.get(sequence);
                //给空队列赋值
                longEvent.setValue(byteBuffer.getLong(0));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("生产者发送数据");
                //发送数据
                ringBuffer.publish(sequence);
            }
        }
    
    }

    启动:

    public class Main {
    
        public static void main(String[] args) {
            //创建可缓存线程池
            ExecutorService executor = Executors.newCachedThreadPool();
            //创建工厂
            EventFactory<LongEvent> factory = new LongEventFactory();
            //创建RingBuffer(必须为2的N次方)
            int ringBuffer = 1024 * 1024;
            //创建Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
                    factory,
                    ringBuffer,
                    executor,
                    ProducerType.MULTI,
                    new YieldingWaitStrategy()
            );
            //注册消费者(如果注册多个消费者默认是重复消费)
            disruptor.handleEventsWith(new LongEventHandler());
            //启动
            disruptor.start();
            //创建RingBuffer容器
            RingBuffer<LongEvent> buffer = disruptor.getRingBuffer();
            //创建生产者
            LongEventProducer longEventProducer = new LongEventProducer(buffer);
            //定义大小为8的缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
            for (int i = 0; i < 100; i++) {
                byteBuffer.putLong(0, i);
                longEventProducer.onData(byteBuffer);
            }
            executor.shutdown();
            disruptor.shutdown();
        }
    
    }

    打印如下:

    ....................
    消费者获取数据:39
    生产者发送数据
    消费者获取数据:40
    生产者发送数据
    消费者获取数据:41
    生产者发送数据
    消费者获取数据:42
    ....................
  • 相关阅读:
    Angular与PHP之间的不同的请求方式(post/get)导致的传参问题
    项目当中会使用到的---获取主机名
    JavaScript eval() 函数
    PHP 数组排序
    PHP Switch 语句
    PHP strlen()函数和strpos()函数
    Array.prototype.map()和Array.prototypefilter()
    25.参考链接
    24.ArrayBuffer
    23.读懂 ECMAScript 规格
  • 原文地址:https://www.cnblogs.com/xuyiqing/p/11666577.html
Copyright © 2011-2022 走看看