zoukankan      html  css  js  c++  java
  • EventProcessor与WorkPool用法--可处理多消费者

    单一的生产者,消费者有多个,使用WorkerPool来管理多个消费者;

    RingBuffer在生产Sequencer中记录一个cursor,追踪生产者生产到的最新位置,通过WorkSequence和sequence记录整个workpool消费的位置和每个WorkProcessor消费到位置,来协调生产和消费程序

    1、定义事件

    package com.ljq.disruptor;
    
    import java.io.Serializable;
    
    /**
     * 交易事件数据
     * 
     * @author Administrator
     *
     */
    @SuppressWarnings("serial")
    public class TradeEvent implements Serializable {
        private String id; // 订单ID
        private String name;
        private double price; // 金额
    
        public TradeEvent() {
        }
        
        public TradeEvent(String id) {
            super();
            this.id = id;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public double getPrice() {
            return price;
        }
    
        public void setPrice(double price) {
            this.price = price;
        }
    
        @Override
        public String toString() {
            return "Trade [id=" + id + ", name=" + name + ", price=" + price + "]";
        }
    
    }

    2、TradeEvent事件消费者

    package com.ljq.disruptor;
    
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    
    public class TradeEventHandler implements EventHandler<TradeEvent>, WorkHandler<TradeEvent> {
        @Override
        public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
            this.onEvent(event);
        }
    
        /**
         * WorkProcessor多线程排队领event然后再执行,不同线程执行不同的event。但是多了个排队领event的过程,这个是为了减少对生产者队列查询的压力
         */
        @Override
        public void onEvent(TradeEvent event) throws Exception {
            // 具体的消费逻辑
            System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event);
        }
    }

    3、EventProcessor消费者-生产者启动类

    package com.ljq.disruptor;
    
    import java.util.UUID;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    
    public class EventProcessorMain {
        public static void main(String[] args) throws Exception {  
            long beginTime = System.currentTimeMillis();
            
            // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
            int bufferSize = 1024;
            //固定线程数
            int nThreads = 4;
            
            EventFactory<TradeEvent> eventFactory = new EventFactory<TradeEvent>() {  
                @Override  
                public TradeEvent newInstance() {  
                    return new TradeEvent(UUID.randomUUID().toString());
                }  
            };
            
            //RingBuffer. createSingleProducer创建一个单生产者的RingBuffer
            //第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 
            //第二个参数是RingBuffer的大小,它必须是2的整数倍,目的是为了将求模运算转为&运算提高效率
            //第三个参数是RingBuffer的生产在没有可用区块的时候(可能是消费者太慢了)的等待策略 
            final RingBuffer<TradeEvent> ringBuffer = RingBuffer.createSingleProducer(eventFactory, bufferSize, new YieldingWaitStrategy());  
            
            //SequenceBarrier, 协调消费者与生产者, 消费者链的先后顺序. 阻塞后面的消费者(没有Event可消费时)
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
              
            //创建消费者事件处理器, 多线程并发执行,不同线程执行不同的event 
            BatchEventProcessor<TradeEvent> transProcessor = new BatchEventProcessor<TradeEvent>(ringBuffer, sequenceBarrier, new TradeEventHandler());  
            //把消费者的消费进度情况注册给RingBuffer结构(生产者),如果只有一个消费者的情况可以省略 
            ringBuffer.addGatingSequences(transProcessor.getSequence());  
              
            //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程
            ExecutorService executors = Executors.newFixedThreadPool(nThreads);  
            //把消费者提交到线程池,说明EventProcessor实现了callable接口  
            executors.submit(transProcessor);  
            
            // 生产者,这里新建线程不是必要的
            Future<?> future= executors.submit(new Callable<Void>() {  
                @Override  
                public Void call() throws Exception {  
                    long seq;  
                    for (int i = 0; i < 100000; i++) {
                        seq = ringBuffer.next();
                        ringBuffer.get(seq).setPrice(i);
                        ringBuffer.publish(seq);
                    } 
                    return null;  
                }  
            }); 
            future.get();//等待生产者结束  
            
            Thread.sleep(1000); //等上1秒,等消费都处理完成
            transProcessor.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) 
            executors.shutdown(); 
            
            System.out.println(String.format("总共耗时%s毫秒", (System.currentTimeMillis() - beginTime)));
    
        }  
    }

    4、WorkerPool消费者-生产者启动类

    package com.ljq.disruptor;
    
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WorkerPool;
    
    public class WorkPoolMain {
        public static void main(String[] args) throws InterruptedException {
            // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
            int bufferSize = 1024;
            //固定线程数
            int nThreads = 4;
    
            //RingBuffer. createSingleProducer创建一个单生产者的RingBuffer
            RingBuffer<TradeEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeEvent>() {
                public TradeEvent newInstance() {
                    return new TradeEvent(UUID.randomUUID().toString());
                }
            }, bufferSize);
    
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    
            WorkerPool<TradeEvent> workerPool = new WorkerPool<TradeEvent>(ringBuffer, sequenceBarrier,
                    new IgnoreExceptionHandler(), new TradeEventHandler());
    
            //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程
            ExecutorService executors = Executors.newFixedThreadPool(nThreads);
            workerPool.start(executors);
    
            // 生产10个数据
            for (int i = 0; i < 80000; i++) {
                long seq = ringBuffer.next();
                ringBuffer.get(seq).setPrice(i);
                ringBuffer.publish(seq);
            }
    
            Thread.sleep(1000); //等上1秒,等消费都处理完成
            workerPool.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) 
            executors.shutdown(); 
        }
    }
  • 相关阅读:
    Node 文件上传,ZIP
    jquery实现前台倒计时。应用下单24小时后自动取消该订单
    solr 4.4添加索引是新手容易遇到的问题
    solr 4.6的安装配置
    java.lang.OutOfMemoryError: PermGen space
    java.sql.SQLException: Access denied for user 'root'@'localhost' (using password: NO)
    mybatis代码生成(generator工具生成代码)
    再次熟悉jdbc连接mysql
    魔方阵,奇数阵。输入一个奇数,产生一个魔方阵
    错误,这个如何解决呢?内存溢出的问提。把JAVA_OPTS="-server -XX:PermSize=64M -XX:MaxPermSize=128m 还是不行
  • 原文地址:https://www.cnblogs.com/linjiqin/p/7439986.html
Copyright © 2011-2022 走看看