zoukankan      html  css  js  c++  java
  • Flink+Druid构建实时OLAP的探索

    场景

    k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

    方案对比

    对比了很多解决方案,如下几种,列出来供参考。

    方案实时入库SQL支持度
    Spark+CarbonData 支持 Spark SQL语法丰富
    Kylin 不支持 支持join
    Flink+Druid 支持 0.15以前不支持SQL,不支持join
    1. 上一篇文章所示,使用Spark+CarbonData也是一种解决方案,但是他的缺点也是比较明显,如不能和Flink进行结合,因为我们整个的大数据规划的大致方向是,Spark用来作为离线计算,Flink作为实时计算,并且这两个大方向短时间内不会改变;
    2. Kylin一直是老牌OLAP引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
    3. 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下这篇文章,我的博客其它文章也有最新版本的安装教程,实操方案哦。

    设计方案

    实时处理采用Flink SQL,实时入库Druid方式采用 druid-kafka-indexing-service,另一种方式入库方式,Tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

    场景举例

    实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

    flink的处理

    将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

    sysTime,DateTime格式
    pt,格式yyyy-MM-dd
    eventId,事件类型(enterRoom|disconnect)
    lessonId,课程ID
    
    Druid处理

    启动Druid Supervisor,消费Kafka里的数据,使用预聚合,配置如下

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "sac_core_analyze_v1",
        "parser": {
          "parseSpec": {
            "dimensionsSpec": {
              "spatialDimensions": [],
              "dimensions": [
                "eventId",
                "pt"
              ]
            },
            "format": "json",
            "timestampSpec": {
              "column": "sysTime",
              "format": "auto"
            }
          },
          "type": "string"
        },
        "metricsSpec": [
          {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "disconnect"
                },
                "aggregator": {
                    "name": "lesson_offline_molecule_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }, {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "enterRoom"
                },
                "aggregator": {
                    "name": "lesson_offline_denominator_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": {
            "type": "none"
          },
          "rollup": true,
          "intervals": null
        },
        "transformSpec": {
          "filter": null,
          "transforms": []
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsInMemory": 1000000,
        "maxBytesInMemory": 0,
        "maxRowsPerSegment": 5000000,
        "maxTotalRows": null,
        "intermediatePersistPeriod": "PT10M",
        "basePersistDirectory": "/tmp/1564535441619-2",
        "maxPendingPersists": 0,
        "indexSpec": {
          "bitmap": {
            "type": "concise"
          },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "buildV9Directly": true,
        "reportParseExceptions": false,
        "handoffConditionTimeout": 0,
        "resetOffsetAutomatically": false,
        "segmentWriteOutMediumFactory": null,
        "workerThreads": null,
        "chatThreads": null,
        "chatRetries": 8,
        "httpTimeout": "PT10S",
        "shutdownTimeout": "PT80S",
        "offsetFetchPeriod": "PT30S",
        "intermediateHandoffPeriod": "P2147483647D",
        "logParseExceptions": false,
        "maxParseExceptions": 2147483647,
        "maxSavedParseExceptions": 0,
        "skipSequenceNumberAvailabilityCheck": false
      },
      "ioConfig": {
        "topic": "sac_druid_analyze_v2",
        "replicas": 2,
        "taskCount": 1,
        "taskDuration": "PT600S",
        "consumerProperties": {
          "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
        },
        "pollTimeout": 100,
        "startDelay": "PT5S",
        "period": "PT30S",
        "useEarliestOffset": false,
        "completionTimeout": "PT1200S",
        "lateMessageRejectionPeriod": null,
        "earlyMessageRejectionPeriod": null,
        "stream": "sac_druid_analyze_v2",
        "useEarliestSequenceNumber": false
      },
      "context": null,
      "suspended": false
    }
    View Code

    最重要的配置是metricsSpec,他主要定义了预聚合的字段和条件。

    数据查询

    数据格式如下

    pteventIdlesson_offline_molecule_idlesson_offline_denominator_id
    2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
    2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

    结果可以按照这样的SQL出

    SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt
    

    可以使用Druid的接口查询结果,肥肠的方便~

  • 相关阅读:
    LeetCode 123. Best Time to Buy and Sell Stock III (stock problem)
    精帖转载(关于stock problem)
    LeetCode 122. Best Time to Buy and Sell Stock II (stock problem)
    LeetCode 121. Best Time to Buy and Sell Stock (stock problem)
    LeetCode 120. Triangle
    基于docker 搭建Elasticsearch5.6.4 分布式集群
    从零开始构建一个centos+jdk7+tomcat7的docker镜像文件
    Harbor实现容器镜像仓库的管理和运维
    docker中制作自己的JDK+tomcat镜像
    docker镜像制作---jdk7+tomcat7基础镜像
  • 原文地址:https://www.cnblogs.com/ChouYarn/p/11328900.html
Copyright © 2011-2022 走看看