zoukankan      html  css  js  c++  java
  • disruptor笔记之五:事件消费实战

    欢迎访问我的GitHub

    https://github.com/zq2599/blog_demos

    内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

    《disruptor笔记》系列链接

    1. 快速入门
    2. Disruptor类分析
    3. 环形队列的基础操作(不用Disruptor类)
    4. 事件消费知识点小结
    5. 事件消费实战
    6. 常见场景
    7. 等待策略
    8. 知识点补充(终篇)

    本篇概览

    本篇是《disruptor笔记》的第五篇,前文《disruptor笔记之四:事件消费知识点小结》从理论上梳理分析了独立消费和共同消费,留下了三个任务,今天就来成这些任务,即编码实现以下三个场景:

    1. 100个订单,短信和邮件系统独立消费
    2. 100个订单,邮件系统的两个邮件服务器共同消费;
    3. 100个订单,短信系统独立消费,与此同时,两个邮件服务器共同消费;

    源码下载

    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
    • 这个git项目中有多个文件夹,本次实战的源码在disruptor-tutorials文件夹下,如下图红框所示:

    在这里插入图片描述

    • disruptor-tutorials是个父工程,里面有多个module,本篇实战的module是consume-mode,如下图红框所示:

    在这里插入图片描述

    编写公共代码

    • 为了完成任务,编码实现上面那三个场景,咱们需要先把公共代码写好;
    • 首先是在父工程disruptor-tutorials下面新建名为consume-mode的module,其build.gradle内容如下:
    plugins {
        id 'org.springframework.boot'
    }
    
    dependencies {
        implementation 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter'
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'com.lmax:disruptor'
    
        testImplementation('org.springframework.boot:spring-boot-starter-test')
    }
    
    • springboot启动类:
    package com.bolingcavalry;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ConsumeModeApplication {
    	public static void main(String[] args) {
    		SpringApplication.run(ConsumeModeApplication.class, args);
    	}
    }
    
    • 订单事件定义:
    package com.bolingcavalry.service;
    
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @ToString
    @NoArgsConstructor
    public class OrderEvent {
    
        private String value;
    }
    
      • 订单事件的工程类,定义事件实例如何创建:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.EventFactory;
    
    public class OrderEventFactory implements EventFactory<OrderEvent> {
    
        @Override
        public OrderEvent newInstance() {
            return new OrderEvent();
        }
    }
    
    • 订单事件生产者类,定义如何将业务信息通过事件发布到环形队列:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.RingBuffer;
    
    public class OrderEventProducer {
        // 存储数据的环形队列
        private final RingBuffer<OrderEvent> ringBuffer;
    
        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(String content) {
            // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置
            long sequence = ringBuffer.next();
    
            try {
                // sequence位置取出的事件是空事件
                OrderEvent orderEvent = ringBuffer.get(sequence);
                // 空事件添加业务信息
                orderEvent.setValue(content);
            } finally {
                // 发布
                ringBuffer.publish(sequence);
            }
        }
    }
    
    • 消费订单事件的短信服务,实现EventHandler接口,所以是用在独立消费的场景:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.EventHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.util.function.Consumer;
    
    @Slf4j
    public class SmsEventHandler implements EventHandler<OrderEvent> {
    
        public SmsEventHandler(Consumer<?> consumer) {
            this.consumer = consumer;
        }
    
        // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
        private Consumer<?> consumer;
    
        @Override
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
            log.info("短信服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
    
            // 这里延时100ms,模拟消费事件的逻辑的耗时
            Thread.sleep(100);
    
            // 如果外部传入了consumer,就要执行一次accept方法
            if (null!=consumer) {
                consumer.accept(null);
            }
        }
    }
    
    • 消费订单事件的邮件服务,实现EventHandler接口,所以是用在独立消费的场景:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.EventHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.util.function.Consumer;
    
    @Slf4j
    public class MailEventHandler implements EventHandler<OrderEvent> {
    
        public MailEventHandler(Consumer<?> consumer) {
            this.consumer = consumer;
        }
    
        // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
        private Consumer<?> consumer;
    
        @Override
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
            log.info("邮件服务 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
    
            // 这里延时100ms,模拟消费事件的逻辑的耗时
            Thread.sleep(100);
    
            // 如果外部传入了consumer,就要执行一次accept方法
            if (null!=consumer) {
                consumer.accept(null);
            }
        }
    }
    
    • 消费订单事件的邮件服务,实现WorkHandler接口,所以是用在共同消费的场景:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.WorkHandler;
    import lombok.extern.slf4j.Slf4j;
    import java.util.function.Consumer;
    
    @Slf4j
    public class MailWorkHandler implements WorkHandler<OrderEvent> {
    
        public MailWorkHandler(Consumer<?> consumer) {
            this.consumer = consumer;
        }
    
        // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
        private Consumer<?> consumer;
    
        @Override
        public void onEvent(OrderEvent event) throws Exception {
            log.info("共同消费模式的邮件服务 : {}", event);
    
            // 这里延时100ms,模拟消费事件的逻辑的耗时
            Thread.sleep(100);
    
            // 如果外部传入了consumer,就要执行一次accept方法
            if (null!=consumer) {
                consumer.accept(null);
            }
        }
    }
    
    • 最后,将发布和消费事件的逻辑写在一个抽象类里,但是具体如何消费事件并不在此类中实现,而是留给子类,这个抽象类中有几处要注意的地方稍后会提到:
    package com.bolingcavalry.service;
    
    import com.lmax.disruptor.dsl.Disruptor;
    import lombok.Setter;
    import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
    import javax.annotation.PostConstruct;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;
    
    public abstract class ConsumeModeService {
        /**
         * 独立消费者数量
         */
        public static final int INDEPENDENT_CONSUMER_NUM = 2;
    
        /**
         * 环形缓冲区大小
         */
        protected int BUFFER_SIZE = 16;
    
        protected Disruptor<OrderEvent> disruptor;
    
        @Setter
        private OrderEventProducer producer;
    
        /**
         * 统计消息总数
         */
        protected final AtomicLong eventCount = new AtomicLong();
    
        /**
         * 这是辅助测试用的,
         * 测试的时候,完成事件发布后,测试主线程就用这个countDownLatch开始等待,
         * 在消费到指定的数量(countDownLatchGate)后,消费线程执行countDownLatch的countDown方法,
         * 这样测试主线程就可以结束等待了
         */
        private CountDownLatch countDownLatch;
    
        /**
         * 这是辅助测试用的,
         * 测试的时候,完成事件发布后,测试主线程就用这个countDownLatch开始等待,
         * 在消费到指定的数量(countDownLatchGate)后,消费线程执行countDownLatch的countDown方法,
         * 这样测试主线程就可以结束等待了
         */
        private int countDownLatchGate;
    
        /**
         * 准备一个匿名类,传给disruptor的事件处理类,
         * 这样每次处理事件时,都会将已经处理事件的总数打印出来
         */
        protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
    
                /**
                 * 这是辅助测试用的,
                 * 测试的时候,完成事件发布后,测试主线程就用这个countDownLatch开始等待,
                 * 在消费到指定的数量(countDownLatchGate)后,消费线程执行countDownLatch的countDown方法,
                 * 这样测试主线程就可以结束等待了
                 */
                if (null!=countDownLatch && count>=countDownLatchGate) {
                    countDownLatch.countDown();
                }
            }
        };
    
        /**
         * 发布一个事件
         * @param value
         * @return
         */
        public void publish(String value) {
            producer.onData(value);
        }
    
        /**
         * 返回已经处理的任务总数
         * @return
         */
        public long eventCount() {
            return eventCount.get();
        }
    
        /**
         * 这是辅助测试用的,
         * 测试的时候,完成事件发布后,测试主线程就用这个countDownLatch开始等待,
         * 在消费到指定的数量(countDownLatchGate)后,消费线程执行countDownLatch的countDown方法,
         * 这样测试主线程就可以结束等待了
         * @param countDownLatch
         * @param countDownLatchGate
         */
        public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
            this.countDownLatch = countDownLatch;
            this.countDownLatchGate = countDownLatchGate;
        }
    
        /**
         * 留给子类实现具体的事件消费逻辑
         */
        protected abstract void disruptorOperate();
    
        @PostConstruct
        private void init() {
            // 实例化
            disruptor = new Disruptor<>(new OrderEventFactory(),
                    BUFFER_SIZE,
                    new CustomizableThreadFactory("event-handler-"));
    
            // 留给子类实现具体的事件消费逻辑
            disruptorOperate();
    
            // 启动
            disruptor.start();
    
            // 生产者
            setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
        }
    }
    
    • 上述代码,有以下几处需要注意:
    1. init方法是spring bean实例化后要执行的方法,这里面实例化Disruptor,还启动了消费线程,并且实例化了事件生产者,具体的事件消费逻辑,由子类在disruptorOperate方法中实现;
    2. eventCountPrinter是个匿名类实例,传给事件消费的handler后,每消费一个事件都会执行一次eventCountPrinter.accept方法,这样就把消费事件的总数准确的保存在eventCount变量中了;
    3. countDownLatch和countDownLatchGate是为了辅助单元测试而准备的,测试的时候,完成事件发布后,测试主线程就用这个countDownLatch开始等待,在消费到指定的数量(countDownLatchGate)后,消费线程执行countDownLatch的countDown方法,这样测试主线程就可以结束等待了
    • 至此,公用代码就写完了,可见抽象父类已经做好了大部分事情,咱们的子类可以聚焦事件消费的逻辑编排了,开始挨个实现那三个场景;

    100个订单,短信和邮件系统独立消费

    • 两个消费者独立消费的逻辑非常简单,就一行代码,调用handleEventsWith方法把所有消费者实例传进去,就完事了:
    package com.bolingcavalry.service.impl;
    
    import com.bolingcavalry.service.ConsumeModeService;
    import com.bolingcavalry.service.MailEventHandler;
    import com.bolingcavalry.service.SmsEventHandler;
    import org.springframework.stereotype.Service;
    
    @Service("independentModeService")
    public class IndependentModeServiceImpl extends ConsumeModeService {
    
        @Override
        protected void disruptorOperate() {
            // 调用handleEventsWith,表示创建的多个消费者,每个都是独立消费的
            // 这里创建两个消费者,一个是短信的,一个是邮件的
            disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
        }
    }
    
    • 单元测试代码如下,要注意的地方是发布完100事件后,调用countDownLatch.await()方法开始等待,直到消费者线程调用countDownLatch.countDown()方法解除等待,还有就是预期的消费消息总数等于200
    package com.bolingcavalry.service.impl;
    
    import com.bolingcavalry.service.ConsumeModeService;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import java.util.concurrent.CountDownLatch;
    import static org.junit.Assert.assertEquals;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @Slf4j
    public class ConsumeModeServiceTest {
    
        @Autowired
        @Qualifier("independentModeService")
        ConsumeModeService independentModeService;
    
        /**
         * 测试时生产的消息数量
         */
        private static final int EVENT_COUNT = 100;
    
        private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
    
            // 告诉service,等消费到expectEventCount个消息时,就执行countDownLatch.countDown方法
            service.setCountDown(countDownLatch, expectEventCount);
    
            for(int i=0;i<eventCount;i++) {
                log.info("publich {}", i);
                service.publish(String.valueOf(i));
            }
    
            // 当前线程开始等待,前面的service.setCountDown方法已经告诉过service,
            // 等消费到expectEventCount个消息时,就执行countDownLatch.countDown方法
            // 千万注意,要调用await方法,而不是wait方法!
            countDownLatch.await();
    
            // 消费的事件总数应该等于发布的事件数
            assertEquals(expectEventCount, service.eventCount());
        }
    
        @Test
        public void testIndependentModeService() throws InterruptedException {
            log.info("start testIndependentModeService");
            testConsumeModeService(independentModeService,
                    EVENT_COUNT,
                    EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
        }
    }
    
    • 单元测试执行结果如下,符合预期:

    在这里插入图片描述

    100个订单,邮件系统的两个邮件服务器共同消费

    • 两个消费者共同消费的代码也很简单,调用handleEventsWithWorkerPool方法即可,把共同消费的MailWorkHandler实例作为参数传入:
    package com.bolingcavalry.service.impl;
    
    import com.bolingcavalry.service.ConsumeModeService;
    import com.bolingcavalry.service.MailWorkHandler;
    import org.springframework.stereotype.Service;
    
    @Service("shareModeService")
    public class ShareModeServiceImpl extends ConsumeModeService {
        @Override
        protected void disruptorOperate() {
            // mailWorkHandler1模拟一号邮件服务器
            MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
    
            // mailWorkHandler2模拟一号邮件服务器
            MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
    
            // 调用handleEventsWithWorkerPool,表示创建的多个消费者以共同消费的模式消费
            disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
        }
    }
    
    • 单元测试是在ConsumeModeServiceTest.java中添加如下代码,注意由于是共同消费,因此预期的消费事件数等于消息数,都是100:
        @Autowired
        @Qualifier("shareModeService")
        ConsumeModeService shareModeService;
    
        @Test
        public void testShareModeService() throws InterruptedException {
            log.info("start testShareModeService");
            testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
        }
    
    • 执行单元测试,结果如下图:

    在这里插入图片描述

    100个订单,短信系统独立消费,与此同时,两个邮件服务器共同消费

    • 最后一个场景,依旧很简单,handleEventsWith调用一次,再调用一次handleEventsWithWorkerPool即可:
    package com.bolingcavalry.service.impl;
    
    import com.bolingcavalry.service.ConsumeModeService;
    import com.bolingcavalry.service.MailWorkHandler;
    import com.bolingcavalry.service.SmsEventHandler;
    import org.springframework.stereotype.Service;
    
    @Service("independentAndShareModeService")
    public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
        @Override
        protected void disruptorOperate() {
            // 调用handleEventsWith,表示创建的多个消费者,每个都是独立消费的
            // 这里创建一个消费者,短信服务
            disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));
    
            // mailWorkHandler1模拟一号邮件服务器
            MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
    
            // mailWorkHandler2模拟一号邮件服务器
            MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
    
            // 调用handleEventsWithWorkerPool,表示创建的多个消费者以共同消费的模式消费
            disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
        }
    }
    
    • 单元测试是在ConsumeModeServiceTest.java中添加如下代码,预期的消费事件数应该是200,因为整体上是两个独立消费,只不过其中的一个内部有两个消费者共同消费:
        @Autowired
        @Qualifier("independentAndShareModeService")
        ConsumeModeService independentAndShareModeService;
    
        @Test
        public void independentAndShareModeService() throws InterruptedException {
            log.info("start independentAndShareModeService");
            testConsumeModeService(independentAndShareModeService,
                    EVENT_COUNT,
                    EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
        }
    
    • 单元测试结果如下,符合预期:

    在这里插入图片描述

    • 至此,独立消费和共同消费的实战就完成了,借助disruptor,三个常见场景都可以轻松完成,如果您正在做这些场景的开发,希望本文能给您一些参考;

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
    https://github.com/zq2599/blog_demos

  • 相关阅读:
    学习windows的半天+学习Git分布式系统的半天
    输出从1加到100的结果、打印100以内的质数、计算一个文件中的每个英文单词出现的次数
    Linux操作系统--初级--进程管理的命令
    Linux操作系统--初级--防火墙
    Linux操作系统--初级--dns服务
    Linux操作系统--初级--网络安全基础
    Linux操作系统--初级--进程管理
    Linux操作系统--初级--Linux网络
    Linux操作系统--初级--Linux磁盘管理
    Linux操作系统--初级--Linux的用户与用户组(权限管理)
  • 原文地址:https://www.cnblogs.com/bolingcavalry/p/15346150.html
Copyright © 2011-2022 走看看