zoukankan      html  css  js  c++  java
  • Disruptor使用简介

    disruptor是lmax公司开发的一款java高性能并发框架,其本质是一种类似队列的实现“生产者—消费者 ”模式的组件。

     下面是其示例代码:

    public class DisruptorServer {
        private Disruptor disruptor = null;
    
        public static void main(String[] args) {
            DisruptorContext.start();
            System.out.println("Disruptor服务已启动...");
           
            for(long i=0; i<101; i++){
                DisruptorContext.publish(i);
            }
            DisruptorContext.stop();
            System.out.println("...Disruptor服务已停止");
        }
    }
    
    public class DisruptorContext {
        private static Disruptor<LongEvent> disruptor = null;
        private static ExecutorService executor = null;
        
        public static void start(){
            if(null==disruptor){
            EventFactory<LongEvent> eventFactory = new LongEventFactory();
            executor = Executors.newSingleThreadExecutor();
            WaitStrategy waitStrategy = new BlockingWaitStrategy();
            int ringBufferSize = 1024*1024;
            disruptor = new Disruptor<LongEvent>(eventFactory, 
                                                 ringBufferSize, 
                                                 executor, 
                                                 ProducerType.SINGLE, 
                                                 waitStrategy);
            EventHandler<LongEvent> eventHandler = new LongEventHandler();
            disruptor.handleEventsWith(eventHandler);
            disruptor.start();
            }
        }
        
        public static void stop(){
            disruptor.shutdown();
            executor.shutdown();
        }
        
        public static void publish(long eventData){
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            long sequence = ringBuffer.next();
            try{
            LongEvent event = ringBuffer.get(sequence);
            event.set(eventData);
            }finally{
                ringBuffer.publish(sequence);
            }
        }
    }
    
    public class LongEvent {
        private long value;
    
        public void set(long value) {
            this.value = value;
        }
        
        public long get(){
            return this.value;
        }
    }
    
    public class LongEventFactory implements EventFactory<LongEvent> {
    
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    
    }
    
    public class LongEventHandler implements EventHandler<LongEvent>{
    
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("Disruptor消费者输出Event :" + event.get());
        }
    
    }

    从构造函数来看一下disruptor的几个组成部分:

    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, waitStrategy);

    Event 需要进入disruptor交换的对象都需要封装成event,本例中封装的是一个long

    EventFactory   工厂,决定以何种方式创建event。  
      *   工厂模式:我不知道你需要的是什么样的对象,索性把你的构造方法(工厂)传过来吧。

    EventHandler  事件处理的具体实现,也即producer——consumer中的consumer的具体实现
      *   本例中仅仅对event中的long进行输出

    ringBuffer  存放event的地方, 生产者和消费者之间的缓冲,相当于一个无锁的环形队列

    executor    jdk5 java.concurrent.*包里的线程池 ,用来执行消费者线程

    ProducerType.SINGLE  说明这个disruptor是单生产者模式的(disruptor也支持多生产者模式)

    waitStrategy   消费者的等待策略, 有blocking、sleeping、yielding三种

    关于多个消费者

    ExecutorService executor executor = Executors.newCachedThreadPool(); WaitStrategy yieldStrategy = new YieldingWaitStrategy(); Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,                                              ringBufferSize,                                              executor,                                              ProducerType.SINGLE,                                              yieldStrategy);                                              如上这种情况下,在EventHandler中让每个event处理sleep 1秒,设置2个EventHandler(2个消费者),最后执行disruptor 向其publish 100个event,最后执行了大约100秒。

    可见,消费者与消费者之间是并行处理的,但单个消费者内部的100个事件来说是以同步方式处理的。

  • 相关阅读:
    iOS 笔记-AFN使用中的遇到的问题
    iOS 笔记-incompatible pointer types initializing 'NSMutableString *' with an expression of type 'NSString *'警告处理
    iOS 笔记-SRT视频字幕的解析与同步
    iOS-笔记 字符编码
    iOS-笔记 设置导航栏的样式
    iOS 笔记-自定义的导航栏按钮
    iOS 笔记-删除插件的方法
    iOS 笔记-关于用户交互的那些事
    iOS BUG整理--[__NSCFNumber length]: unrecognized selector sent to instance 崩溃解决
    iOS BUG整理-Data argument not used by format string的警告处理
  • 原文地址:https://www.cnblogs.com/lyhero11/p/5124844.html
Copyright © 2011-2022 走看看