zoukankan      html  css  js  c++  java
  • Flink+Druid构建实时OLAP的探索

    场景

    k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

    方案对比

    对比了很多解决方案,如下几种,列出来供参考。

    方案实时入库SQL支持度
    Spark+CarbonData 支持 Spark SQL语法丰富
    Kylin 不支持 支持join
    Flink+Druid 支持 0.15以前不支持SQL,不支持join
    1. 上一篇文章所示,使用Spark+CarbonData也是一种解决方案,但是他的缺点也是比较明显,如不能和Flink进行结合,因为我们整个的大数据规划的大致方向是,Spark用来作为离线计算,Flink作为实时计算,并且这两个大方向短时间内不会改变;
    2. Kylin一直是老牌OLAP引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
    3. 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下这篇文章,我的博客其它文章也有最新版本的安装教程,实操方案哦。

    设计方案

    实时处理采用Flink SQL,实时入库Druid方式采用 druid-kafka-indexing-service,另一种方式入库方式,Tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

    场景举例

    实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

    flink的处理

    将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

    sysTime,DateTime格式
    pt,格式yyyy-MM-dd
    eventId,事件类型(enterRoom|disconnect)
    lessonId,课程ID
    
    Druid处理

    启动Druid Supervisor,消费Kafka里的数据,使用预聚合,配置如下

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "sac_core_analyze_v1",
        "parser": {
          "parseSpec": {
            "dimensionsSpec": {
              "spatialDimensions": [],
              "dimensions": [
                "eventId",
                "pt"
              ]
            },
            "format": "json",
            "timestampSpec": {
              "column": "sysTime",
              "format": "auto"
            }
          },
          "type": "string"
        },
        "metricsSpec": [
          {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "disconnect"
                },
                "aggregator": {
                    "name": "lesson_offline_molecule_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }, {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "enterRoom"
                },
                "aggregator": {
                    "name": "lesson_offline_denominator_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": {
            "type": "none"
          },
          "rollup": true,
          "intervals": null
        },
        "transformSpec": {
          "filter": null,
          "transforms": []
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsInMemory": 1000000,
        "maxBytesInMemory": 0,
        "maxRowsPerSegment": 5000000,
        "maxTotalRows": null,
        "intermediatePersistPeriod": "PT10M",
        "basePersistDirectory": "/tmp/1564535441619-2",
        "maxPendingPersists": 0,
        "indexSpec": {
          "bitmap": {
            "type": "concise"
          },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "buildV9Directly": true,
        "reportParseExceptions": false,
        "handoffConditionTimeout": 0,
        "resetOffsetAutomatically": false,
        "segmentWriteOutMediumFactory": null,
        "workerThreads": null,
        "chatThreads": null,
        "chatRetries": 8,
        "httpTimeout": "PT10S",
        "shutdownTimeout": "PT80S",
        "offsetFetchPeriod": "PT30S",
        "intermediateHandoffPeriod": "P2147483647D",
        "logParseExceptions": false,
        "maxParseExceptions": 2147483647,
        "maxSavedParseExceptions": 0,
        "skipSequenceNumberAvailabilityCheck": false
      },
      "ioConfig": {
        "topic": "sac_druid_analyze_v2",
        "replicas": 2,
        "taskCount": 1,
        "taskDuration": "PT600S",
        "consumerProperties": {
          "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
        },
        "pollTimeout": 100,
        "startDelay": "PT5S",
        "period": "PT30S",
        "useEarliestOffset": false,
        "completionTimeout": "PT1200S",
        "lateMessageRejectionPeriod": null,
        "earlyMessageRejectionPeriod": null,
        "stream": "sac_druid_analyze_v2",
        "useEarliestSequenceNumber": false
      },
      "context": null,
      "suspended": false
    }
    View Code

    最重要的配置是metricsSpec,他主要定义了预聚合的字段和条件。

    数据查询

    数据格式如下

    pteventIdlesson_offline_molecule_idlesson_offline_denominator_id
    2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
    2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

    结果可以按照这样的SQL出

    SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt
    

    可以使用Druid的接口查询结果,肥肠的方便~

  • 相关阅读:
    100个高质量的photoshop画笔
    VC调用DLL库方法的方法
    VC6中使用CHtmlView在对话框控制中显示HTML
    CtrlList 排序问题。
    VC ADO使用说明
    VC右键弹出菜单的实现
    VC6工程项目文件说明
    VC6中用DOM遍历网页中的元素
    C/C++头文件一览
    最常见的20种VC++编译错误信息
  • 原文地址:https://www.cnblogs.com/ChouYarn/p/11328900.html
Copyright © 2011-2022 走看看