zoukankan      html  css  js  c++  java
  • 使用Disruptor实现生产者和消费者模型

    生产者

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.RingBuffer;
    
    import java.nio.ByteBuffer;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:02 2018/5/7
     */
    public class Producer {
    
        //队列
        private final RingBuffer<Data> dataRingBuffer;
    
        public Producer(RingBuffer<Data> dataRingBuffer) {
            this.dataRingBuffer = dataRingBuffer;
        }
    
        /**
         * 插入数据
         * @param s
         */
        public void pushData(String s) {
    
            //获取下一个位置
            long next = dataRingBuffer.next();
            try {
                //获取容器
                Data data = dataRingBuffer.get(next);
                //设置数据
                data.setData(s);
            } finally {
                //插入
                dataRingBuffer.publish(next);
            }
        }
    }
    

      消费者

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.WorkHandler;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:01 2018/5/7
     */
    public class Customer implements WorkHandler<Data> {
        @Override
        public void onEvent(Data data) throws Exception {
            System.out.println(Thread.currentThread().getName()+"---"+data.getData());
        }
    }
    

      数据工厂

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.EventFactory;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:02 2018/5/7
     */
    public class DataFactory implements EventFactory<Data> {
    
        @Override
        public Data newInstance() {
            return new Data();
        }
    }
    

      主函数

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:09 2018/5/7
     */
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            //创建线程池
            ExecutorService service = Executors.newCachedThreadPool();
            //创建数据工厂
            DataFactory dataFactory = new DataFactory();
            //设置缓冲区大小,必须为2的指数,否则会有异常
            int buffersize = 1024;
            Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,
                    service);
            //创建消费者线程
            dataDisruptor.handleEventsWithWorkerPool(
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer()
            );
            //启动
            dataDisruptor.start();
            //获取其队列
            RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();
            for (int i = 0; i < 100; i++) {
                //创建生产者
                Producer producer = new Producer(ringBuffer);
                //设置内容
                producer.pushData(UUID.randomUUID().toString());
                //Thread.sleep(1000);
            }
        }
    }
    

      其中策略有几种:

    1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕

    2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志

    3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU

    4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。

  • 相关阅读:
    WPF中的Command事件绑定
    WPF的EventAggregator的发布和订阅
    IE浏览器如何调试Asp.net的 js代码
    MVVM模式用依赖注入的方式配置ViewModel并注册消息
    SQL处理数组,字符串转换为数组
    C#在函数内部获取函数的参数
    JS判断字符串长度(中文长度为2,英文长度为1)
    .net一般处理程序(httphandler)实现文件下载功能
    SQL分页获取数据
    URI编码解码
  • 原文地址:https://www.cnblogs.com/lonecloud/p/9002927.html
Copyright © 2011-2022 走看看