  • Hourly and Daily Aggregation in MongoDB with Morphia

    Aggregating MongoDB data by day or by hour

    Requirements

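    I was recently asked to aggregate the device status under each user account by day and by hour, and to use the results to draw device status trend charts.
    The approach is to run a scheduled task that aggregates each user's device status data by hour and by day, then stores the results in the database for users to query later.
    The stack is Spring Boot, MongoDB, and Morphia.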

    Data model

    @Data
    @Builder
    @Entity(value = "rawDevStatus", noClassnameStored = true)
    // Indexes on device status
    @Indexes({
            // TTL index: MongoDB deletes documents in the background once `time` is more than 72 days old
            @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
            @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
    })
    public class RawDevStatus {
    
        @Id
        @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
        private ObjectId objectId;
    
        private String userId;
    
        private Instant time;
    
        @Embedded("points")
        List<Point> protocolPoints;
    
        @Data
        @AllArgsConstructor
        public static class Point {
            /**
             * Protocol type
             */
            private Protocol protocol;
    
            /**
             * Total number of devices
             */
            private Integer total;
    
            /**
             * Number of devices online
             */
            private Integer onlineNum;
    
            /**
             * Number of devices in the enabled state
             */
            private Integer enableNum;
        }
    }
    

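    The class above is the device status entity. Status figures are recorded per protocol, with one `Point` for each protocol the devices belong to.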

    @Data
    @Builder
    @Entity(value = "aggregationDevStatus", noClassnameStored = true)
    @Indexes({
            @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
            @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
    })
    public class AggregationDevStatus {
    
        @Id
        @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
        private ObjectId objectId;
    
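        /**
         * User ID
         */
        private String userId;
    
        /**
         * Total number of devices (average over the bucket)
         */
        private Double total;
    
        /**
         * Number of devices online (average over the bucket)
         */
        private Double onlineNum;
    
        /**
         * Number of devices in the enabled state (average over the bucket)
         */
        private Double enableNum;
    
        /**
         * Aggregation type (by hour or by day)
         */
        @Property("aggDuration")
        private AggregationDuration aggregationDuration;
    
        private Instant time;
    
        /**
         * Per-document expiry time, set dynamically
         */
        private Instant expireAt;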
    }
    

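    The class above is the expected aggregation result. It defines two indexes: (1) a TTL index on `expireAt`; (2) a compound index, because the program looks up aggregated device status by user ID and time.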
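Because the TTL index on `expireAt` uses `expireAfterSeconds = 0`, MongoDB removes each document once the clock passes the value stored in that field, so the retention period can differ per document. Below is a minimal sketch of how such a value might be computed. The `Granularity` enum is a hypothetical stand-in for `AggregationDuration`, and the retention periods (7 days for hourly buckets, 90 days for daily ones) are assumptions for illustration, not values from the original code.

```java
import java.time.Duration;
import java.time.Instant;

final class ExpireAtSketch {

    /** Hypothetical stand-in for the article's AggregationDuration enum. */
    enum Granularity { HOUR, DAY }

    /** Assumed retention periods; adjust them to your own requirements. */
    static Duration retentionFor(Granularity granularity) {
        return granularity == Granularity.HOUR ? Duration.ofDays(7) : Duration.ofDays(90);
    }

    /** Value to store in expireAt: the bucket time plus the retention period. */
    static Instant expireAt(Instant bucketTime, Granularity granularity) {
        return bucketTime.plus(retentionFor(granularity));
    }

    public static void main(String[] args) {
        Instant bucket = Instant.parse("2019-06-27T13:00:00Z");
        System.out.println(expireAt(bucket, Granularity.HOUR));
        System.out.println(expireAt(bucket, Granularity.DAY));
    }
}
```

Storing the expiry per document is what lets hourly and daily results share one collection while ageing out on different schedules.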

    Aggregation operators

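    Aggregation works like a pipeline: the intermediate result of each stage is the input to the next, and the last stage emits the final result.
    This aggregation uses the following stages: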

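    • $project: selects the fields to include in the output documents.
    • $unwind: splits an array field into one document per element.
    • $match: selects the documents to process.
    • $group: groups documents by key and computes the aggregates.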

    The raw aggregation statement

    db.getCollection('rawDevStatus').aggregate([
        {$match:
            {
                time:{$gte: ISODate("2019-06-27T00:00:00Z")},
            }
        },
        {$unwind: "$points"},
        {$project:
            {
                userId:1,points:1,
                tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
            }
        },
        {$project:
            {
                userId:1,points:1,
                groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
            }
        },
        {$group:
            {
                _id:{user_id:'$userId', cal_time:'$groupTime'},
                devTotal:{'$avg':'$points.total'},
                onlineTotal:{'$avg':'$points.onlineNum'},
                enableTotal:{'$avg':'$points.enableNum'}
            }
        },
    ])
    

    The statement above aggregates the data by hour. The steps are explained one by one below:

    (1) $match

    Since only the hourly aggregates for the last 24 hours are needed, the data is first filtered by time.
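The lower bound passed to `$match` can be derived from the current time. The sketch below is one way to do it; aligning the bound to the start of the hour, so that the oldest bucket is not built from a partial hour, is my assumption rather than something the original code states. A `Clock` parameter keeps the calculation testable.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class MatchWindowSketch {

    /** Lower bound for the $match stage: `hours` back from now, aligned to the hour. */
    static Instant windowStart(Clock clock, int hours) {
        return Instant.now(clock)
                .minus(Duration.ofHours(hours))
                .truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.parse("2019-06-28T10:17:42Z"), ZoneOffset.UTC);
        System.out.println(windowStart(fixed, 24)); // 2019-06-27T10:00:00Z
    }
}
```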

    (2) $unwind

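    The device status in each raw document is an array split by protocol, so it has to be unwound into one document per entry before the following stages can process it.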
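Conceptually, `$unwind` behaves like a flat-map over the array: each element becomes its own row and keeps the parent's other fields. The in-memory illustration below uses simplified `Doc` and `Point` classes that are hypothetical stand-ins for the Morphia entities, and the protocol name `MQTT` is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UnwindSketch {

    /** Simplified stand-in for one entry of the points array. */
    static final class Point {
        final String protocol;
        final int total;

        Point(String protocol, int total) {
            this.protocol = protocol;
            this.total = total;
        }
    }

    /** Simplified stand-in for a raw status document. */
    static final class Doc {
        final String userId;
        final List<Point> points;

        Doc(String userId, List<Point> points) {
            this.userId = userId;
            this.points = points;
        }
    }

    /** Emits one row per array element, each carrying the parent's userId. */
    static List<String> unwind(List<Doc> docs) {
        List<String> rows = new ArrayList<>();
        for (Doc doc : docs) {
            for (Point p : doc.points) {
                rows.add(doc.userId + "/" + p.protocol + "/" + p.total);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Doc doc = new Doc("u1", Arrays.asList(new Point("ALL", 10), new Point("MQTT", 4)));
        System.out.println(unwind(Arrays.asList(doc))); // [u1/ALL/10, u1/MQTT/4]
    }
}
```

A document whose array is empty contributes no rows here, which mirrors the default `$unwind` behaviour of dropping such documents.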

    (3) $project

        {$project:
            {
                userId:1,points:1,
                tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
            }
        }
    

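    This selects the fields to output: userId, points, and tmp.
    Note that, to aggregate by time, the stage formats `$time` with hour precision (%Y:%m:%dT%H, with minutes and seconds fixed to 00) and stores the resulting string in tmp, which serves as the grouping basis in the following steps.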

    To aggregate by day instead, change format to %Y:%m:%dT00:00:00Z.
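To see what these two format strings produce, here is a rough Java counterpart built on `DateTimeFormatter`. It only mirrors the behaviour for illustration and is not part of the original program; the class and method names are hypothetical. `$dateToString` formats in UTC when no timezone is given, so the sketch pins the formatter to UTC as well.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class BucketLabelSketch {

    // Counterpart of "%Y:%m:%dT%H:00:00Z": minutes and seconds are literal zeros.
    static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T'HH':00:00Z'").withZone(ZoneOffset.UTC);

    // Counterpart of "%Y:%m:%dT00:00:00Z": the whole time part is literal.
    static final DateTimeFormatter DAILY =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T00:00:00Z'").withZone(ZoneOffset.UTC);

    static String hourLabel(Instant time) {
        return HOURLY.format(time);
    }

    static String dayLabel(Instant time) {
        return DAILY.format(time);
    }

    public static void main(String[] args) {
        Instant time = Instant.parse("2019-06-27T13:45:10Z");
        System.out.println(hourLabel(time)); // 2019:06:27T13:00:00Z
        System.out.println(dayLabel(time));  // 2019:06:27T00:00:00Z
    }
}
```

Every sample taken within the same hour (or the same UTC day) maps to the same label, which is what makes the label usable as a grouping key.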

    (4) $project

        {$project:
            {
                userId:1,points:1,
                groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
            }
        }
    

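    tmp from the previous $project stage is a string, but the final aggregation result needs a date value (mostly because I was too lazy to convert it in application code).
    So this stage converts `$tmp` into a date, groupTime.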
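Taken together, formatting the date to a string and parsing it back amounts to truncating the timestamp to the start of its bucket. The sketch below checks that equivalence in plain Java; the class and method names are hypothetical, and the pattern is a Java rendering of `%Y:%m:%dT%H:%M:%SZ`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class RoundTripSketch {

    // Java rendering of the $dateFromString format "%Y:%m:%dT%H:%M:%SZ".
    static final DateTimeFormatter PARSER =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T'HH:mm:ss'Z'");

    /** Parses a bucket label back into a UTC instant. */
    static Instant parseBucket(String label) {
        return LocalDateTime.parse(label, PARSER).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Instant original = Instant.parse("2019-06-27T13:45:10Z");
        Instant viaString = parseBucket("2019:06:27T13:00:00Z");
        Instant truncated = original.truncatedTo(ChronoUnit.HOURS);
        System.out.println(viaString.equals(truncated)); // true
    }
}
```

On MongoDB 5.0 or later, the `$dateTrunc` operator expresses the same truncation in a single stage, which may be worth considering if the deployment allows it.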

    (5) $group

    Groups the documents and produces the final output.

        {$group:
            {
                // Group by _id, which is made up of `user_id` and `$groupTime`
                _id:{user_id:'$userId', cal_time:'$groupTime'},
                // Average of the total device count
                devTotal:{'$avg':'$points.total'},
                // Average of the online device count
                onlineTotal:{'$avg':'$points.onlineNum'},
                // ...
                enableTotal:{'$avg':'$points.enableNum'}
            }
        }
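The grouping step can be pictured as `groupingBy` plus an averaging collector. The sketch below works on a simplified, hypothetical `Row` class rather than real MongoDB documents, and only averages `total`. It also shows why the aggregated fields are declared as `Double` even though the raw counts are integers.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupSketch {

    /** Simplified stand-in for one unwound, projected document. */
    static final class Row {
        final String userId;
        final Instant groupTime;
        final int total;

        Row(String userId, Instant groupTime, int total) {
            this.userId = userId;
            this.groupTime = groupTime;
            this.total = total;
        }
    }

    /** Groups rows by (userId, groupTime) and averages the total per group. */
    static Map<String, Double> averageTotal(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                (Row r) -> r.userId + "@" + r.groupTime,
                Collectors.averagingInt((Row r) -> r.total)));
    }

    /** Two samples for u1 and one for u2, all in the same hour. */
    static List<Row> sampleRows() {
        Instant hour = Instant.parse("2019-06-27T13:00:00Z");
        return Arrays.asList(new Row("u1", hour, 10), new Row("u1", hour, 11), new Row("u2", hour, 4));
    }

    public static void main(String[] args) {
        // u1 averages to 10.5, u2 to 4.0
        System.out.println(averageTotal(sampleRows()));
    }
}
```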
    

    Implementation

    The ODM used here is Morphia; MongoTemplate would also work, and the principle is similar.

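        /**
         * Builds the aggregation pipeline.
         *
         * @param pastTime     start of the time range to aggregate
         * @param dateToString $dateToString format (%Y:%m:%dT%H:00:00Z or %Y:%m:%dT00:00:00Z)
         * @param stringToDate $dateFromString format used to parse the string back into a date (e.g. %Y:%m:%dT%H:%M:%SZ)
         * @return the aggregation pipeline
         */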
        private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
            Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
            return datastore.createAggregation(RawDevStatus.class)
                    .match(query.field("time").greaterThanOrEq(pastTime))
                    .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
                    .match(query.field("points.protocol").equal("ALL"))
                    .project(Projection.projection("userId"),
                            Projection.projection("points"),
                            Projection.projection("convertTime",
                                    Projection.expression("$dateToString",
                                            new BasicDBObject("format", dateToString)
                                                    .append("date", "$time"))
                            )
                    )
                    .project(Projection.projection("userId"),
                            Projection.projection("points"),
                            Projection.projection("convertTime",
                                    Projection.expression("$dateFromString",
                                            new BasicDBObject("format", stringToDate)
                                                    .append("dateString", "$convertTime"))
                            )
                    )
                    .group(
                            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
                            Group.grouping("total", Group.average("points.total")),
                            Group.grouping("onlineNum", Group.average("points.onlineNum")),
                            Group.grouping("enableNum", Group.average("points.enableNum"))
                    );
        }
    
        /**
         * Runs the pipeline and collects the aggregation result.
         *
         * @param pipeline the aggregation pipeline
         * @return the aggregation result
         */
        private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
            List<AggregationMidDevStatus> statuses = new ArrayList<>();
            Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
                    AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
            while (resultIterator.hasNext()) {
                statuses.add(resultIterator.next());
            }
            return statuses;
        }
    
        //......................................................................................
        // Fetch the aggregation result (some code omitted)
        AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
        List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
        if (CollectionUtils.isEmpty(midStatuses)) {
            log.warn("Can not get dev status aggregation result.");
            return;
        }
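For reference, the format strings passed as `dateToString` and `stringToDate` can be kept together in one place. The sketch below is only one possible arrangement; the `Granularity` enum and the constant names are hypothetical, while the format strings themselves are the ones used in the aggregation statement earlier in this article.

```java
final class AggregationFormats {

    /** Hypothetical enum pairing each aggregation type with its $dateToString format. */
    enum Granularity {
        HOUR("%Y:%m:%dT%H:00:00Z"),
        DAY("%Y:%m:%dT00:00:00Z");

        final String dateToString;

        Granularity(String dateToString) {
            this.dateToString = dateToString;
        }
    }

    /** $dateFromString format; the same for both granularities. */
    static final String STRING_TO_DATE = "%Y:%m:%dT%H:%M:%SZ";

    public static void main(String[] args) {
        for (Granularity g : Granularity.values()) {
            System.out.println(g + " -> " + g.dateToString + " / " + STRING_TO_DATE);
        }
    }
}
```

With such a holder, the hourly and daily tasks would differ only in the `Granularity` value and the `pastTime` they pass to `createAggregationPipeline`.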
    


  • Original article: https://www.cnblogs.com/jason1990/p/11269658.html