zoukankan      html  css  js  c++  java
  • disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者

    在helloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法去获取RingBuffer,其实在很多时候,我们可以直接使用RingBuffer,以及其他的API操作。我们一起熟悉下示例:

    使用EventProcessor消息处理器。

    BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event 

    EventProcessor有3个实现类

    BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event

    使用BatchEventProcessor 消费者需要实现EventHandler接口

    我们来看下面的代码:

    需要处理的实体类

    package bhz.generate1;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Trade {  
        
        private String id;//ID  
        private String name;
        private double price;//金额  
        private AtomicInteger count = new AtomicInteger(0);
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
        public AtomicInteger getCount() {
            return count;
        }
        public void setCount(AtomicInteger count) {
            this.count = count;
        } 
          
          
    }  

    消费者类:

    package bhz.generate1;
    
    import java.util.UUID;
    
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    
    public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
          
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
      
        @Override  
        public void onEvent(Trade event) throws Exception {  
            //杩欓噷鍋氬叿浣撶殑娑堣垂閫昏緫  
            event.setId(UUID.randomUUID().toString());//绠�崟鐢熸垚涓婭D  
            System.out.println(event.getId());  
        }  
    }  

    消费者除了实现EventHandler接口之外,还实现了WorkHandler接口,为啥了,因为后面我们要使用了WokerPool来发送该实体类,所以这里就让该实体类实现两个接口

    我们来看看main方法

    package bhz.generate1;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventProcessor;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    
    public class Main1 {  
       
        public static void main(String[] args) throws Exception {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
            /* 
             * createSingleProducer创建一个单生产者的RingBuffer, 
             * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 
             * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
             * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
             */  
            final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, BUFFER_SIZE, new YieldingWaitStrategy());  
            
            //创建线程池  
            ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
            
            //创建SequenceBarrier  ,用于平衡生产者和消费者速率,用障碍来处理
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
              
            //创建消息处理器  
            BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                    ringBuffer, sequenceBarrier, new TradeHandler());  
              
            //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来
            ringBuffer.addGatingSequences(transProcessor.getSequence());  
              
            //把消息处理器提交到线程池  
            executors.submit(transProcessor);  
            
            //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
              
            Future<?> future= executors.submit(new Callable<Void>() {  
                @Override  
                public Void call() throws Exception {  
                    long seq;  
                    for(int i=0;i<10;i++){  
                        seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                        ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 
                        ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
                    }  
                    return null;  
                }  
            }); 
            
            future.get();//等待生产者结束  
            Thread.sleep(1000);//等上1秒,等消费都处理完成  
            transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
            executors.shutdown();//终止线程  
        }  
    }  

    //创建消息处理器
    BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
    ringBuffer, sequenceBarrier, new TradeHandler());

    它主要有三个成员RingBuffer、SequenceBarrier和EventHandler

    上面对应对应的是一个生产者,一个消费者的情况

    我们来看看程序运行的效果

    1a7226d0-e212-4183-b109-cab5e5c41545
    3e1da0fa-686d-4361-bea2-600c2c5d26b9
    bf31874a-3405-4008-80e7-03caf9f16ae4
    080a05ef-0052-4271-a2ee-ee50038a5a77
    71e1a5a8-24ba-4175-b53a-f8b71e99464a
    99670de9-6aa5-48fa-8fa2-a490250e25ba
    7a44b351-0caa-4ac3-b344-97cf72c9dd5f
    10a7fe52-eef1-453c-80a2-126fd8bac948
    c78f2ed5-3c3e-4481-9062-dd96ff7ba051
    49f51ad6-2ee5-4c36-a0d0-96bc0e17fba9

    如果是一个生产者,对应多个消费者,那么

    //创建消息处理器
    BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
    ringBuffer, sequenceBarrier, new TradeHandler());

    //这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来
    ringBuffer.addGatingSequences(transProcessor.getSequence());

    //把消息处理器提交到线程池
    executors.submit(transProcessor);

    //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类

    所以:BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event 

    2、使用WorkerPool消息处理器。

    消费者需要实现:WorkHandler接口

    我们来看看主程序的代码:

    package bhz.generate1;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.WorkerPool;
    
    public class Main2 {  
        public static void main(String[] args) throws InterruptedException {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
            
            EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            };  
            
            RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
              
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
              
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
              
            WorkHandler<Trade> handler = new TradeHandler();  
    
            WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
              
            workerPool.start(executor);  
              
            //下面这个生产8个数据
            for(int i=0;i<8;i++){  
                long seq=ringBuffer.next();  
                ringBuffer.get(seq).setPrice(Math.random()*9999);  
                ringBuffer.publish(seq);  
            }           
            Thread.sleep(1000);  
            workerPool.halt();  
            executor.shutdown();  
        }  
    }  

    程序运行的效果:

    4bbffa55-b19f-44a4-bfa7-100affc63323
    121a0ee8-7e8e-4637-b659-ca78ae9aaa20
    0fc1cdb8-8186-44fc-a3a5-4bf5fea66086
    afb70a80-e1ce-46f9-bfc1-4e0d81be96b4
    0e0b3690-830b-4d38-b78b-e0930b499515
    f5b4e23f-10c8-45ea-b064-32ae40f54912
    4a172494-480a-4509-99d0-d416b5e2c5c9
    902c0669-6196-423e-9924-31cb9633bbb5

  • 相关阅读:
    路由系统
    flask_sqlalchemy的使用
    input()输入语句
    注释
    Python 2017.1.5
    关于object网页播放器参数的设置,推荐博客系列
    LRU缓存,大神写的,值得借鉴
    object,网页播放器的相关属性设置
    js的apply和call方法
    count()函数在count()中参数的讨论
  • 原文地址:https://www.cnblogs.com/kebibuluan/p/7655876.html
Copyright © 2011-2022 走看看