zoukankan      html  css  js  c++  java
  • 使用redisson分布式延时队列,控制游戏的自动开始或结束,实现定时效果

    技术选型

    使用分布式延时队列来控制游戏的开始或结束

    redisson 延时队列使用参考:

    https://www.javadoc.io/doc/org.redisson/redisson/latest/org/redisson/api/RDelayedQueue.html

    https://github.com/redisson/redisson/wiki/7.-Distributed-collections

    流程图

    代码

    发布游戏的时候,向延时队列里面放入对应的延时key

    controller层代码:

    @ApiOperation("发布游戏")
    @PostMapping(value = "publish")
    @Auth(value = GmcenterConstants.LOG_OBJ_GAME, action = Constants.LOG_TYPE_UPDATESINGLE, type = AuthType.TOKEN)
    @NonDuplicateRequest(value = "T(com.yxt.gmcenter.app.common.GmcenterRedisKeys).GAME_PUBLISH_LOCK + #currentUserId + ':' + #gameId", fetchCurrentUser = true)
    public Long gamePublish(Long gameId) {
        log.debug("GameCenterController#gamePublish params: {}", gameId);
        Validate.isNotNull(gameId, GmcenterExceptionKeys.APIS_GAME_STATUS_NOT_FOUND);
    
        return gameService.publishGame(gameId);
    }

    service层代码:

    @Transactional(rollbackFor = Exception.class)
    public Long publishGame(Long gameId) {
        GmGame game = getById(gameId);
    
        //只有未发布状态的游戏才可以发布
        gameCommonService.checkGamePublishStatus(game);
    
        Integer subjectCount = gameSubjectMapper.selectCount(
                new QueryWrapper<GmGameSubject>().lambda().eq(GmGameSubject::getDeleted, NOT_DELETED)
                        .eq(GmGameSubject::getGameId, gameId));
        Validate.isTrue(subjectCount > INT_0, GmcenterExceptionKeys.APIS_GAME_PUBLISH_SUBJECT_NOT_EMPTY);
    
        //判断游戏下面是否有题目以及成员,如果没有题目和成员,也不可以发布
        Integer userCount = gameUserMapper.selectCount(
                new QueryWrapper<GmGameUser>().lambda().eq(GmGameUser::getDeleted, NOT_DELETED)
                        .eq(GmGameUser::getGameId, gameId));
        Validate.isTrue(userCount > INT_0, GmcenterExceptionKeys.APIS_GAME_PUBLISH_USER_NOT_EMPTY);
    
        getAndSetStatus(game);
        gameCommonService.setUpdateField(game);
    
        updateById(game);
    
        setRedisClock(game);
    
        //发布完成之后,redis设置发布的次数,默认为1,发布+1,撤销-1
        changePublishCount(game, GameOperateEnum.PUBLISH);
    
        return game.getId();
    }
    
    
    /**
     * 设置redis定时开始/结束游戏
     *
     * @param game 游戏
     */
    private void setRedisClock(GmGame game) {
        Long gameId = game.getId();
        int gameStatus = game.getGameStatus();
        LocalDateTime endTime = game.getEndTime();
        LocalDateTime startTime = game.getStartTime();
    
        LocalDateTime now = LocalDateTime.now();
        long startSecond = LocalDateTimeUtil.between(now, startTime, ChronoUnit.SECONDS);
        long endSecond = LocalDateTimeUtil.between(now, endTime, ChronoUnit.SECONDS);
    
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(GAME_AUTO_ACTION_DELAY_QUEUE);
        //获取到延迟队列
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        if (UN_START.getStatus().equals(gameStatus)) {
            // case1:未开始状态->设置开始时间redis->判断endTime是否设置->设置结束redis
            delayedQueue
                    .offer(StringUtils.joinWith("_", ACTION_START, gameId, startTime.format(PURE_DATETIME_FORMATTER)),
                            Math.abs(startSecond), TimeUnit.SECONDS);
            setEndTime(game, endTime, startTime, startSecond, endSecond, delayedQueue);
        } else if (STARTED.getStatus().equals(gameStatus)) {
            // case2:开始状态->那就判断结束时间是否设置了->如果设置了就设置redis,没设置,就什么都不用设置了
            setEndTime(game, endTime, startTime, startSecond, endSecond, delayedQueue);
        }
        delayedQueue.destroy();
        // case3:已结束状态->nothing(发布即结束)
    }
    
    
    
    
    /**
     * 设置结束时间
     *
     * @param game         游戏
     * @param endTime      结束时间
     * @param startTime    开始时间
     * @param startSecond  从当前到开始时间的秒数
     * @param endSecond    从当前时间到结束时间的秒数
     * @param delayedQueue 延时队列
     */
    private void setEndTime(GmGame game, LocalDateTime endTime, LocalDateTime startTime, long startSecond,
            long endSecond, RDelayedQueue<String> delayedQueue) {
        if (!DateUtil.isDefaultDateTime(endTime) && GameTypeEnum.TOWER.getStatus().equals(game.getGameType())) {
            //如果是智慧塔,且设置了结束时间,正常设置结束时间
            delayedQueue.offer(StringUtils
                            .joinWith("_", ACTION_FINISH, game.getId(), endTime.format(PURE_DATETIME_FORMATTER)),
                    Math.abs(endSecond), TimeUnit.SECONDS);
        } else if (DateUtil.isDefaultDateTime(endTime) && GameTypeEnum.ARENA.getStatus().equals(game.getGameType())) {
            //如果是头脑竞技场,结束时间是后台算出来的,所以也需要设置自动结束
            long useTime = gameCommonService.calcExamUseTime(game);
            delayedQueue.offer(StringUtils.joinWith("_", ACTION_FINISH, game.getId(),
                    startTime.format(PURE_DATETIME_FORMATTER) + "&" + gameCommonService.subjectCount(game)),
                    Math.abs(startSecond) + useTime, TimeUnit.SECONDS);
        } else {
            //不是头脑竞技场,又不是智慧塔,那你就别发布了
            throw new ApiException(GmcenterExceptionKeys.APIS_GAME_TYPE_NOT_SUPPORT);
        }
    }

    放入延时队列之后,再写一个监听阻塞队列的后台任务:

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class GameAutoListener implements CommandLineRunner {
        private final LockService lockService;
        private final GmGameService gameService;
        private final RedissonClient redissonClient;
        private final GameCommonService gameCommonService;
        private final GmGameSubjectService subjectService;
    
        @Override
        public void run(String... args) throws Exception {
            log.debug("start GameAutoListener...");
            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(GAME_AUTO_ACTION_DELAY_QUEUE);
    
            //noinspection InfiniteLoopStatement
            while (true) {
                //根据key不同,进行不同的操作,使用take可以在队列为空的时候阻塞队列,使线程处于wait状态,防止占用过多cpu时间
                String actionAndGameId = blockingDeque.take();
                log.debug("get blockingDeque game: {}", actionAndGameId);
                try {
                    String[] s = StringUtils.split(actionAndGameId, '_');
    
                    String action = s[0];
                    String gameId = s[1];
                    //当action为start时:该字段为startTime
                    //当action为finish时,该字段针对智慧塔类型为endTime,针对头脑竞技场类型为startTime&subCount
                    String timeAndSubCount = s[2];
    
                    GmGame game = gameService.getById(gameId);
                    Integer gameStatus = game.getGameStatus();
                    String dbStartTime = game.getStartTime().format(PURE_DATETIME_FORMATTER);
                    String dbEndTime = game.getEndTime().format(PURE_DATETIME_FORMATTER);
                    Integer gameType = game.getGameType();
    
                    if (action.equals(ACTION_START)) {
                        log.debug("game {} is starting.", gameId);
    
                        //加锁更新数据库之前,先查看缓存的开始时间和需要开始的游戏的开始时间是否相等
                        if (StringUtils.equals(dbStartTime, timeAndSubCount) && game.getDeleted().equals(NOT_DELETED)
                                && GameStatusEnum.UN_START.getStatus().equals(gameStatus)) {
                            startGame(game);
                        }else {
                            log.debug("game start failed. startTime changed or deleted or is not un-start. game: {}", game);
                        }
                    } else if (action.equals(ACTION_FINISH)) {
                        log.debug("game {} is finishing.", gameId);
                        if (game.getDeleted().equals(NOT_DELETED) && GameStatusEnum.STARTED.getStatus()
                                .equals(gameStatus)) {
                            if (GameTypeEnum.TOWER.getStatus().equals(gameType)) {
                                //智慧塔直接判断结束时间是否相等
                                if (timeAndSubCount.equals(dbEndTime)) {
                                    finishGame(game);
                                } else {
                                    log.debug("game had updated. endTime changed. game: {}", game);
                                }
                            } else if (GameTypeEnum.ARENA.getStatus().equals(gameType)) {
                                //如果是头脑竞技场,判断题目数量及开始时间是否相等
                                String[] split = timeAndSubCount.split("&");
                                String cacheStartTime = split[0];
                                String cacheSubCount = split[1];
                                int count = subjectService.count(new QueryWrapper<GmGameSubject>().lambda()
                                        .eq(GmGameSubject::getDeleted, NOT_DELETED)
                                        .eq(GmGameSubject::getGameId, game.getId()));
                                if (Integer.parseInt(cacheSubCount) == count && cacheStartTime.equals(dbStartTime)) {
                                    finishGame(game);
                                } else {
                                    log.debug("game no finish. subject count or startTime changed. game: {}", game);
                                }
                            } else {
                                log.error("game finish failed. game type is not supported. game: {}", game);
                            }
                        } else {
                            log.debug("game deleted or not started, cannot be finished. game: {}", game);
                        }
                    } else {
                        log.error("GameAutoListener action undefined. action: {}", actionAndGameId);
                    }
                } catch (Exception e) {
                    log.error("GameAutoListener failed. actionAndGameId: {} err: {}", actionAndGameId,
                            ExceptionUtils.getStackTrace(e));
                }
            }
        }
    
        /**
         * 设置游戏结束状态
         *
         * @param game 游戏
         */
        private void finishGame(GmGame game) {
            try (RLockSupport locker = RLockSupport.ofLocker(lockService)) {
                locker.acquire(String.format(GAME_AUTO_FINISH_LOCK, game.getId()), INT_2, TimeUnit.SECONDS);
                game.setUpdateTime(LocalDateTime.now());
                game.setUpdator(StringUtils.EMPTY);
                game.setGameStatus(GameStatusEnum.FINISHED.getStatus());
                gameService.updateById(game);
    
                //游戏结束之后,异步给外包发送一个结束消息,失败的时候发mq死信队列
                CompletableFuture.runAsync(() -> gameCommonService.sendFinishToClient(game));
                log.debug("game {} finished.", game.getId());
    
                locker.unlock();
            }
        }
    
        /**
         * 设置游戏开始状态
         *
         * @param game 游戏
         */
        private void startGame(GmGame game) {
            try (RLockSupport locker = RLockSupport.ofLocker(lockService)) {
                locker.acquire(String.format(GAME_AUTO_START_LOCK, game.getId()), INT_2, TimeUnit.SECONDS);
    
                game.setUpdateTime(LocalDateTime.now());
                game.setUpdator(StringUtils.EMPTY);
                game.setGameStatus(GameStatusEnum.STARTED.getStatus());
                gameService.updateById(game);
                log.debug("game {} started.", game.getId());
    
                locker.unlock();
            }
        }
    }

    游戏发布的时候,设置一个发布标志缓存,只有发布次数为1的游戏,才允许自动开始:

    /**
     * 设置发布次数
     *  @param game 游戏
     * @param operateEnum 操作枚举
     */
    private void changePublishCount(GmGame game, GameOperateEnum operateEnum) {
        //这里controller层添加了针对用户和game的防重复提交锁,就不需要再次加分布式锁了
        RBucket<Integer> bucket = redissonClient
                .getBucket(String.format(GmcenterRedisKeys.GAME_CANCEL_COUNT_PREFIX, game.getId()));
    
        if (GameOperateEnum.PUBLISH.equals(operateEnum)) {
            Integer count = bucket.get();
            if (count == null) {
                bucket.set(INT_1);
            } else {
                bucket.set(count + INT_1);
            }
        } else if (GameOperateEnum.CANCEL.equals(operateEnum)) {
            Integer count = bucket.get();
            if (count != null) {
                bucket.set(count - INT_1);
            }
        } else if (GameOperateEnum.FINISH.equals(operateEnum)) {
            bucket.delete();
        }
    }

    在游戏取消的时候,需要让发布次数缓存-1,平衡取消之后的再次发布:

    @Transactional(rollbackFor = Exception.class)
    public Long gameCancel(Long gameId) {
        GmGame game = getById(gameId);
        gameCommonService.checkGameCancelStatus(game);
    
        game.setGameStatus(GameStatusEnum.UN_PUBLISH.getStatus());
        gameCommonService.setUpdateField(game);
    
        updateById(game);
    
        changePublishCount(game, GameOperateEnum.CANCEL);
    
        return game.getId();
    }

    当游戏被手动结束的时候,也需要删除游戏对应的计次buket:

    @Transactional(rollbackFor = Exception.class)
    public Long finishGame(Long gameId) {
        GmGame game = getById(gameId);
        gameCommonService.checkGameFinishStatus(game);
    
        game.setGameStatus(FINISHED.getStatus());
        gameCommonService.setUpdateField(game);
    
        updateById(game);
    
        changePublishCount(game, GameOperateEnum.FINISH);
        gameCommonService.sendFinishToClient(game);
    
        return game.getId();
    }
    起风了,努力生存
  • 相关阅读:
    mysql保存中文乱码的原因和解决办法
    NetSetMan IP地址切换工具
    使用批处理文件,自动设置计算机IP地址
    神逸之作:国产快速启动软件神品ALTRun
    Apache详细介绍
    利用sqoop对mysql执行DML操作
    Mysql定时清空表
    azkaban group分组,权限
    azkaban使用
    sqoop无法导出parquet文件到mysql
  • 原文地址:https://www.cnblogs.com/StivenYang/p/15560397.html
Copyright © 2011-2022 走看看