zoukankan      html  css  js  c++  java
  • 来!做一个分钟级业务监控系统【实战】

      如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?

      哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。

      但如果不允许你使用db进行count呢?因为线上数据库资源可是很宝贵的哦,你这一count可能会给db带来灾难了。

    那不然咋整?

    没有db,我们还有其他数据源嘛,比如: 消息队列?埋点数据? 本文将是基于该前提而行。

      

    做监控,尽量不要侵入业务太多!所以有一个消息中间件是至关重要的。针对大数据系统,一般是: kafka 或者 类kafka. (如本文基础 loghub)

      有了消息中间件,如何进行分钟级监控? 这个应该就很简单了吧。不过如果要自己实现,其实坑也不少的!

    如果自己实现计数,那么你可能需要做以下几件事: 

      1. 每消费一个消息,你需要一个累加器;
      2. 每隔一个周期,你可能需要一个归档操作;
      3. 你可能需要考虑各种并发安全问题;
      4. 你可能需要考虑种性能问题;
      5. 你可能需要考虑各种机器故障问题;
      6. 你可能需要考虑各种边界值问题;

      哎,其实没那么难。时间序列数据库,就专门为这类事情而生!如OpenTSDB: http://opentsdb.net/overview.html

      可以说,TSDB 是这类应用场景的杀手锏。或者基于流计算框架: 如flink, 也是很轻松完成的事。但是不是本文的方向,略过!

    本文是基于 loghub 的现有数据,进行分钟级统计后,入库 mysql 中,从而支持随时查询。(因loghub每次查询都是要钱的,所以,不可能直接查询)

      loghub 数据结构如: 2019-07-10 10:01:11,billNo,userId,productCode,...

      由于loghub提供了很多强大的查询统计功能,所以我们可以直接使用了。

      核心功能就是一个统计sql,还是比较简单的。但是需要考虑的点也不少,接下来,将为看官们奉上一个完整的解决方案!

    撸代码去!

    1. 核心统计任务实现类 MinuteBizDataCounterTask

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.LogContent;
    import com.aliyun.openservices.log.common.LogItem;
    import com.aliyun.openservices.log.common.QueriedLog;
    import com.aliyun.openservices.log.exception.LogException;
    import com.aliyun.openservices.log.response.GetLogsResponse;
    import com.my.service.statistics.StatisticsService;
    import com.my.entity.BizDataStatisticsMin;
    import com.my.model.LoghubQueryCounterOffsetModel;
    import com.my.util.loghub.LogHubProperties;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.math.BigDecimal;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 基于loghub 的分钟级 统计任务
     */
    @Component
    @Slf4j
    public class MinuteBizDataCounterTask implements Runnable {
    
        @Resource
        private LogHubProperties logHubProperties;
    
        @Resource
        private StatisticsService statisticsService;
    
        @Resource(name = "defaultOffsetQueryTaskCallback")
        private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;
    
        /**
         * loghub 客户端
         */
        private volatile Client mClient;
    
        /**
         * 过滤的topic
         */
        private static final String LOGHUB_TOPIC = "topic_test";
    
        /**
         * 单次扫描loghub最大时间 间隔分钟数
         */
        @Value("${loghub.offset.counter.perScanMaxMinutesGap}")
        private Integer perScanMaxMinutesGap;
    
        /**
         * 单次循环最大数
         */
        @Value("${loghub.offset.counter.perScanMaxRecordsLimit}")
        private Integer perScanMaxRecordsLimit;
    
        /**
         * 构造必要实例信息
         */
        public ProposalPolicyBizDataCounterTask() {
    
        }
    
        @Override
        public void run() {
            if(mClient == null) {
                this.mClient = new Client(logHubProperties.getEndpoint(),
                                    logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
            }
            while (!Thread.interrupted()) {
                try {
                    updateLastMinutePolicyNoCounter();
                    Thread.sleep(60000);
                }
                catch (InterruptedException e) {
                    log.error("【分钟级统计task】, sleep 中断", e);
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    // 注意此处可能有风险,发生异常后将快速死循环
                    log.error("【分钟级统计task】更新异常", e);
                    try {
                        Thread.sleep(10000);
                    }
                    catch (InterruptedException ex) {
                        log.error("【分钟级统计task】异常,且sleep异常", ex);
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    
        /**
         * 更新最近的数据 (分钟级)
         *
         * @throws LogException loghub查询异常时抛出
         */
        private void updateLastMinutePolicyNoCounter() throws LogException {
            updateMinutePolicyNoCounter(null);
        }
    
        /**
         * 更新最近的数据
         */
        public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {
            // 1. 获取偏移量
            // 2. 根据偏移量,判定是否可以一次性取完,或者多次获取更新
            // 3. 从loghub中设置偏移量,获取统计数据,更新
            // 4. 更新db数据统计值
            // 5. 更新偏移量
            // 6. 等待下一次更新
    
            // 指定offset时,可能为补数据
            final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);
            initSharedQueryOffset(destOffset, destOffset == specifyOffset);
    
            Integer totalAffectNum = 0;
    
            while (!isScanFinishOnDestination(destOffset)) {
                // 完整扫描一次时间周期
                calcNextSharedQueryOffset(destOffset);
                while (true) {
                    calcNextInnerQueryOffset();
                    ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();
                    Integer affectNum = handleMiniOffsetBatchCounter(logs);
                    totalAffectNum += affectNum;
                    log.info("【分钟级统计task】本次更新数据:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());
                    if(!hasMoreDataOffset(logs.size())) {
                        rolloverOffsetAndCommit();
                        break;
                    }
                }
            }
            log.info("【分钟级统计task】本次更新数据,总共:{}, destOffset:{}, curOffset:{}",
                                totalAffectNum, destOffset, getCurrentSharedQueryOffset());
            rolloverOffsetAndCommit();
            return totalAffectNum;
        }
    
        /**
         * 处理一小批的统计数据
         *
         * @param logs 小批统计loghub数据
         * @return 影响行数
         */
        private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) {
            if (logs == null || logs.isEmpty()) {
                return 0;
            }
            List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>();
            for (QueriedLog log1 : logs) {
                LogItem getLogItem = log1.GetLogItem();
                BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);
                statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);
                statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());
                statisticsMinList.add(statisticsMin1);
            }
            return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());
        }
    
    
        /**
         * 获取共享偏移信息
         *
         * @return 偏移
         */
        private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {
            return defaultOffsetQueryTaskCallback.getCurrentOffset();
        }
    
        /**
         * 判断本次是否扫描完成
         *
         * @param destOffset 目标偏移
         * @return true:扫描完成, false: 未完成
         */
        private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {
            return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();
        }
    
        /**
         * 获取偏移提交回调器
         *
         * @return 回调实例
         */
        private OffsetQueryTaskCallback getCurrentOffsetCallback() {
            return defaultOffsetQueryTaskCallback;
        }
    
        /**
         * 初始化共享的查询偏移变量
         *
         * @param destOffset 目标偏移
         * @param isSpecifyOffset 是否是手动指定的偏移
         */
        private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {
            // 整分花时间数据
            Integer queryStartTime = destOffset.getStartTime();
            if(queryStartTime % 60 != 0) {
                queryStartTime = queryStartTime / 60 * 60;
            }
            // 将目标扫描时间终点 设置为起点,以备后续迭代
            defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,
                                                            destOffset.getOffsetStart(), destOffset.getLimit(),
                                                            destOffset.getIsNewStep(), isSpecifyOffset);
            if(defaultOffsetQueryTaskCallback.getIsNewStep()) {
                resetOffsetDefaultSettings();
            }
        }
    
        /**
         * 计算下一次统计偏移时间
         *
         * @param destOffset 目标偏移值
         */
        private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {
            int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;
            if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {
                defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
                int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;
                if(nextExpectEndTime > destOffset.getEndTime()) {
                    nextExpectEndTime = destOffset.getEndTime();
                }
                defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);
            }
            else {
                defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
                defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());
            }
            resetOffsetDefaultSettings();
        }
    
        /**
         * 重置偏移默认配置
         */
        private void resetOffsetDefaultSettings() {
            defaultOffsetQueryTaskCallback.setIsNewStep(true);
            defaultOffsetQueryTaskCallback.setOffsetStart(0);
            defaultOffsetQueryTaskCallback.setLimit(0);
        }
    
        /**
         * 计算下一次小偏移,此种情况应对 一次外部偏移未查询完成的情况
         */
        private void calcNextInnerQueryOffset() {
            defaultOffsetQueryTaskCallback.setIsNewStep(false);
            // 第一次计算时,limit 为0, 所以得出的 offsetStart 也是0
            defaultOffsetQueryTaskCallback.setOffsetStart(
                    defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());
            defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);
        }
    
        /**
         * 获取当前循环的扫描区间
         *
         * @return 15567563433-1635345099 区间
         */
        private String getCurrentScanTimeDuring() {
            return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();
        }
    
        /**
         * 从loghub查询每分钟的统计信息
         *
         * @return 查询到的统计信息
         * @throws LogException loghub 异常时抛出
         */
        private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {
            // 先按保单号去重,再进行计数统计
            String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +
                    " | select count(1) as totalCountMin, " +
                    "split(bizData, ',')[2] as productCode," +
                    "split(bizData, ',')[3] as schemaCode," +
                    "split(bizData, ',')[4] as channelCode," +
                    "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +
                    "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";
            countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();
            GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),
                    defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),
                    LOGHUB_TOPIC, countSql);
            if(!countResponse.IsCompleted()) {
                log.error("【分钟级统计task】扫描获取到未完整的数据,请速检查原因,offSet:{}", getCurrentSharedQueryOffset());
            }
            return countResponse.GetLogs() == null
                        ? new ArrayList<>()
                        : countResponse.GetLogs();
        }
    
        /**
         * 根据上一次返回的记录数量,判断是否还有更多数据
         *
         * @param lastGotRecordsCount 上次返回的记录数 (数据量大于最大数说明还有未取完数据)
         * @return true: 是还有更多数据应该再循环获取, false: 无更多数据结束本期任务
         */
        private boolean hasMoreDataOffset(int lastGotRecordsCount) {
            return lastGotRecordsCount >= defaultOffsetQueryTaskCallback.getLimit();
        }
    
        /**
         * 加强版的 offset 优先级: 指定偏移 -> 基于缓存的偏移 -> 新生成偏移标识
         *
         * @param specifyOffset 指定偏移(如有)
         * @return 偏移标识
         */
        private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {
            if(specifyOffset != null) {
                return specifyOffset;
            }
            LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();
            if(offsetBaseOnCache != null) {
                return offsetBaseOnCache;
            }
            return generateNewOffset();
        }
    
        /**
         * 基于缓存获取一下偏移标识
         *
         * @return 偏移
         */
        private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {
            LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();
            if(offsetFromCache == null) {
                return null;
            }
            LocalDateTime now = LocalDateTime.now();
            LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                                                        now.getHour(), now.getMinute());
            // 如果上次仍未内部循环完成,则使用原来的
            if(offsetFromCache.getIsNewStep()) {
                offsetFromCache.setStartTime(offsetFromCache.getEndTime());
                long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
                offsetFromCache.setEndTime((int) endTime);
            }
            return offsetFromCache;
        }
    
        /**
         * 生成新的完整的 偏移标识
         *
         * @return 新偏移
         */
        private LoghubQueryCounterOffsetModel generateNewOffset() {
            LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();
            LocalDateTime now = LocalDateTime.now();
            LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                    now.getHour(), now.getMinute());
            long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));
            long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
            offsetNew.setStartTime((int) startTime);
            offsetNew.setEndTime((int) endTime);
            return offsetNew;
        }
        /**
         * 将日志返回数据 适配到数据库记录中
         *
         * @param logItem 日志详情
         * @return db数据结构对应
         */
        private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {
            ArrayList<LogContent> logContents = logItem.GetLogContents();
            BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();
            for (LogContent logContent : logContents) {
                switch (logContent.GetKey()) {
                    case "totalCountMin":
                        statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));
                        break;
                    case "productCode":
                        statisticsMin1.setProductCode(logContent.GetValue());
                        break;
                    case "myDateTimeMinute":
                        String signDtMinStr = logContent.GetValue();
                        String[] dateTimeArr = signDtMinStr.split(" ");
                        String countDate = dateTimeArr[0];
                        String[] timeArr = dateTimeArr[1].split(":");
                        String countHour = timeArr[0];
                        String countMin = timeArr[1];
                        statisticsMin1.setCountDate(countDate);
                        statisticsMin1.setCountHour(countHour);
                        statisticsMin1.setCountMin(countMin);
                        break;
                    default:
                        break;
                }
            }
            return statisticsMin1;
        }
    
        /**
         * 重置默认值,同时提交当前 (滚动到下一个偏移点)
         */
        private void rolloverOffsetAndCommit() {
            resetOffsetDefaultSettings();
            commitOffsetSync();
        }
    
        /**
         * 提交偏移量
         *
         */
        private void commitOffsetSync() {
            defaultOffsetQueryTaskCallback.commit();
        }
    
    }

      主要实现逻辑如下:

        1. 每隔一分钟进行一个查询;
        2. 发生异常后,容错继续查询;
        3. 对于一个新统计,默认倒推一天范围进行统计;
        4. 统计时间范围间隔可设置,避免一次查询数量太大,费用太高且查询返回数量有限;
        5. 对于每次小批量查询,支持分布操作,直到取完数据;
        6. 小批量数据完成后,自动提交查询偏移;
        7. 后续查询将基础提交的偏移进行;
        8. 支持断点查询;

    2. 偏移提交管理器 OffsetQueryTaskCallback

      主任务中,只管进行数据统计查询,提交偏移操作由其他类进行;

    /**
     * 普通任务回调接口定义, 考虑到多种类型的统计任务偏移操作方式可能不一,定义一个通用型偏移接口
     *
     */
    public interface OffsetQueryTaskCallback {
    
        /**
         * 回调方法入口, 提交偏移
         */
        public void commit();
    
        /**
         * 设置初始化绑定当前偏移(期间不得改变)
         *
         * @param startTime 偏移开始时间
         * @param endTime 偏移结束时间
         * @param offsetStart 偏移开始值(分页)
         * @param limit 单次取值最大数(分页)
         * @param isNewStep 是否是新的查询
         * @param isSpecifyOffset 是否是指定的偏移
         */
        public void initCurrentOffset(Integer startTime, Integer endTime,
                                      Integer offsetStart, Integer limit,
                                      Boolean isNewStep, Boolean isSpecifyOffset);
    
        /**
         * 从当前环境中获取当前偏移信息
         *
         * @return 偏移变量实例
         */
        public LoghubQueryCounterOffsetModel getCurrentOffset();
    
    }
    
    
    import com.alibaba.fastjson.JSONObject;
    import com.my.util.constants.RedisKeysConstantEnum;
    import com.my.util.redis.RedisPoolUtil;
    import com.my.model.LoghubQueryCounterOffsetModel;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * 默认偏移回调实现
     *
     */
    @Component("defaultOffsetQueryTaskCallback")
    @Slf4j
    public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {
    
        @Resource
        private RedisPoolUtil redisPoolUtil;
    
        /**
         * 当前偏移信息
         */
        private ThreadLocal<LoghubQueryCounterOffsetModel> currentOffsetHolder = new ThreadLocal<>();
    
    
        @Override
        public void commit() {
            if(!currentOffsetHolder.get().getIsSpecifyOffset()) {
                redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),
                        JSONObject.toJSONString(currentOffsetHolder.get()));
            }
        }
    
        @Override
        public void initCurrentOffset(Integer startTime, Integer endTime,
                                      Integer offsetStart, Integer limit,
                                      Boolean isNewStep, Boolean isSpecifyOffset) {
            LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();
            currentOffset.setStartTime(startTime);
            currentOffset.setEndTime(endTime);
            currentOffset.setOffsetStart(offsetStart);
            currentOffset.setIsNewStep(isNewStep);
            currentOffset.setIsSpecifyOffset(isSpecifyOffset);
            currentOffsetHolder.set(currentOffset);
        }
    
        @Override
        public LoghubQueryCounterOffsetModel getCurrentOffset() {
            return currentOffsetHolder.get();
        }
    
        /**
         * 从缓存中获取当前偏移信息
         *
         * @return 缓存偏移或者 null
         */
        public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {
            String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());
            if (StringUtils.isBlank(offsetCacheValue)) {
                return null;
            }
            return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);
        }
    
        public Integer getStartTime() {
            return currentOffsetHolder.get().getStartTime();
        }
    
        public void setStartTime(Integer startTime) {
            currentOffsetHolder.get().setStartTime(startTime);
        }
    
        public Integer getEndTime() {
            return currentOffsetHolder.get().getEndTime();
        }
    
        public void setEndTime(Integer endTime) {
            currentOffsetHolder.get().setEndTime(endTime);
        }
    
        public Integer getOffsetStart() {
            return currentOffsetHolder.get().getOffsetStart();
        }
    
        public void setOffsetStart(Integer offsetStart) {
            currentOffsetHolder.get().setOffsetStart(offsetStart);
        }
    
        public Integer getLimit() {
            return currentOffsetHolder.get().getLimit();
        }
    
        public void setLimit(Integer limit) {
            currentOffsetHolder.get().setLimit(limit);
        }
    
        public Boolean getIsNewStep() {
            return currentOffsetHolder.get().getIsNewStep();
        }
    
        public void setIsNewStep(Boolean isNewStep) {
            currentOffsetHolder.get().setIsNewStep(isNewStep);
        }
    
    }
    
    /**
     * loghub 查询偏移量 数据容器
     *
     */
    @Data
    public class LoghubQueryCounterOffsetModel implements Serializable {
    
        private static final long serialVersionUID = -3749552331349228045L;
    
        /**
         * 开始时间
         */
        private Integer startTime;
    
        /**
         * 结束时间
         */
        private Integer endTime;
    
        /**
         * 起始偏移
         */
        private Integer offsetStart = 0;
    
        /**
         * 每次查询的 条数限制, 都需要进行设置后才可用, 否则查无数据
         */
        private Integer limit = 0;
    
        /**
         * 是否新的偏移循环,如未完成,应继续子循环 limit
         *
         * true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 进行limit相加
         */
        private Boolean isNewStep = true;
    
        /**
         * 是否是手动指定的偏移,如果是说明是在手动被数据,偏移量将不会被更新
         *
         *      此变量是瞬时值,将不会被持久化到偏移标识中
         */
        private transient Boolean isSpecifyOffset;
    
    }

    3. 批量更新统计结果数据库的实现 

      因每次统计的数据量是不确定的,因尽可能早的提交一次统计结果,防止一次提交太多,或者 机器故障时所有统计白费,所以需要分小事务进行。

        
    @Service
    public class StatisticsServiceImpl implements StatisticsService {
        /**
         * 批量更新统计分钟级数据 (事务型提交)
         *
         * @param statisticsMinList 新统计数据
         * @return 影响行数
         */
        @Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)
        public Integer batchUpsertPremiumStatistics(List<BizProposalPolicyStatisticsMin> statisticsMinList,
                OffsetQueryTaskCallback callback) {
            AtomicInteger updateCount = new AtomicInteger(0);
            statisticsMinList.forEach(item -> {
                int affectNum = 0;
                BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);
                if (oldStatistics == null) {
                    item.setEtlVersion(item.getEtlVersion() + ":0");
                    affectNum = bizProposalPolicyStasticsMinMapper.insert(item);
                } else {
                    oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());
                    String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());
                    oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());
                    // todo: 优化更新版本号问题
                    affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);
                }
                updateCount.addAndGet(affectNum);
            });
            callback.commit();
            return updateCount.get();
        }
    
        /**
         * 版本号过滤器(组装版本信息)
         *
         * @param oldVersion     老版本信息
         * @param currentVersion 当前版本号
         * @return 可用的版本信息
         */
        private String versionKeeperFilter(String oldVersion, String currentVersion) {
            String versionFull = oldVersion + "," + currentVersion;
            if (versionFull.length() >= 500) {
                // 从150以后,第一版本号开始保留
                versionFull = versionFull.substring(versionFull.indexOf(',', 150));
            }
            return versionFull;
        }
    
    }

    4. 你需要一个启动任务的地方

    /**
     * 启动时运行的任务调度服务
     *
     */
    @Service
    @Slf4j
    public class TaskAutoRunScheduleService {
    
        @Resource
        private MinuteBizDataCounterTask minuteBizDataCounterTask;
    
        @PostConstruct
        public void bizDataAutoRun() {
            log.info("============= bizDataAutoRun start =================");
            ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));
            executorService.submit(minuteBizDataCounterTask);
        }
    
    }

    5. 将每分钟的数据从db查询出来展示到页面

      以上将数据统计后以分钟级汇总到数据,接下来,监控页面就只需从db中进行简单聚合就可以了,咱们就不费那精力去展示了。

    6. 待完善的地方

      1. 集群环境的任务运行将会出问题,解决办法是:加一个分布式锁即可。 你可以的!

      2. 针对重试执行统计问题,还得再考虑考虑。(幂等性)

    唠叨: 踩坑不一定是坏事!

  • 相关阅读:
    python中__dict__和dir()
    python学习之copy模块
    python学习之optparse
    python join和split和strip用法
    浅谈 Python 的 with 语句
    Python:itertools模块
    OpenStack Swift client开发
    OpenStack Swift集群部署流程与简单使用
    python bisect模块
    Python中的导入
  • 原文地址:https://www.cnblogs.com/yougewe/p/11220586.html
Copyright © 2011-2022 走看看