zoukankan      html  css  js  c++  java
  • 使用Disruptor实现生产者和消费者模型

    生产者

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.RingBuffer;
    
    import java.nio.ByteBuffer;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:02 2018/5/7
     */
    public class Producer {
    
        //队列
        private final RingBuffer<Data> dataRingBuffer;
    
        public Producer(RingBuffer<Data> dataRingBuffer) {
            this.dataRingBuffer = dataRingBuffer;
        }
    
        /**
         * 插入数据
         * @param s
         */
        public void pushData(String s) {
    
            //获取下一个位置
            long next = dataRingBuffer.next();
            try {
                //获取容器
                Data data = dataRingBuffer.get(next);
                //设置数据
                data.setData(s);
            } finally {
                //插入
                dataRingBuffer.publish(next);
            }
        }
    }
    

      消费者

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.WorkHandler;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:01 2018/5/7
     */
    public class Customer implements WorkHandler<Data> {
        @Override
        public void onEvent(Data data) throws Exception {
            System.out.println(Thread.currentThread().getName()+"---"+data.getData());
        }
    }
    

      数据工厂

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.EventFactory;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:02 2018/5/7
     */
    public class DataFactory implements EventFactory<Data> {
    
        @Override
        public Data newInstance() {
            return new Data();
        }
    }
    

      主函数

    package cn.lonecloud.procum.disruptor;
    
    import cn.lonecloud.procum.Data;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author lonecloud
     * @version v1.0
     * @date 下午3:09 2018/5/7
     */
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            //创建线程池
            ExecutorService service = Executors.newCachedThreadPool();
            //创建数据工厂
            DataFactory dataFactory = new DataFactory();
            //设置缓冲区大小,必须为2的指数,否则会有异常
            int buffersize = 1024;
            Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,
                    service);
            //创建消费者线程
            dataDisruptor.handleEventsWithWorkerPool(
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer(),
                    new Customer()
            );
            //启动
            dataDisruptor.start();
            //获取其队列
            RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();
            for (int i = 0; i < 100; i++) {
                //创建生产者
                Producer producer = new Producer(ringBuffer);
                //设置内容
                producer.pushData(UUID.randomUUID().toString());
                //Thread.sleep(1000);
            }
        }
    }
    

      其中策略有几种:

    1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕

    2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志

    3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU

    4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。

  • 相关阅读:
    https://pingcap.com/blog-cn/flame-graph/
    https://software.intel.com/sites/landingpage/pintool/docs/97998/Pin/html/
    http://boostorg.github.io/stacktrace/stacktrace/getting_started.html#stacktrace.getting_started.how_to_print_current_call_stack
    线程局部存储
    slice 切片实现 Slice object interface
    网络分裂 redis 集群
    vscode-sftp
    Sizes of integer types 整形字节长度 系统字节
    学件中心
    源码 版本
  • 原文地址:https://www.cnblogs.com/lonecloud/p/9002927.html
Copyright © 2011-2022 走看看