zoukankan      html  css  js  c++  java
  • disruptor 单生产者多消费者

    demo1 单生产者多消费者创建。

    maven 依赖

    <!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.4.2</version>
            </dependency>

    1 对象 - Message

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class Message2 {
        private String id;
        private String name;
        private double price;
    }

    2 在主函数中创建 disruptor

    Disruptor<Message2> disruptor = new Disruptor<>(
                    new EventFactory<Message2>() {
                        @Override
                        public Message2 newInstance() {
                            return new Message2();
                        }
                    },
                    1 << 10,
                    Executors.defaultThreadFactory(),
                    ProducerType.SINGLE,
                    new BusySpinWaitStrategy()
            );

    3 disruptor 绑定消费者

    // disruptor 绑定消费者
    disruptor.handleEventsWith(new MessageHandler1());
    
    
    //创建消费者
    @Slf4j
    public class MessageHandler1 implements EventHandler<Message2> {
        @Override
        public void onEvent(Message2 event, long sequence, boolean endOfBatch) throws Exception {
            event.setId(UUID.randomUUID().toString());
            log.info("【handler1,set id】 id: {}, name: {}, price: {}", event.getId(), event.getName(), event.getPrice());
        }
    }

    4 启动 disruptor

    RingBuffer<Message2> ringBuffer = disruptor.start();

    5 disruptor 绑定生产者

    //绑定生产者
    CountDownLatch latch = new CountDownLatch(1);
    ExecutorService es = Executors.newFixedThreadPool(4);
    es.submit(new MessagePublish2(disruptor, latch));
    
    // 生产者类
    public class MessagePublish2 implements Runnable {
        private Disruptor<Message2> disruptor;
        private CountDownLatch latch;
    
        public MessagePublish2(Disruptor<Message2> disruptor, CountDownLatch latch) {
            this.disruptor = disruptor;
            this.latch = latch;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                disruptor.publishEvent(new MessageEventTranslator());
            }
            latch.countDown();
        }
    }

    6 阻塞等待 & 关闭服务

            // 阻塞等待
            latch.await();
    
            // 关闭服务
            es.shutdown();
            disruptor.shutdown();
  • 相关阅读:
    mysql批量替换指定字符串
    php中英字符串截取
    比较两个JSON字符串是否完全相等
    Java FastJson 介绍
    线程池
    DBUS及常用接口介绍
    在Mac中如何正确地设置JAVA_HOME
    base64 原理
    sizeof与strlen的区别
    Kubernetes 部署失败的 10 个最普遍原因
  • 原文地址:https://www.cnblogs.com/zhaopengcheng/p/10971516.html
Copyright © 2011-2022 走看看