zoukankan      html  css  js  c++  java
  • Flink+Druid构建实时OLAP的探索

    场景

    k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

    方案对比

    对比了很多解决方案,如下几种,列出来供参考。

    方案实时入库SQL支持度
    Spark+CarbonData 支持 Spark SQL语法丰富
    Kylin 不支持 支持join
    Flink+Druid 支持 0.15以前不支持SQL,不支持join
    1. 上一篇文章所示,使用Spark+CarbonData也是一种解决方案,但是他的缺点也是比较明显,如不能和Flink进行结合,因为我们整个的大数据规划的大致方向是,Spark用来作为离线计算,Flink作为实时计算,并且这两个大方向短时间内不会改变;
    2. Kylin一直是老牌OLAP引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
    3. 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下这篇文章,我的博客其它文章也有最新版本的安装教程,实操方案哦。

    设计方案

    实时处理采用Flink SQL,实时入库Druid方式采用 druid-kafka-indexing-service,另一种方式入库方式,Tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

    场景举例

    实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

    flink的处理

    将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

    sysTime,DateTime格式
    pt,格式yyyy-MM-dd
    eventId,事件类型(enterRoom|disconnect)
    lessonId,课程ID
    
    Druid处理

    启动Druid Supervisor,消费Kafka里的数据,使用预聚合,配置如下

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "sac_core_analyze_v1",
        "parser": {
          "parseSpec": {
            "dimensionsSpec": {
              "spatialDimensions": [],
              "dimensions": [
                "eventId",
                "pt"
              ]
            },
            "format": "json",
            "timestampSpec": {
              "column": "sysTime",
              "format": "auto"
            }
          },
          "type": "string"
        },
        "metricsSpec": [
          {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "disconnect"
                },
                "aggregator": {
                    "name": "lesson_offline_molecule_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }, {
                "filter": {
                    "type": "selector",
                    "dimension": "msg_type",
                    "value": "enterRoom"
                },
                "aggregator": {
                    "name": "lesson_offline_denominator_id",
                    "type": "cardinality",
                    "fields": ["lesson_id"]
                },
                "type": "filtered"
            }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": {
            "type": "none"
          },
          "rollup": true,
          "intervals": null
        },
        "transformSpec": {
          "filter": null,
          "transforms": []
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsInMemory": 1000000,
        "maxBytesInMemory": 0,
        "maxRowsPerSegment": 5000000,
        "maxTotalRows": null,
        "intermediatePersistPeriod": "PT10M",
        "basePersistDirectory": "/tmp/1564535441619-2",
        "maxPendingPersists": 0,
        "indexSpec": {
          "bitmap": {
            "type": "concise"
          },
          "dimensionCompression": "lz4",
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "buildV9Directly": true,
        "reportParseExceptions": false,
        "handoffConditionTimeout": 0,
        "resetOffsetAutomatically": false,
        "segmentWriteOutMediumFactory": null,
        "workerThreads": null,
        "chatThreads": null,
        "chatRetries": 8,
        "httpTimeout": "PT10S",
        "shutdownTimeout": "PT80S",
        "offsetFetchPeriod": "PT30S",
        "intermediateHandoffPeriod": "P2147483647D",
        "logParseExceptions": false,
        "maxParseExceptions": 2147483647,
        "maxSavedParseExceptions": 0,
        "skipSequenceNumberAvailabilityCheck": false
      },
      "ioConfig": {
        "topic": "sac_druid_analyze_v2",
        "replicas": 2,
        "taskCount": 1,
        "taskDuration": "PT600S",
        "consumerProperties": {
          "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
        },
        "pollTimeout": 100,
        "startDelay": "PT5S",
        "period": "PT30S",
        "useEarliestOffset": false,
        "completionTimeout": "PT1200S",
        "lateMessageRejectionPeriod": null,
        "earlyMessageRejectionPeriod": null,
        "stream": "sac_druid_analyze_v2",
        "useEarliestSequenceNumber": false
      },
      "context": null,
      "suspended": false
    }
    View Code

    最重要的配置是metricsSpec,他主要定义了预聚合的字段和条件。

    数据查询

    数据格式如下

    pteventIdlesson_offline_molecule_idlesson_offline_denominator_id
    2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
    2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

    结果可以按照这样的SQL出

    SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt
    

    可以使用Druid的接口查询结果,肥肠的方便~

  • 相关阅读:
    泛微云桥e-Bridge 目录遍历,任意文件读取
    (CVE-2020-8209)XenMobile-控制台存在任意文件读取漏洞
    selenium 使用初
    将HTML文件转换为MD文件
    Python对word文档进行操作
    使用java安装jar包出错,提示不是有效的JDK java主目录
    Windows server 2012安装VM tools异常解决办法
    ifconfig 命令,改变主机名,改DNS hosts、关闭selinux firewalld netfilter 、防火墙iptables规则
    iostat iotop 查看硬盘的读写、 free 查看内存的命令 、netstat 命令查看网络、tcpdump 命令
    使用w uptime vmstat top sar nload 等命令查看系统负载
  • 原文地址:https://www.cnblogs.com/ChouYarn/p/11328900.html
Copyright © 2011-2022 走看看