zoukankan      html  css  js  c++  java
  • 轻量级OLAP(二):Hive + Elasticsearch

    1. 引言

    在做OLAP数据分析时,常常会遇到过滤分析需求,比如:除去只有性别、常驻地标签的用户,计算广告媒体上的覆盖UV。OLAP解决方案Kylin不支持复杂数据类型(array、struct、map),要求数据输入Schema必须是平铺的,但是平铺后丢失了用户的聚合标签信息,而没有办法判断某一个用户是否只有性别、常驻地标签。显然,我们需要一种支持复杂数据类型的OLAP数据库;底层为Lucene的Elasticsearch正在向OLAP融合,腾讯内部已经用基于Lucene的分析数据库Hermes来做多维数据分析。

    Elasticsearch(ES)在设计之初是用来做全文检索的搜索引擎,但随着倒排索引所表现出来优秀的查询性能,有越来越多人拿它做分析数据库使。可将ES视作文档型NoSQL数据库,一般情况下将具有相同schema的文档(document)归属于一个type,所有的文档存储于某一个index;ES与RDBMS的概念对比如下:

    Relational DB ⇒ Databases ⇒ Tables ⇒ Rows ⇒ Columns
    Elasticsearch ⇒ Indices ⇒ Types ⇒ Documents ⇒ Fields

    2. 写数据

    广告日志与标签数据均落在Hive表,并且ES官方提供与Hive的集成。因此,我们首选用Hive向ES写数据。首先,采用ES做OLAP分析引擎,创建表如下:

    add jar /path/elasticsearch-hadoop-2.3.1.jar;
    
    create external table ad_tag (
      dvc string, 
      medias array < string >, 
      c1_arr array < string >, 
      week_time string
    ) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' tblproperties(
      'es.nodes' = '<ip1>:9200,<ip2>:9200', 
      'es.resource' = 'ad-{week_time}/tag', 
      'es.mapping.exclude' = 'week_time'
    );
    

    在设计Hive表结构时,ES的计算UV的distinct count(cardinality)存在着计算误差;因此,我们按dvc对其他字段做了聚合,UV的计算转换成了ES doc命中数。其中,es.nodes表示ES的节点,只需配置一个节点即可;es.resource对应于ES的Index/Type;es.mapping.exclude在写ES时不会被索引的字段。因我们只有写操作而没有通过Hive查询ES数据,因此并没有设置es.query。Hive向ES写数据如下:

    set hive.map.aggr = false;
    
    insert overwrite table ad_tag 
    select 
      media, 
      a.dvc as dvc, 
      case when c1_arr is null then array('empty') else c1_arr end as c1_arr, 
      '2016-10-08' as week_time 
    from 
      (
        select 
          dvc, 
          app_name as media 
        from 
          ad_log 
        where 
          is_exposure = '1' 
          and day_time between date_sub('2016-10-08', 6) 
          and '2016-10-08' 
        group by 
          dvc, 
          app_name
      ) a 
      left outer join (
        select 
          dvc, 
          collect_set(c1) as c1_arr 
        from 
          tag lateral view inline(tag) in_tb 
        where 
          day_time = '2016-10-08' 
        group by 
          dvc
      ) b on a.dvc = b.dvc;
    

    在写ES时,在构建索引时不需要分词,通过PUT index template方式实现之:

    {
      "template": "ad*",
      "mappings": {
        "_default_": {
          "dynamic_templates": [
            {
              "string_template": {
                "mapping": {
                  "include_in_all": false,
                  "index": "not_analyzed",
                  "type": "string",
                  "index_options": "docs"
                },
                "match": "*"
              }
            }
          ]
        }
      }
    }
    

    3. 多维分析

    ES官方的查询语言是DSL,主要分为两类:

    • Query,相当于SQL中的where部分,可套用filter、match等;
    • Aggregation,相当于SQL中的group by部分,在aggs内部也可以套用filter。

    DSL可以嵌套,表达异常复杂的查询操作;但是,若以字符串拼接的方式实现DSL,则显得可维护性太差。因此,官方提供了elasticsearch-dsl-py,可以将DSL等同于一段Python代码。我们的多维分析器便是基于此实现的(Python 3.5 + elasticsearch_dsl 2.1.0)

    整体上曝光UV、有标签的UV、除去常用标签UV,以及每一个媒体上曝光UV、有标签的UV、除去常用标签UV的分析(相当于group by media with cube):

    client = Elasticsearch(['<host1>'], port=20009, timeout=50)
    
    
    def per_media(index_name):
        """count(distinct dvc) group by media with cube"""
        ms = MultiSearch(using=client, index=index_name)
        all_doc = Search()
        all_doc.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        tagged = Search().query('filtered', filter=~Q('term', c1_arr='empty'))
        tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        useful = Search().query('filtered', filter=~Q('term', c1_arr='empty') & Q('script',
                                                                                  script="""['常驻地', '性别'].intersect(doc['c1_arr'].values).size() < doc['c1_arr'].values.size()"""))
        useful.aggs.bucket('per_media', 'terms', field='medias', size=1000)
        ms = ms.add(all_doc)
        ms = ms.add(tagged)
        ms = ms.add(useful)
        responses = ms.execute()
        result_list = []
        result_dict = defaultdict(lambda: [])
        for resp in responses:  # get per media uv(all, tagged, useful_tagged)
            print("Query %d: %r." % (responses.index(resp), resp.search.to_dict()))
            result_list.append(resp.hits.total)
            for buck in resp.aggregations['per_media']['buckets']:
                result_dict[buck['key']].append(buck['doc_count'])
        for k, v in result_dict.items():  # fill up default value 0
            if len(v) < 3:
                result_dict[k] = v + [0] * (3 - len(v))
        return result_list, result_dict
    

    媒体与标签组合维度下的UV统计:

    def per_media_c1(index_name):
        """return {(media, c1) -> tagged_uv}"""
        s = Search(using=client, index=index_name)
        tagged = s.query('filtered', filter=~Q('term', c1_arr='empty'))
        tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000) 
            .bucket('per_c1', 'terms', field='c1_arr', size=100)
        result = {}
        response = tagged.execute()
        for buck in response.aggregations['per_media']['buckets']:
            key = buck['key']
            for b in buck['per_c1']['buckets']:
                result[(key, b['key'])] = b['doc_count']
        return result
    
  • 相关阅读:
    sparql学习sparql示例、dbpedia在线验证
    中国绿卡
    逾期率的水有多深,你知道吗?
    ICO和区块链区别
    What are the benefits to using anonymous functions instead of named functions for callbacks and parameters in JavaScript event code?
    Link static data in sql source control
    sql data compare
    viewbag
    多态的实际使用
    win10 sedlauncher.exe占用cpu处理
  • 原文地址:https://www.cnblogs.com/en-heng/p/5943703.html
Copyright © 2011-2022 走看看