zoukankan      html  css  js  c++  java
  • 系统延迟任务设计

    延迟任务设计思路

    入队操作:ZADD KEY timestamp task, 我们将需要处理的任务
    按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是 O(logN),N是 ZSet 中元素个数,因此我们能相对比较高效的进行入队操作。

    起一个进程定时(比如每隔一秒)通过 ZREANGEBYSCORE 方法查询 ZSet 中 Score 最小的元素
    具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:
    a. 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
    b. 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明 ZSet 中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;

    同样的,ZRANGEBYSCORE操作的时间复杂度为 O(logN + M),其中N为 ZSet 中元素个数,M 为查询的元素个数,因此我们定时查询操作也是比较高效的。

    不足

    0.两个队列每 5s 执行一次,所以并不能非常实时的执行任务。
    1.两个队列每 5s 执行一次,扫描每个队列中最近的 2 条记录,如果在同一时间段有很多的任务需要执行,则无法准时执行。
    2.不能动态增加 Redis 队列

    具体业务代码

    延迟任务表

    DROP TABLE IF EXISTS `bs_delay_task`;
    CREATE TABLE `bs_delay_task`  (
      `ID` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `CREATE_DATE` datetime NULL DEFAULT NULL,
      `DOMAIN_ID` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
      `DOMAIN_TYPE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
      `MODIFY_DATE` datetime NULL DEFAULT NULL,
      `NAME` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
      `REDIS_KEY` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
      `REDIS_VALUE` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
      `REMARK` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `STATUS` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `TASK_EXECUTE_DATE` datetime NULL DEFAULT NULL,
      PRIMARY KEY (`ID`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
    

    定时任务

    @Component
    public class DelayTaskQuartz {
    
        @Autowired
        private DelayTaskService delayTaskService;
    
        // 5s 执行一次
        @Scheduled(cron = "2/5 * * * * ?")
        public void ConferenceQuartz() {
            delayTaskService.executeLoopTask
        }
    }
    

    DelayTaskService

    @Service
    @Transactional
    public class DelayTaskService {
    
        Log log = LogFactory.get();
    
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
        public static final String DELAY_TEST_KEY_ONE = "redis_delay_queue_1";
        public static final String DELAY_TEST_KEY_TWO = "redis_delay_queue_2";
        public static List<String> REDIS_KEYS = new ArrayList<>(4);
    
        static {
            REDIS_KEYS.add(DELAY_TEST_KEY_ONE);
            REDIS_KEYS.add(DELAY_TEST_KEY_TWO);
        }
    
        /**
         * 创建延迟任务,在XX秒后执行
         */
        @Transactional
        public void createDelayTest(@NotNull Integer secondAfter, @NotNull String domainType, @NotNull String domainId) {
            Calendar rightNow = Calendar.getInstance();
            rightNow.setTime(new Date());
            rightNow.add(Calendar.SECOND, secondAfter);
            Date targetDate = rightNow.getTime();
            createDelayTest(targetDate, domainType, domainId);
        }
    
        /**
         * 创建延迟任务,在 executeDate 时执行
         */
        @Transactional
        public synchronized void createDelayTest(@NotNull Date executeDate, @NotNull String domainType, @NotNull String domainId) {
    
            DelayTaskQO delayTaskQO = new DelayTaskQO();
            delayTaskQO.setDomainType(domainType);
            delayTaskQO.setDomainId(domainId);
            delayTaskQO.setStatus(DelayTask.STATUS_READY);
            DelayTask delayTask = this.queryUnique(delayTaskQO);
    
            if (null != delayTask) {
                delayTask.setStatus(DelayTask.STATUS_CANCEL);
                delayTask.setModifyDate(new Date());
                delayTask.setRemark("添加重复任务,旧任务自动取消");
                this.update(delayTask);
            }
            String redisKey = getRandomRedisKey();
            // e.g. conference_uuid_20210524-120000
            String redisValue = domainType + "_" + domainId + "_" + Date2StrShort(executeDate);
            long score = Date2Score(executeDate);
    
            CreateDelayTaskCommand cmd = new CreateDelayTaskCommand();
            cmd.setName(domainType + " " + Date2Str(executeDate) + " 执行");
            cmd.setDomainType(domainType);
            cmd.setDomainId(domainId);
            cmd.setRedisKey(redisKey);
            cmd.setRedisValue(redisValue);
            cmd.setTaskExecuteDate(executeDate);
            this.create(cmd);
            RedisUtils.zAdd(redisKey, redisValue, score);
        }
    
        @Transactional
        public void create(CreateDelayTaskCommand command) {
            DelayTask delayTask = new DelayTask();
            delayTask.create(command);
            save(delayTask);
        }
    
        // 定时任务
        @Transactional
        public void executeLoopTask() {
            this.executeTask(DELAY_TEST_KEY_ONE);
            this.executeTask(DELAY_TEST_KEY_TWO);
        }
    
        @Transactional
        public void executeTask(String key) {
            Set<ZSetOperations.TypedTuple<String>> values =
                    RedisUtils.zRangeByScoreWithScores(key, Long.MIN_VALUE, Long.MAX_VALUE, 0, 2);
            if (null != values) {
                values.forEach(v -> {
                    Double timeScoreWithTask = v.getScore();
                    String value = v.getValue();
                    // 判断获取的任务是否要执行
                    if (System.currentTimeMillis() > timeScoreWithTask) {
                        RedisUtils.zRemove(key, value);
    
                        String[] splitStr = value.split("_");
                        String domainType = splitStr[0];
                        String domainId = splitStr[1];
                        // 发送通知给订阅者
                        applicationEventPublisher.publishEvent(new DelayTestExecuteEvent(domainType, domainId, key));
    
                        log.error("DelayTestService " + value + " 任务已经执行");
                    } else {
                        log.info(key + ":" + value + " 未到执行时间,不需要执行");
                    }
                });
            }
        }
    
    
        private String getRandomRedisKey() {
            Random random = new Random();
            int n = random.nextInt(REDIS_KEYS.size());
            return REDIS_KEYS.get(n);
        }
    
        private String Date2Str(Date date) {
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return df.format(date);
        }
    
        private String Date2StrShort(Date date) {
            DateFormat df = new SimpleDateFormat("yyyyMMdd-HHmmss");
            return df.format(date);
        }
    
        private long Date2Score(Date date) {
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            Long score = cal.getTimeInMillis();
            return score;
        }
    
    }
    

    观察者对象

    public class DelayTestExecuteEvent extends ApplicationEvent {
        private String domainType;
        private String domainId;
        private String redisKey;
    
        public DelayTestExecuteEvent(Object source) {
            super(source);
        }
    
        public DelayTestExecuteEvent(String domainType, String domainId, String redisKey) {
            super("");
            this.domainType = domainType;
            this.domainId = domainId;
            this.redisKey = redisKey;
        }
    
        public String getDomainType() {
            return domainType;
        }
    
        public void setDomainType(String domainType) {
            this.domainType = domainType;
        }
    
        public String getDomainId() {
            return domainId;
        }
    
        public void setDomainId(String domainId) {
            this.domainId = domainId;
        }
    
        public String getRedisKey() {
            return redisKey;
        }
    
        public void setRedisKey(String redisKey) {
            this.redisKey = redisKey;
        }
    }
    
    
    @Component
    public class DelayTestExecuteObserver implements ApplicationListener<DelayTestExecuteEvent> {
    
        Log log = LogFactory.get();
    
        @Autowired
        private DelayTaskService delayTaskService;
    
    
        @Override
        public void onApplicationEvent(DelayTestExecuteEvent event) {
            String domainType = event.getDomainType();
            String domainId = event.getDomainId();
            String redisKey = event.getRedisKey();
    
            log.info("正在执行 domainType:" + domainType + " domainId:" + domainId + " redisKey:" + redisKey);
    
            DelayTaskQO delayTaskQO = new DelayTaskQO();
            delayTaskQO.setDomainType(domainType);
            delayTaskQO.setDomainId(domainId);
            delayTaskQO.setStatus(DelayTask.STATUS_READY);
            DelayTask delayTask = delayTaskService.queryUnique(delayTaskQO);
            if (null == delayTask) {
                return;
            }
    
            // 执行任务
            try {
                executeTask(delayTask);
    
                delayTask.setStatus(DelayTask.STATUS_FINISH);
                delayTask.setModifyDate(new Date());
                delayTaskService.update(delayTask);
            } catch (Exception e) {
                log.error(e);
    
                delayTask.setStatus(DelayTask.STATUS_FINISH_ERROR);
                delayTask.setModifyDate(new Date());
                delayTaskService.update(delayTask);
            }
        }
    
        private void executeTask(DelayTask delayTask) throws BaseException {
            String domainId = delayTask.getDomainId();
            String domainType = delayTask.getDomainType();
    
            // TODO 这里可以优化,可以使用策略
            if (StringUtils.equals(domainType, DelayTask.XXX)) {
               // 具体的任务代码
            }
        }
    }
    
  • 相关阅读:
    Android 3.0 r1 API中文文档(108) —— ExpandableListAdapter
    Android 3.0 r1 API中文文档(113) ——SlidingDrawer
    Android 3.0 r1 API中文文档(105) —— ViewParent
    Android 中文 API (102)—— CursorAdapter
    Android开发者指南(4) —— Application Fundamentals
    Android开发者指南(1) —— Android Debug Bridge(adb)
    Android中文API(115)——AudioFormat
    Android中文API(116)——TableLayout
    Android开发者指南(3) —— Other Tools
    Android中文API (110) —— CursorTreeAdapter
  • 原文地址:https://www.cnblogs.com/manastudent/p/15656434.html
Copyright © 2011-2022 走看看