zoukankan      html  css  js  c++  java
  • 【Disruptor】首个Disruptor的实例

    Disruptor核心使用与导航

    Disruptor快速开始

    建立一个工厂Event类,用于创建Event类实例对象

    我们先建立一个OrderEvent.java

    import javafx.event.Event;
    
    public class OrderEvent{
        private long value;//作为订单服务
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    }
    

    然后再建立一个Event的工厂类,即OrderEventFactory.java

    
    import com.lmax.disruptor.EventFactory;
    
    public class OrderEventFactory implements EventFactory<OrderEvent> {
    
        @Override
        public OrderEvent newInstance() {
            return new OrderEvent();//为了返回空的数据对象
        }
    }
    

    建立一个监听事件类,用于处理数据(Event)类

    再建立一个OrderEventHandler.java 作为监听事件类

    import com.lmax.disruptor.EventHandler;
    public class OrderEventHandler implements EventHandler<OrderEvent> {
    
    
        @Override
        public void onEvent(OrderEvent event, long l, boolean b) throws Exception {
            System.out.println("消费者:"+ event.getValue());
        }
    }
    

    实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件

    这里我们建立一个Main.java,作为主要的核心类

    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import org.apache.ibatis.javassist.bytecode.analysis.Executor;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
        public static void main(String[] args) {
    
            OrderEventFactory orderEventFactory = new OrderEventFactory();
            int ringBufferSize = 1024*1024;
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            /**
             * 1 eventFacotry 消息工厂对象
             *  2 ringBufferSize 容器的长度
             *  3 executor 线程池
             *  4 ProducerType 生产这类型
             *  5 waitStartegy 等待策略
             */
            //1 实例化一个disruptor的对象
            Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
    
            // 2 添加消费者的监听(Disruptor 与消费者的一个关联关系)
            disruptor.handleEventsWith(new OrderEventHandler());
            // 3 启动disruptor
            disruptor.start();
        }
    }
    

    编写生产者组件,向Disruptor容器中去投递数据

    当然在进行下一步之前,我们会先需要一个OrderEventProducer.java 作为生产者

    import com.lmax.disruptor.RingBuffer;
    
    import java.nio.ByteBuffer;
    
    public class OrderEventProducer {
        private RingBuffer<OrderEvent> ringBuffer;
    
        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void sendData(ByteBuffer data){
            long sequence = ringBuffer.next();
            try{
                //1. 在生产者发送消息的时候,首先需要从ringBuffer里面获取一个可用的序号
    
                //2. 根据这个序号 找到具体的“orderEvent”元素 此时获取的OrderEvent是一个空的对象
                OrderEvent event = ringBuffer.get(sequence);
                //3 进行实际的赋值操作
                event.setValue(data.getLong(0));
    
            }finally {
                // 4 提交操作
                ringBuffer.publish(sequence);
            }
    
    
    
        }
    }
    

    完成生产者之后,我们再把完善Main.java的语句

    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
        public static void main(String[] args) {
    
            OrderEventFactory orderEventFactory = new OrderEventFactory();
            int ringBufferSize = 1024*1024;
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            /**
             * 1 eventFacotry 消息工厂对象
             *  2 ringBufferSize 容器的长度
             *  3 executor 线程池
             *  4 ProducerType 生产这类型
             *  5 waitStartegy 等待策略
             */
            //1 实例化一个disruptor的对象
            Disruptor<OrderEvent> disruptor = new Disruptor<>(orderEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
    
            // 2 添加消费者的监听(Disruptor 与消费者的一个关联关系)
            disruptor.handleEventsWith(new OrderEventHandler());
            // 3 启动disruptor
            disruptor.start();
    
            // 4 获取实际存储数据的容器 RingBuffer
            RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
    
    
            OrderEventProducer producer = new OrderEventProducer(ringBuffer);
            //生成一百个数 然后丢给producer
            ByteBuffer bb = ByteBuffer.allocate(8);
            for(long i = 0;i<100;i++){
                bb.putLong(0,i);
                producer.sendData(bb);
            }
            //关闭disruptor和线程池
            disruptor.shutdown();
            executor.shutdown();
        }
    }
    

    运行Main.java 得到:

    消费者:0
    消费者:1
    消费者:2
    消费者:3
    消费者:4
    消费者:5
    消费者:6
    消费者:7
    消费者:8
    消费者:9
    消费者:10
    消费者:11
    消费者:12
    消费者:13
    消费者:14
    消费者:15
    消费者:16
    消费者:17
    消费者:18
    消费者:19
    消费者:20
    消费者:21
    消费者:22
    消费者:23
    消费者:24
    消费者:25
    消费者:26
    消费者:27
    消费者:28
    消费者:29
    消费者:30
    消费者:31
    消费者:32
    消费者:33
    消费者:34
    消费者:35
    消费者:36
    消费者:37
    消费者:38
    消费者:39
    消费者:40
    消费者:41
    消费者:42
    消费者:43
    消费者:44
    消费者:45
    消费者:46
    消费者:47
    消费者:48
    消费者:49
    消费者:50
    消费者:51
    消费者:52
    消费者:53
    消费者:54
    消费者:55
    消费者:56
    消费者:57
    消费者:58
    消费者:59
    消费者:60
    消费者:61
    消费者:62
    消费者:63
    消费者:64
    消费者:65
    消费者:66
    消费者:67
    消费者:68
    消费者:69
    消费者:70
    消费者:71
    消费者:72
    消费者:73
    消费者:74
    消费者:75
    消费者:76
    消费者:77
    消费者:78
    消费者:79
    消费者:80
    消费者:81
    消费者:82
    消费者:83
    消费者:84
    消费者:85
    消费者:86
    消费者:87
    消费者:88
    消费者:89
    消费者:90
    消费者:91
    消费者:92
    消费者:93
    消费者:94
    消费者:95
    消费者:96
    消费者:97
    消费者:98
    消费者:99

    由此 Disruptor的首个开始测试完成。

    你好啊,谢谢你来看我。
  • 相关阅读:
    LeetCode20 有效的括号
    函数的多个参数
    定义一个函数的基本语法 函数的参数
    函数
    金字塔
    水仙花数
    百鸡百钱
    循环demo
    while适用于不确定循环次数
    浏览器打断点
  • 原文地址:https://www.cnblogs.com/sitr/p/13584257.html
Copyright © 2011-2022 走看看