  • ElasticSearch Aggregation Analysis

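    Aggregations compute statistics over the result set of a query. Using the analysis of video view logs as a running example, this article walks through the ElasticSearch aggregations used most often.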

    First, here is the structure of the documents we will analyze:

    {
        "video_id": 1289643545120062253, // video id
        "video_uid": 3931482202390368051, // id of the user who published the video
        "uid": 47381776787453866, // id of the viewing user
        "time": 1533891263224, // event time (epoch milliseconds)
        "watch_duration": 30 // watch duration (seconds)
    }
    

    Each document records one viewing event; we use aggregations to analyze users' viewing behavior.

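    ElasticSearch introduces two related concepts:

    • Buckets: collections of documents that satisfy a given criterion
    • Metrics: statistics computed over the documents in a bucket, such as the average of a particular field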
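    To make the two concepts concrete, here is a minimal in-memory Python sketch (an illustration, not how ElasticSearch implements them): bucketing groups documents by a field value, and a metric is a statistic computed per group. The sample documents and the helper names `bucketize` and `avg_metric` are made up for this example.

```python
from collections import defaultdict
from statistics import mean

# Made-up view-log documents with a subset of the fields shown above.
docs = [
    {"video_id": 1, "uid": 10, "watch_duration": 30},
    {"video_id": 1, "uid": 11, "watch_duration": 90},
    {"video_id": 2, "uid": 10, "watch_duration": 60},
]


def bucketize(docs, field):
    """Bucketing: group the documents that share the same value of `field`."""
    buckets = defaultdict(list)
    for doc in docs:
        buckets[doc[field]].append(doc)
    return dict(buckets)


def avg_metric(bucket, field):
    """A metric: one statistic computed over the documents of a single bucket."""
    return mean(doc[field] for doc in bucket)


# One line per bucket: key, number of documents, average watch duration.
for uid, bucket in bucketize(docs, "uid").items():
    print(uid, len(bucket), avg_metric(bucket, "watch_duration"))
```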

    Querying each user's view count and watch duration

    First, here is the query expressed in SQL:

    SELECT uid, count(*) as view_count
    FROM view_log
    WHERE time >= #{since} AND time <= #{to} 
    GROUP BY uid;
    

    ES query:

    GET /view_log/_search
    {
       "size" : 0,
       "query": {
           "range": {
               "time": {
                   "gte": 0, // since
                   "lte": 0 // to
               }
           }
       },
       "aggs": {
          "agg": { // agg is the name of this aggregation
            "terms": { // one bucket per distinct uid
              "field": "uid"
            }
          }
       }
    }
    

    response:

    {
      "took": 10,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 100000,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "agg": {
          "buckets": [
            {
              "key": 21836334489858688,
              "doc_count": 4026
            },
            {
              "key": 31489302390368051,
              "doc_count": 2717
            }
          ]
        }
      }
    }
    

    The list result.aggregations.agg.buckets holds the query results.

    Because we aggregate with terms on uid, each bucket is the set of documents sharing one uid, and the bucket's key field is that uid.

    doc_count is the number of documents in the bucket, which corresponds to count(*) as view_count in the SQL statement.
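    In application code, the buckets map directly onto the rows of the SQL result. A minimal Python sketch using the standard json module; `view_counts` is a hypothetical helper, and the input is a trimmed copy of the response above.

```python
import json

# Trimmed copy of the response above; the aggregation is named "agg".
response_text = """
{"aggregations": {"agg": {"buckets": [
    {"key": 21836334489858688, "doc_count": 4026},
    {"key": 31489302390368051, "doc_count": 2717}
]}}}
"""


def view_counts(response, agg_name="agg"):
    """Map each bucket key (the uid) to its doc_count (count(*) in the SQL)."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {b["key"]: b["doc_count"] for b in buckets}


counts = view_counts(json.loads(response_text))
print(counts)
```

    Python's json module parses these 17-digit uids as exact integers. Values above JavaScript's Number.MAX_SAFE_INTEGER (2^53 - 1) are not guaranteed to survive JSON.parse unchanged, so take care when consuming such keys from JavaScript.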

    We can add further metrics to the query. In SQL:

    SELECT uid, count(*) as view_count, avg(watch_duration) as avg_duration 
    FROM view_log
    WHERE time >= #{since} AND time <= #{to} 
    GROUP BY uid;
    

    ES query:

    GET /view_log/_search
    {
       "size" : 0,
       "query": {
           "range": {
               "time": {
                   "gte": 0, // since
                   "lte": 0 // to
               }
           }
       },
       "aggs": {
          "agg": { // agg is the name of this aggregation
            "terms": { // one bucket per distinct uid
              "field": "uid"
            },
            "aggs": { // add a metric to every bucket
              "avg_duration": {
                  "avg": { // average of watch_duration
                    "field": "watch_duration" 
                  }
              }
            }
          }
       }
    }
    

    response:

    {
      "took": 10,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 100000,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "agg": {
          "buckets": [
            {
              "key": 21836334489858688,
              "doc_count": 4026,
              "avg_duration": {
                "value": 12778.882352941177
              }
            },
            {
              "key": 31489302390368051,
              "doc_count": 2717,
              "avg_duration": {
                "value": 2652.5714285714284
              }
            }
          ]
        }
      }
    }
    

    avg_duration.value is the average of watch_duration within the bucket, i.e. that user's average watch duration.
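    When the query is issued from application code, it is convenient to build the body as a plain dictionary and fill in since/to, which the console version above leaves as 0. A minimal Python sketch; the function name `view_stats_query` and the timestamps are illustrative assumptions, and the resulting dict is the JSON body you would send to GET /view_log/_search with whatever HTTP or ElasticSearch client you use.

```python
import json


def view_stats_query(since_ms, to_ms):
    """Body for: SELECT uid, count(*), avg(watch_duration) ... GROUP BY uid.

    since_ms / to_ms are epoch milliseconds, matching the `time` field.
    """
    return {
        "size": 0,  # only the aggregations are needed, not the matching documents
        "query": {"range": {"time": {"gte": since_ms, "lte": to_ms}}},
        "aggs": {
            "agg": {
                "terms": {"field": "uid"},  # one bucket per uid
                "aggs": {"avg_duration": {"avg": {"field": "watch_duration"}}},
            }
        },
    }


body = view_stats_query(1533830400000, 1533916799999)
print(json.dumps(body, indent=2))
```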

    Paginating aggregation results

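    In practice the number of users is enormous and a single query cannot return all of them: by default a terms aggregation returns only the 10 buckets with the largest doc_count. We therefore retrieve the results in batches by partitioning the terms: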

    GET /view_log/_search
    {
       "size" : 0,
       "query": {
           "range": {
               "time": {
                   "gte": 0, // since
                   "lte": 0 // to
               }
           }
       },
       "aggs": {
          "agg": { 
            "terms": { 
                "field": "uid",
                "size": 10000, // maximum number of buckets returned
                "include": { // split the uids into 10 partitions numbered 0-9 and return partition 0
                    "partition": 0,
                    "num_partitions": 10 
                }
            },
            "aggs": { 
              "avg_duration": { 
                  "avg": { 
                    "field": "watch_duration" 
                  }
              }
            }
          }
       }
    }
    

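    This query is almost identical to the one in the previous section; it only adds size and include to aggs.agg.terms. include.num_partitions splits the uids into 10 disjoint partitions and include.partition selects the one to return, so running the query for partitions 0 through 9 retrieves every uid. size must be large enough to hold all the uids of a single partition.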
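    Retrieving everything means repeating the query once per partition. A minimal Python sketch of that loop, assuming a caller-supplied `search` function that sends a body to GET /view_log/_search and returns the parsed JSON response. `partition_bodies`, `fetch_all` and `fake_search` are hypothetical names, and `fake_search` only simulates partitioning with a modulo rule so the loop can run without a cluster.

```python
from typing import Callable, Dict, Iterator

NUM_PARTITIONS = 10


def partition_bodies(since_ms: int, to_ms: int,
                     num_partitions: int = NUM_PARTITIONS) -> Iterator[dict]:
    """Yield one request body per partition, numbered 0 .. num_partitions - 1."""
    for partition in range(num_partitions):
        yield {
            "size": 0,
            "query": {"range": {"time": {"gte": since_ms, "lte": to_ms}}},
            "aggs": {
                "agg": {
                    "terms": {
                        "field": "uid",
                        "size": 10000,  # must hold every uid of one partition
                        "include": {"partition": partition,
                                    "num_partitions": num_partitions},
                    },
                    "aggs": {"avg_duration": {"avg": {"field": "watch_duration"}}},
                }
            },
        }


def fetch_all(search: Callable[[dict], dict], since_ms: int, to_ms: int) -> Dict[int, dict]:
    """Send every partition through `search` and collect the buckets by uid.

    Partitions are disjoint, so merging is a plain union.
    """
    merged: Dict[int, dict] = {}
    for body in partition_bodies(since_ms, to_ms):
        for bucket in search(body)["aggregations"]["agg"]["buckets"]:
            merged[bucket["key"]] = bucket
    return merged


# Stand-in for a real search call, for demonstration only. It assigns uids to
# partitions with a simple modulo, which is NOT how ElasticSearch hashes terms.
ALL_UIDS = [3, 14, 15, 92, 65]


def fake_search(body: dict) -> dict:
    include = body["aggs"]["agg"]["terms"]["include"]
    buckets = [{"key": uid, "doc_count": 1} for uid in ALL_UIDS
               if uid % include["num_partitions"] == include["partition"]]
    return {"aggregations": {"agg": {"buckets": buckets}}}


print(sorted(fetch_all(fake_search, 0, 0)))
```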

    Querying video UV

    UV of a single video

    UV (unique visitors) is the number of distinct users who watched a video; by contrast, the view count without deduplicating users is called PV (page views).

    In SQL:

    SELECT video_id, count(*) as pv, count(distinct uid) as uv
    FROM view_log
    WHERE video_id = #{video_id};
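    The meaning of the two numbers can be checked against raw events in a few lines of Python: pv counts every view, uv counts distinct uids. The `views` list and the `pv_uv` function are illustrative and not part of the original.

```python
# Made-up (video_id, uid) view events.
views = [(1, 10), (1, 10), (1, 11), (2, 10)]


def pv_uv(views, video_id):
    """Return (pv, uv) for one video: all views, and distinct viewers."""
    viewers = [uid for vid, uid in views if vid == video_id]
    return len(viewers), len(set(viewers))


print(pv_uv(views, 1))  # (3, 2): user 10 watched twice, user 11 once
```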
    

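    ElasticSearch computes count(distinct) conveniently with the cardinality aggregation. The query must first be restricted to the video; pv is then simply the number of matching documents, reported as hits.total:

    GET /view_log/_search
    {
        "size": 0,
        "query": {
          "term": {
            "video_id": 0 // video_id
          }
        },
        "aggs": {
          "uv": {
            "cardinality": {
              "field": "uid"
            }
          }
       }
    }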
    

    response:

    {
      "took": 255,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 17579,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "uv": {
          "value": 11
        }
      }
    }
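    The uv returned by cardinality is an estimate: ElasticSearch documents the aggregation as approximate, based on the HyperLogLog++ algorithm, with counts below its precision_threshold (3000 by default) expected to be close to exact. The sketch below is the simpler original HyperLogLog in Python, written only to show why a fixed set of registers yields an approximate distinct count. It is not ElasticSearch's implementation; the register count (p=12) and the SHA-1-based hash are arbitrary choices.

```python
import hashlib
import math


class HyperLogLog:
    """Basic HyperLogLog (Flajolet et al.), not HyperLogLog++."""

    def __init__(self, p=12):
        self.p = p                      # 2**p registers
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, value):
        # 64-bit hash taken from the first 8 bytes of SHA-1.
        h = int.from_bytes(hashlib.sha1(str(value).encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)                       # first p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)          # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1   # position of the leftmost 1-bit
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self):
        m = self.m
        alpha = 0.7213 / (1 + 1.079 / m)               # bias constant for m >= 128
        estimate = alpha * m * m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * m and zeros:
            estimate = m * math.log(m / zeros)         # linear counting for small sets
        return round(estimate)


hll = HyperLogLog()
for uid in range(50_000):
    hll.add(uid)
    hll.add(uid)            # repeated views do not change the registers
print(hll.count())          # close to, but not necessarily exactly, 50000
```

    Memory stays fixed at 2^p registers however many users there are, which is the trade-off that makes the distinct count approximate.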
    

    UV of many videos in one query

    ElasticSearch can also compute count(distinct) for many groups in a single query. In SQL:

    SELECT video_id, count(*) as pv, count(distinct uid) as uv
    FROM view_log
    GROUP BY video_id;
    

    Query:

    GET /view_log/_search
    {
        "size": 0,
        "aggs": {
          "video": {
            "terms": {
              "field": "video_id"
            },
            "aggs": {
              "uv": {
                  "cardinality": {
                    "field": "uid"
                  }
              }
            }
          }
       }
    }
    

    response:

    {
      "took": 313,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 16940,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "video": {
          "buckets": [
            {
              "key": 25417499722062, // video id
              "doc_count": 427, // number of views (pv)
              "uv": {
                "value": 124 // number of distinct viewers (uv)
              }
            },
            {
              "key": 72446898144,
              "doc_count": 744,
              "uv": {
                "value": 233
              }
            }
          ]
        }
      }
    }
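    Reading the batched result back in application code amounts to flattening each bucket into the three columns of the SQL query. A minimal Python sketch; the function name `pv_uv_rows` is hypothetical, and the bucket values are copied from the response above.

```python
def pv_uv_rows(response):
    """Turn the buckets of the `video` terms aggregation into (video_id, pv, uv) rows."""
    return [
        # pv is the bucket's doc_count; uv is the nested cardinality value.
        (b["key"], b["doc_count"], b["uv"]["value"])
        for b in response["aggregations"]["video"]["buckets"]
    ]


# The aggregations part of the response above, as a Python dict.
response = {"aggregations": {"video": {"buckets": [
    {"key": 25417499722062, "doc_count": 427, "uv": {"value": 124}},
    {"key": 72446898144, "doc_count": 744, "uv": {"value": 233}},
]}}}

for video_id, pv, uv in pv_uv_rows(response):
    print(video_id, pv, uv)
```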
    

    HAVING queries

    SQL filters on aggregated results with a HAVING clause. ElasticSearch achieves the same effect with pipeline aggregations, although the syntax is rather verbose.

    Filtering on count

    To find the videos watched more than 200 times, in SQL:

    SELECT video_id, count(*) as view_count
    FROM view_log
    GROUP BY video_id
    HAVING count(*) > 200;

    ES query:

    GET /view_log/_search
    {
      "size": 0,
      "aggs": {
        "view_count": {
          "terms": {
            "field": "video_id"
          },
          "aggs": {
            "having": {
              "bucket_selector": {
                "buckets_path": { // expose each view_count bucket's doc_count to the script as view_count
                  "view_count": "_count"
                },
                "script": {
                  "source": "params.view_count > 200"
                }
              }
            }
          }
        }
      }
    }
    

    response:

    {
      "took": 83,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 775,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "view_count": {
          "buckets": [
            {
              "key": 35025417499764062,
              "doc_count": 529
            },
            {
              "key": 19913672446898144,
              "doc_count": 759
            }
          ]
        }
      }
    }
    

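    The key to a HAVING-style query in ElasticSearch is the bucket_selector pipeline aggregation. Placed inside the terms aggregation, it evaluates its script once per bucket (buckets_path maps script variables to bucket values, with _count meaning the bucket's doc_count) and drops every bucket for which the script returns false.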
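    Conceptually, bucket_selector is a filter over the buckets its parent aggregation has already produced. A minimal Python emulation of the query above; `bucket_selector` here is a hypothetical helper, not ElasticSearch code, and the third bucket is made up so that something gets dropped.

```python
def bucket_selector(buckets, predicate):
    """Keep only the buckets for which predicate(bucket) is true, the way
    bucket_selector drops buckets whose script evaluates to false."""
    return [b for b in buckets if predicate(b)]


buckets = [
    {"key": 35025417499764062, "doc_count": 529},
    {"key": 19913672446898144, "doc_count": 759},
    {"key": 1, "doc_count": 150},  # made-up bucket that fails the filter
]

# Equivalent of HAVING count(*) > 200: view_count is bound to _count (doc_count).
kept = bucket_selector(buckets, lambda b: b["doc_count"] > 200)
print([b["key"] for b in kept])
```

    Because the filter runs on buckets the terms aggregation has already returned, it can only keep or drop those buckets (at most the terms size, 10 by default); it does not bring in other videos that would also satisfy the condition.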

    Filtering on other metrics

    Next, let's find the videos whose average watch duration exceeds 5 minutes (300 seconds). In SQL:

    SELECT video_id FROM view_log
    GROUP BY video_id
    HAVING avg(watch_duration) > 300;

    ES query:

    GET /view_log/_search
    {
      "size": 0,
      "aggs": {
        "video": {
          "terms": {
            "field": "video_id"
          },
          "aggs": {
            "avg_duration": {
              "avg": {
                "field": "watch_duration"
              } 
            },
            "avg_duration_filter": {
              "bucket_selector": {
                "buckets_path": {
                  "avg_duration": "avg_duration"
                },
                "script": {
                  "source": "params.avg_duration > 300"
                }
              }  
            }
          }
        }
      }
    }
    

    response:

    {
      "took": 137,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 255,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "video": {
          "buckets": [
            {
              "key": 5417499764062,
              "doc_count": 91576,
              "avg_duration": {
                "value": 103
              }
            },
            {
              "key": 19913672446898144,
              "doc_count": 15771,
              "avg_duration": {
                "value": 197
              }
            }
          ]
        }
      }
    }
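    The same two-step logic, first an avg metric per bucket and then a selector on it, fits in a short Python sketch. The (video_id, watch_duration) pairs and the function name are made up, with durations in seconds so that 300 means 5 minutes.

```python
from collections import defaultdict

# Made-up (video_id, watch_duration in seconds) pairs.
views = [(1, 200), (1, 500), (2, 100), (2, 120), (3, 600)]


def videos_with_avg_over(views, threshold):
    """SELECT video_id ... GROUP BY video_id HAVING avg(watch_duration) > threshold."""
    totals = defaultdict(lambda: [0, 0])  # video_id -> [sum of durations, view count]
    for video_id, duration in views:
        totals[video_id][0] += duration
        totals[video_id][1] += 1
    # First the avg metric per bucket (s / n), then the selector drops
    # buckets whose average does not exceed the threshold.
    return sorted(v for v, (s, n) in totals.items() if s / n > threshold)


print(videos_with_avg_over(views, 300))  # [1, 3]
```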
    
  • Original article: https://www.cnblogs.com/Finley/p/9499534.html