zoukankan      html  css  js  c++  java
  • 轻量级OLAP(二):Hive + Elasticsearch

    1. 引言

    在做OLAP数据分析时,常常会遇到过滤分析需求,比如:除去只有性别、常驻地标签的用户,计算广告媒体上的覆盖UV。OLAP解决方案Kylin不支持复杂数据类型(array、struct、map),要求数据输入Schema必须是平铺的,但是平铺后丢失了用户的聚合标签信息,而没有办法判断某一个用户是否只有性别、常驻地标签。显然,我们需要一种支持复杂数据类型的OLAP数据库;底层为Lucene的Elasticsearch正在向OLAP融合,腾讯内部已经用基于Lucene的分析数据库Hermes来做多维数据分析。

    Elasticsearch(ES)在设计之初是用来做全文检索的搜索引擎,但随着倒排索引所表现出来优秀的查询性能,有越来越多人拿它做分析数据库使。可将ES视作文档型NoSQL数据库,一般情况下将具有相同schema的文档(document)归属于一个type,所有的文档存储于某一个index;ES与RDBMS的概念对比如下:

    Relational DB ⇒ Databases ⇒ Tables ⇒ Rows ⇒ Columns
    Elasticsearch ⇒ Indices ⇒ Types ⇒ Documents ⇒ Fields

    2. 写数据

    广告日志与标签数据均落在Hive表,并且ES官方提供与Hive的集成。因此,我们首选用Hive向ES写数据。首先,采用ES做OLAP分析引擎,创建表如下:

    add jar /path/elasticsearch-hadoop-2.3.1.jar;
    
    create external table ad_tag (
      dvc string, 
      medias array < string >, 
      c1_arr array < string >, 
      week_time string
    ) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' tblproperties(
      'es.nodes' = '<ip1>:9200,<ip2>:9200', 
      'es.resource' = 'ad-{week_time}/tag', 
      'es.mapping.exclude' = 'week_time'
    );
    

    在设计Hive表结构时,ES的计算UV的distinct count(cardinality)存在着计算误差;因此,我们按dvc对其他字段做了聚合,UV的计算转换成了ES doc命中数。其中,es.nodes表示ES的节点,只需配置一个节点即可;es.resource对应于ES的Index/Type;es.mapping.exclude在写ES时不会被索引的字段。因我们只有写操作而没有通过Hive查询ES数据,因此并没有设置es.query。Hive向ES写数据如下:

    set hive.map.aggr = false;
    
    insert overwrite table ad_tag 
    select 
      media, 
      a.dvc as dvc, 
      case when c1_arr is null then array('empty') else c1_arr end as c1_arr, 
      '2016-10-08' as week_time 
    from 
      (
        select 
          dvc, 
          app_name as media 
        from 
          ad_log 
        where 
          is_exposure = '1' 
          and day_time between date_sub('2016-10-08', 6) 
          and '2016-10-08' 
        group by 
          dvc, 
          app_name
      ) a 
      left outer join (
        select 
          dvc, 
          collect_set(c1) as c1_arr 
        from 
          tag lateral view inline(tag) in_tb 
        where 
          day_time = '2016-10-08' 
        group by 
          dvc
      ) b on a.dvc = b.dvc;
    

    在写ES时,在构建索引时不需要分词,通过PUT index template方式实现之:

    {
      "template": "ad*",
      "mappings": {
        "_default_": {
          "dynamic_templates": [
            {
              "string_template": {
                "mapping": {
                  "include_in_all": false,
                  "index": "not_analyzed",
                  "type": "string",
                  "index_options": "docs"
                },
                "match": "*"
              }
            }
          ]
        }
      }
    }
    

    3. 多维分析

    ES官方的查询语言是DSL,主要分为两类:

    • Query,相当于SQL中的where部分,可套用filter、match等;
    • Aggregation,相当于SQL中的group by部分,在aggs内部也可以套用filter。

    DSL可以嵌套,表达异常复杂的查询操作;但是,若以字符串拼接的方式实现DSL,则显得可维护性太差。因此,官方提供了elasticsearch-dsl-py,可以将DSL等同于一段Python代码。我们的多维分析器便是基于此实现的(Python 3.5 + elasticsearch_dsl 2.1.0)

    整体上曝光UV、有标签的UV、除去常用标签UV,以及每一个媒体上曝光UV、有标签的UV、除去常用标签UV的分析(相当于group by media with cube):

    client = Elasticsearch(['<host1>'], port=20009, timeout=50)
    
    
    def per_media(index_name):
        """count(distinct dvc) group by media with cube"""
        ms = MultiSearch(using=client, index=index_name)
        all_doc = Search()
        all_doc.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        tagged = Search().query('filtered', filter=~Q('term', c1_arr='empty'))
        tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        useful = Search().query('filtered', filter=~Q('term', c1_arr='empty') & Q('script',
                                                                                  script="""['常驻地', '性别'].intersect(doc['c1_arr'].values).size() < doc['c1_arr'].values.size()"""))
        useful.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        ms = ms.add(all_doc)
        ms = ms.add(tagged)
        ms = ms.add(useful)
        responses = ms.execute()
        result_list = []
        result_dict = defaultdict(lambda: [])
        for resp in responses:  # get per media uv(all, tagged, useful_tagged)
            print("Query %d: %r." % (responses.index(resp), resp.search.to_dict()))
            result_list.append(resp.hits.total)
            for buck in resp.aggregations['per_media']['buckets']:
                result_dict[buck['key']].append(buck['doc_count'])
        for k, v in result_dict.items():  # fill up default value 0
            if len(v) < 3:
                result_dict[k] = v + [0] * (3 - len(v))
        return result_list, result_dict
    

    媒体与标签组合维度下的UV统计:

    def per_media_c1(index_name):
        """return {(media, c1) -> tagged_uv}"""
        s = Search(using=client, index=index_name)
        tagged = s.query('filtered', filter=~Q('term', c1_arr='empty'))
        tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000) 
            .bucket('per_c1', 'terms', field='c1_arr', size=100)
        result = {}
        response = tagged.execute()
        for buck in response.aggregations['per_media']['buckets']:
            key = buck['key']
            for b in buck['per_c1']['buckets']:
                result[(key, b['key'])] = b['doc_count']
        return result
    
  • 相关阅读:
    zabbix-agent报错记录
    远程执行命令恢复
    触发器例子
    自定义监控项
    监控项更新间隔
    python paramiko登陆设备
    python爬取某站磁力链
    python网络编程
    并发爬取网站图片
    Pandas Series和DataFrame的基本概念
  • 原文地址:https://www.cnblogs.com/en-heng/p/5943703.html
Copyright © 2011-2022 走看看