zoukankan      html  css  js  c++  java
  • Disruptor之粗糙认识

    一 概述

    1.Disruptor

    Disruptor是一个高性能的异步处理框架,一个“生产者-消费者”模型。

    2.RingBuffer

    RingBuffer是一种环形数据结构,包含一个指向下一个槽点的序号,可以在线程间传递数据。

    3.Event

    在Disruptor框架中,生产者生产的数据叫做Event。

    二 Disruptor框架基本构成

    1.MyEvent:自定义对象,充当“生产者-消费者”模型中的数据。
    2.MyEventFactory:实现EventFactory的接口,用于生产数据。
    3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布。
    4.MyEventHandler:自定义消费者。

    三 Demo

    初次接触Disruptor,认识停留在表面,零散,模糊,在此记一个简单的示例,以便日后深入研究。

    1.自定义数据类 

    package com.disruptor.basic;
    
    public class LongEvent {
        private long value;
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    
    }

    2.数据生产工厂(创建数据类对象)

    package com.disruptor.basic;
    
    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent> {
    
        public LongEvent newInstance() {
            // TODO Auto-generated method stub
            return new LongEvent();
        }
    
    }

    3.数据源(初始化数据对象并发布)

    package com.disruptor.basic;
    
    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    
    public class LongEventProducerWithTranslator {
    
        private final RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
            /**
             * event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号;
             * bb:包含有将要被存储到目标对象中的数据的容器
             */
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                // TODO Auto-generated method stub
                event.setValue(bb.getLong(0));// 将数据存储到目标对象中
            }
        };
    
        public void onData(ByteBuffer bb) {
            ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者
        }
    
    }

    4.消费者

    package com.disruptor.basic;
    
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent> {
    
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("当前消费的数据="+event.getValue());
        }
    
    }

    5.测试类

    package com.disruptor.basic;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.junit.Test;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class LongEventTest {
    
        @SuppressWarnings({ "unchecked", "deprecation" })
        @Test
        public void test01() throws InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
            EventFactory<LongEvent> factory = new LongEventFactory();
            int bufferSize = 1024;
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            disruptor.handleEventsWith(new LongEventHandler());
            disruptor.start();
    
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            // LongEventProducer producer = new
            // LongEventProducer(ringBuffer);
            LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
            ByteBuffer bb = ByteBuffer.allocate(8);
            // long startTime = System.currentTimeMillis();
            for (long a = 0; a < 100; a++) {
                bb.putLong(0, a);
                producer.onData(bb);
                /*if (a == 99) {
                    long endTime = System.currentTimeMillis();
                    System.out.println("useTime=" + (endTime - startTime));
                }*/
                Thread.sleep(100);
            }
            /*long endTime = System.currentTimeMillis();
            System.out.println("useTime=" + (endTime - startTime));*/
            disruptor.shutdown();
            executor.shutdown();
        }
    
        /*@Test
        public void test02() {
            long startTime = System.currentTimeMillis();
            for (long a = 0; a < 100; a++) {
                System.out.println(a);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("useTime=" + (endTime - startTime));
        }*/
    
    }

     

  • 相关阅读:
    POJ 3255 Roadblocks
    KMP算法的前缀next数组最通俗的解释
    HDU 1829 A Bug's Life
    HDU 1879 继续畅通工程
    课程设计:学生管理系统(c++)
    HDU 1016 Prime Ring Problem
    HDU 4310 Hero
    素数筛选法<单向链表实现>
    【未完】训练赛20190304:KMP+树状数组+线段树+优先队列
    畅通工程:并查集入门裸题
  • 原文地址:https://www.cnblogs.com/tonghun/p/7077233.html
Copyright © 2011-2022 走看看