zoukankan      html  css  js  c++  java
  • Spring Boot + Redis 实现延时队列,写得太好了!

    来源:blog.csdn.net/qq330983778/article/details/99341671

    业务流程

    首先我们分析下这个流程

    1. 用户提交任务。首先将任务推送至延迟队列中。
    2. 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
    3. 然后生成延迟任务(仅仅包含任务id)放入某个桶中
    4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
    5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
    6. 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
    7. 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
    8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。

    对象

    我们现在可以了解到中间存在的几个组件

    • 延迟队列,为Redis延迟队列。实现消息传递
    • Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
    • Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
    • Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
    • Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

    其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

    任务状态

    • ready:可执行状态,
    • delay:不可执行状态,等待时钟周期。
    • reserved:已被消费者读取,但没有完成消费。
    • deleted:已被消费完成或者已被删除。

    对外提供的接口

    额外的内容

    1. 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
    2. 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
    3. 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
    4. 文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

    实现

    现在我们根据设计内容完成设计。这一块设计我们分四步完成

    任务及相关对象

    目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job),Spring Boot 基础就不介绍了,推荐下这个实战教程:
    https://github.com/javastacks/spring-boot-best-practice

    任务对象

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Job implements Serializable {
    
        /**
         * 延迟任务的唯一标识,用于检索任务
         */
        @JsonSerialize(using = ToStringSerializer.class)
        private Long id;
    
        /**
         * 任务类型(具体业务类型)
         */
        private String topic;
    
        /**
         * 任务的延迟时间
         */
        private long delayTime;
    
        /**
         * 任务的执行超时时间
         */
        private long ttrTime;
    
        /**
         * 任务具体的消息内容,用于处理具体业务逻辑用
         */
        private String message;
    
        /**
         * 重试次数
         */
        private int retryCount;
        /**
         * 任务状态
         */
        private JobStatus status;
    }
    

    任务引用对象

    @Data
    @AllArgsConstructor
    public class DelayJob implements Serializable {
    
    
        /**
         * 延迟任务的唯一标识
         */
        private long jodId;
        
        /**
         * 任务的执行时间
         */
        private long delayDate;
    
        /**
         * 任务类型(具体业务类型)
         */
        private String topic;
    
    
        public DelayJob(Job job) {
            this.jodId = job.getId();
            this.delayDate = System.currentTimeMillis() + job.getDelayTime();
            this.topic = job.getTopic();
        }
    
        public DelayJob(Object value, Double score) {
            this.jodId = Long.parseLong(String.valueOf(value));
            this.delayDate = System.currentTimeMillis() + score.longValue();
        }
    }
    

    容器

    目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

    job任务池,为普通的K/V结构,提供基础的操作

    @Component
    @Slf4j
    public class JobPool {
        
        @Autowired
        private RedisTemplate redisTemplate;
    
        private String NAME = "job.pool";
        
        private BoundHashOperations getPool () {
            BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
            return ops;
        }
    
        /**
         * 添加任务
         * @param job
         */
        public void addJob (Job job) {
            log.info("任务池添加任务:{}", JSON.toJSONString(job));
            getPool().put(job.getId(),job);
            return ;
        }
    
        /**
         * 获得任务
         * @param jobId
         * @return
         */
        public Job getJob(Long jobId) {
            Object o = getPool().get(jobId);
            if (o instanceof Job) {
                return (Job) o;
            }
            return null;
        }
    
        /**
         * 移除任务
         * @param jobId
         */
        public void removeDelayJob (Long jobId) {
            log.info("任务池移除任务:{}",jobId);
            // 移除任务
            getPool().delete(jobId);
        }
    }
    

    延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

    @Slf4j
    @Component
    public class DelayBucket {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        private static AtomicInteger index = new AtomicInteger(0);
    
        @Value("${thread.size}")
        private int bucketsSize;
    
        private List <String> bucketNames = new ArrayList <>();
    
        @Bean
        public List <String> createBuckets() {
            for (int i = 0; i < bucketsSize; i++) {
                bucketNames.add("bucket" + i);
            }
            return bucketNames;
        }
    
        /**
         * 获得桶的名称
         * @return
         */
        private String getThisBucketName() {
            int thisIndex = index.addAndGet(1);
            int i1 = thisIndex % bucketsSize;
            return bucketNames.get(i1);
        }
    
        /**
         * 获得桶集合
         * @param bucketName
         * @return
         */
        private BoundZSetOperations getBucket(String bucketName) {
            return redisTemplate.boundZSetOps(bucketName);
        }
    
        /**
         * 放入延时任务
         * @param job
         */
        public void addDelayJob(DelayJob job) {
            log.info("添加延迟任务:{}", JSON.toJSONString(job));
            String thisBucketName = getThisBucketName();
            BoundZSetOperations bucket = getBucket(thisBucketName);
            bucket.add(job,job.getDelayDate());
        }
    
        /**
         * 获得最新的延期任务
         * @return
         */
        public DelayJob getFirstDelayTime(Integer index) {
            String name = bucketNames.get(index);
            BoundZSetOperations bucket = getBucket(name);
            Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
            if (CollectionUtils.isEmpty(set)) {
                return null;
            }
            ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
            Object value = typedTuple.getValue();
            if (value instanceof DelayJob) {
                return (DelayJob) value;
            }
            return null;
        }
    
        /**
         * 移除延时任务
         * @param index
         * @param delayJob
         */
        public void removeDelayTime(Integer index,DelayJob delayJob) {
            String name = bucketNames.get(index);
            BoundZSetOperations bucket = getBucket(name);
            bucket.remove(delayJob);
        }
    
    }
    

    待完成任务,内部使用topic进行细分,每个topic对应一个list集合

    @Component
    @Slf4j
    public class ReadyQueue {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        private String NAME = "process.queue";
    
        private String getKey(String topic) {
            return NAME + topic;
        }
    
        /**
         * 获得队列
         * @param topic
         * @return
         */
        private BoundListOperations getQueue (String topic) {
            BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
            return ops;
        }
    
        /**
         * 设置任务
         * @param delayJob
         */
        public void pushJob(DelayJob delayJob) {
            log.info("执行队列添加任务:{}",delayJob);
            BoundListOperations listOperations = getQueue(delayJob.getTopic());
            listOperations.leftPush(delayJob);
        }
    
        /**
         * 移除并获得任务
         * @param topic
         * @return
         */
        public DelayJob popJob(String topic) {
            BoundListOperations listOperations = getQueue(topic);
            Object o = listOperations.leftPop();
            if (o instanceof DelayJob) {
                log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
                return (DelayJob) o;
            }
            return null;
        }
        
    }
    

    轮询处理

    设置了线程池为每个bucket设置一个轮询操作

    @Component
    public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
    
        @Autowired
        private DelayBucket delayBucket;
        @Autowired
        private JobPool     jobPool;
        @Autowired
        private ReadyQueue  readyQueue;
        
        @Value("${thread.size}")
        private int length;
        
        @Override 
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            ExecutorService executorService = new ThreadPoolExecutor(
                    length, 
                    length,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue <Runnable>());
    
            for (int i = 0; i < length; i++) {
                executorService.execute(
                        new DelayJobHandler(
                                delayBucket,
                                jobPool,
                                readyQueue,
                                i));
            }
            
        }
    }
    

    测试请求

    /**
     * 测试用请求
     * @author daify
     **/
    @RestController
    @RequestMapping("delay")
    public class DelayController {
        
        @Autowired
        private JobService jobService;
        /**
         * 添加
         * @param request
         * @return
         */
        @RequestMapping(value = "add",method = RequestMethod.POST)
        public String addDefJob(Job request) {
            DelayJob delayJob = jobService.addDefJob(request);
            return JSON.toJSONString(delayJob);
        }
    
        /**
         * 获取
         * @return
         */
        @RequestMapping(value = "pop",method = RequestMethod.GET)
        public String getProcessJob(String topic) {
            Job process = jobService.getProcessJob(topic);
            return JSON.toJSONString(process);
        }
    
        /**
         * 完成一个执行的任务
         * @param jobId
         * @return
         */
        @RequestMapping(value = "finish",method = RequestMethod.DELETE)
        public String finishJob(Long jobId) {
            jobService.finishJob(jobId);
            return "success";
        }
    
        @RequestMapping(value = "delete",method = RequestMethod.DELETE)
        public String deleteJob(Long jobId) {
            jobService.deleteJob(jobId);
            return "success";
        }
        
    }
    

    测试

    添加延迟任务

    通过postman请求:localhost:8000/delay/add

    此时这条延时任务被添加进了线程池中

    2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
    2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
    

    根据设置10秒钟之后任务会被添加至ReadyQueue中

    2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
    

    获得任务

    这时候我们请求localhost:8000/delay/pop

    这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

    2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
    2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
    2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
    

    按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

    2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
    2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
    

    任务的删除/消费

    现在我们请求:localhost:8000/delay/delete

    此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

    2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:3
    2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
    

    近期热文推荐:

    1.1,000+ 道 Java面试题及答案整理(2021最新版)

    2.别在再满屏的 if/ else 了,试试策略模式,真香!!

    3.卧槽!Java 中的 xx ≠ null 是什么新语法?

    4.Spring Boot 2.5 重磅发布,黑暗模式太炸了!

    5.《Java开发手册(嵩山版)》最新发布,速速下载!

    觉得不错,别忘了随手点赞+转发哦!

  • 相关阅读:
    STM32之ADC+步骤小技巧(英文)
    STM32之待机唤醒
    STM32_RTC君
    STM32之输入捕获以及小小应用(库)
    STM32之PWM君
    STM32之通用定时器
    STM32之看门狗(独立与窗口)
    零基础学习qt4 第七章的第一个例子
    SPI
    STM32串口中断的一些资料
  • 原文地址:https://www.cnblogs.com/javastack/p/15328861.html
Copyright © 2011-2022 走看看