zoukankan      html  css  js  c++  java
  • 在线教育 (实时需求)

    1项目需求架构

    1.1 项目需求概览

      一、数据采集平台搭建

      二、Kafka中间件准备

      三、下游Spark Streaming对接Kafka接收数据,实现vip个数统计、页面之间的跳转率、做题正确率与掌握度、播放时长统计及历史区间统计的实时计算功能。

    1.2 项目框架设计

    1.2.1 技术选型

      一、数据存储:KafkaMySQL

      二、数据处理:Spark

      三、其他组件:Zookeeper

    1.2.2 流程设计

    1.2.3 代码框架

    2章 需求

    2.1环境准备

      在本机三台虚拟机上分别搭建好zookeeperkafka。(注:CDH6.3.2kafka的版本为2.2.1),创建所需topic(如果使用--bootstrap-server hadoop102:9092创建,则消费者的offset保存在kafka中,如果使用--zookeeper hadoop:2181创建,则消费者的offset保存在zk中)

    kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic register_topic
    kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic qz_log
    kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic page_topic
    kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic course_learn

    2.2原始数据格式及kafka对应topic

    2.2.1实时统计注册人数 - register.log

      kafka对应topic:register_topic

      数据格式:

    字段

    字段说明

    1

    用户id

     

    2

    平台id

    1:PC

    2:APP

    3:Other

    3

    创建时间

     

      数据示例:

    # 数据使用/t作为分隔符
    7188    2    2019-07-16 16:01:55
    7189    1    2019-07-16 16:01:55
    7190    1    2019-07-16 16:01:55
    7191    1    2019-07-16 16:01:55
    7192    1    2019-07-16 16:01:55
    7193    3    2019-07-16 16:01:55
    7194    1    2019-07-16 16:01:55
    7195    3    2019-07-16 16:01:55

    2.2.2做题正确率数与知识点掌握度数据格式 - qz.log

      kafka对应topicqz_log

      数据格式:

    字段

    字段说明

    1

    用户id

     

    2

    课程id

     

    3

    知识点id

     

    4

    题目id

     

    5

    是否正确

    0错误

    1正确

    6

    创建时间

     

      数据示例:

    # 数据使用/t作为分隔符
    1006    504    8    7    0    2019-07-12 11:17:45
    1007    505    16    9    1    2019-07-12 11:17:45
    1002    505    29    3    0    2019-07-12 11:17:45
    1006    504    10    5    0    2019-07-12 11:17:45
    1001    502    28    8    0    2019-07-12 11:17:45
    1006    505    27    0    1    2019-07-12 11:17:45
    1004    503    25    3    0    2019-07-12 11:17:45
    1007    504    12    1    0    2019-07-12 11:17:45
    1006    501    7    6    0    2019-07-12 11:17:45

    2.2.3商品页面到订单页,订单页到支付页数据格式 - page.log

      kafka对应topicpage_topic

      数据格式:

    序号

    字段

    字段说明

    1

    app_id

    平台id

    1:PC

    2:APP

    3:Other

    2

    device_id

    平台id

    3

    distinct_id

    唯一标识

    4

    ip

    用户ip地址

    5

    last_event_name

    上一事件名称

    6

    last_page_id

    上一页面id

    7

    next_event_name

    下一事件名称

    8

    next_page_id

    下一页面id

    9

    page_id

    当前页面id

    1:商品课程页

    2:订单页面

    3:支付页面

    10

    server_time

    服务器时间

    11

    uid

    用户id

      数据示例:

    # 数据为json格式
    {"app_id":"2","device_id":"100","distinct_id":"23a6d4a7-6903-46a4-bce2-a8317693da45","event_name":"-","ip":"123.235.113.225","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"0"}
    # json格式化之后
    {
      "app_id": "2",
      "device_id": "100",
      "distinct_id": "23a6d4a7-6903-46a4-bce2-a8317693da45",
      "event_name": "-",
      "ip": "123.235.113.225",
      "last_event_name": "-",
      "last_page_id": "0",
      "next_event_name": "-",
      "next_page_id": "2",
      "page_id": "1",
      "server_time": "-",
      "uid": "0"
    }

    2.2.4实时统计学员播放视频各时长 - course_learn.log

      Kafka对应topiccourse_learn

      数据格式:

    序号

    字段

    字段说明

    1

    biz

    唯一标识

    2

    chapterid

    章节id

    3

    cwareid

    课件id

    4

    edutypeid

    辅导id

    5

    pe

    视频播放结束区间

    6

    ps

    视频播放开始区间

    7

    sourceType

    播放平台

    8

    speed

    播放倍速

    9

    subjectid

    主题id

    10

    te

    视频播放结束时间(时间戳)

    11

    ts

    视频播放开始时间(时间戳)

    12

    uid

    用户id

    13

    videoid

    视频id

      数据示例:

    # 数据为json格式
    {"biz":"34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9","chapterid":"2","cwareid":"2","edutypeid":"1","pe":"56","ps":"24","sourceType":"PC","speed":"2","subjectid":"1","te":"1563352144131","ts":"1563352128131","uid":"219","videoid":"6"}
    # json格式化之后
    {
      "biz": "34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9",
      "chapterid": "2",
      "cwareid": "2",
      "edutypeid": "1",
      "pe": "56",
      "ps": "24",
      "sourceType": "PC",
      "speed": "2",
      "subjectid": "1",
      "te": "1563352144131",
      "ts": "1563352128131",
      "uid": "219",
      "videoid": "6"
    }

    2.3模拟数据采集

      将准备好的log文件使用kafka生产者代码发送信息到对应的topic中。log文件均在资料包的.2.资料1日志数据4_实时-kafka主题数据中)

    数据说明

    日志文件

    Kafka topic

    代码文件

    注册日志数据

    register.log

    register_topic

     

    做题数据

    qz.log

    qz_log

     

    商品页面数据

    page.log

    page_topic

     

    视频播放时长数据

    course_learn.log

    course_learn

     

      注:如果windows下没有安装hadoop环境,先windows配置环境变量。(代码运行时候会寻找环境变量中的HADOOP_HOME,然后找%HADOOP_HOME%/bin/winutils.exe,所以我们不需要下载全部的代码,只需要把bin包配置好,能让系统找到%HADOOP_HOME%/bin/winutils.exe即可)

      该文件为hadoop-3.0.0bin目录压缩包,之前在讲HDFS时也有说过,具体请看:https://www.cnblogs.com/LzMingYueShanPao/p/14649564.html

    2.4 ip解析工具测试

      1)ip解析本地库文件,网盘地址:

      2)测试ip解析工具代码:

    import org.lionsoul.ip2region.{DbConfig, DbSearcher}
    
    object IpTest {
      def main(args: Array[String]): Unit = {
        val ipSearch = new DbSearcher(new DbConfig(), this.getClass.getResource("/ip2region.db").getPath)
        val region = ipSearch.binarySearch("113.88.85.106").getRegion
        println(region)
        val city = region.split("\|")(2)
        println(city)
      }
    }

      3)测试结果:

    2.实时统计注册人员信息

    2.5.1 MySQL建表语句

    CREATE TABLE `offset_manager` (
      `groupid` varchar(50) DEFAULT NULL,
      `topic` varchar(50) DEFAULT NULL,
      `partition` int(11) DEFAULT NULL,
      `untiloffset` mediumtext,
      UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    2.5.2表结构说明

    表名:

    offset_manager

    主键:

    字段名

    字段说明

    1

    groupid

    Kafka consumergroupid

    2

    topic

    Kafka consumertopic

    3

    partition

    Kafka consumerpartition

    4

    untiloffset

    最新的消费offset(由上面的GTP进行定位)

    2.5.3业务流程说明

      用户使用网站或APP进行注册,后台实时收集数据传输KafkaSpark Streaming进行对接统计,实时统计注册人数。

    2.5.4需求说明

      需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey

      需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据(提示:reduceByKeyAndWindow算子)

      需求3:观察对接数据,尝试进行调优。

    2.6实时统计学员做题正确率与知识点掌握度

    2.6.1 MySQL建表语句

    CREATE TABLE `qz_point_detail` (
      `userid` int(11) DEFAULT NULL,
      `courseid` int(11) DEFAULT NULL,
      `pointid` int(11) DEFAULT NULL,
      `qz_sum` int(11) DEFAULT NULL,
      `qz_count` int(11) DEFAULT NULL,
      `qz_istrue` int(11) DEFAULT NULL,
      `correct_rate` double(4,2) DEFAULT NULL,
      `mastery_rate` double(4,2) DEFAULT NULL,
      `createtime` datetime DEFAULT NULL,
      `updatetime` datetime DEFAULT NULL,
      UNIQUE KEY `qz_point_detail_unique` (`userid`,`courseid`,`pointid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    CREATE TABLE `qz_point_history` (
      `userid` int(11) DEFAULT NULL,
      `courseid` int(11) DEFAULT NULL,
      `pointid` int(11) DEFAULT NULL,
      `questionids` text,
      `createtime` datetime DEFAULT NULL,
      `updatetime` datetime DEFAULT NULL,
      UNIQUE KEY `qz_point_set_unique` (`userid`,`courseid`,`pointid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    2.6.2表结构说明

    表名:

    qz_point_detail

    主键:

    `qz_point_detail_unique` (`userid`,`courseid`,`pointid`)

    字段名

    字段说明

    1

    userid

    用户id

    2

    courseid

    课程id

    3

    pointid

    知识点id

    4

    qz_sum

    做题总数(与历史进行累加)

    5

    qz_count

    当前批次做题个数(去重)

    6

    qz_istrue

    做题正确题目总数(与历史进行累加)

    7

    correct_rate

    正确率

    题目正确率=qz_istrue/qz_count

    8

    mastery_rate

    知识点掌握程度=(题目正确率*题目完成度)

    题目完成度=当前知识点去重完成题目数/当前知识点题目总数10

    9

    createtime

    创建时间

    10

    updatetime

    更新时间

    表名:

    qz_point_history

    主键:

    `qz_point_set_unique` (`userid`,`courseid`,`pointid`)

    字段名

    字段说明

    1

    userid

    用户id

    2

    courseid

    课程id

    3

    pointid

    知识点id

    4

    questionids

    题目id(使用,”作为拼接)

    5

    createtime

    创建时间

    6

    updatetime

    更新时间

    2.6.3业务流程说明

      用户在网站或APP进行做题,做完题点击交卷按钮,程序将做题记录提交,传输到Kafka中,下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度的计算,将正确率和掌握度存入MySQL中,用户点击交卷后刷新页面能立马(思考:这个更新的速度取决于什么?)看到自己做题的详情。

    2.6.4需求说明

      需求1:要求Spark Streaming保证数据不丢失,每秒100条处理速度,需要手动维护偏移量

      需求2:同一个用户做在同一门课程同一知识点下做题需要去重,需要根据历史数据进行去重并且记录去重后的做题id与个数

      需求3:计算知识点正确率正确率计算公式:做题正确总个数/做题总数)保留两位小数

      需求4:计算知识点掌握度,(知识点掌握度=去重后的做题个数/当前知识点总题数(已知10题)*当前知识点的正确率

    2.7实时统计商品页到订单页,订单页到支付页转换率

    2.7.1 MySQL建表语句

    CREATE TABLE `page_jump_rate` (
      `id` INT(11) NOT NULL AUTO_INCREMENT,
      `last_page_id` INT(11) DEFAULT NULL,
      `page_id` INT(11) DEFAULT NULL,
      `next_page_id` INT(11) DEFAULT NULL,
      `num` BIGINT(20) DEFAULT NULL,
      `jump_rate` VARCHAR(10) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `page_jum_rate_unique` (`page_id`)
    ) ENGINE=INNODB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8;
    CREATE TABLE `tmp_city_num_detail` (
      `id` INT(11) NOT NULL AUTO_INCREMENT,
      `province` VARCHAR(10) DEFAULT NULL,
      `num` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `tmp_cityp_num_index_province` (`province`)
    ) ENGINE=INNODB AUTO_INCREMENT=4191 DEFAULT CHARSET=utf8;
    CREATE TABLE `top_city_num` (
      `id` INT(11) NOT NULL AUTO_INCREMENT,
      `province` VARCHAR(10) DEFAULT NULL,
      `num` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

    2.7.2表结构说明

    表名:

    page_jump_rate

    主键:

    `id`

    唯一键:

    `page_jum_rate_unique` (`page_id`)

    字段名

    字段说明

    1

    id

    用户id

    2

    last_page_id

    上一页面id 1:商品课程页 2:订单页面 3:支付页面

    3

    page_id

    当前页面id 1:商品课程页 2:订单页面 3:支付页面

    4

    next_page_id

    下一页面id 1:商品课程页 2:订单页面 3:支付页面

    5

    num

     

    6

    jump_rate

    页面跳转率

    表名:

    tmp_city_num_detail

    主键:

    `id`

    唯一键:

    `tmp_cityp_num_index_province` (`province`)

    字段名

    字段说明

    1

    id

    自增id

    2

    province

    省份

    3

    num

    各省数据统计结果

    表名:

    top_city_num

    主键:

    `id`

    唯一键:

    `page_jum_rate_unique` (`page_id`)

    字段名

    字段说明

    1

    id

    自增id

    2

    province

    省份

    3

    num

    各省数据统计结果(只取前3

    2.7.3业务流程说明

      用户浏览课程首页点击下订单,跳转到订单页面,再点击支付跳转到支付页面进行支付,收集各页面跳转json数据,解析json数据计算各页面点击数和转换率,计算top3点击量按地区排名(ip字段,需要根据历史数据累计)

    2.7.4需求说明

      需求1:计算首页(商品详情页)总浏览数、订单页总浏览数、支付页面总浏览数

      需求2:计算商品课程页面到订单页的跳转转换率、订单页面到支付页面的跳转转换率

      需求3:根据ip得出相应省份,展示出top3省份的点击数,需要根据历史数据累加

      注:此处默认首页为商品页,如果当前页为商品页则无需计算转化率,记为100%,为了简化需求,该页面跳转逻辑默认为1号页面跳转至2号页面,2号页面才能跳转3号页面。3号不能跳转回2号和1号。即页面是按序号顺序前进。

    2.8实时统计学员播放视频各时长

    2.8.1 MySQL建表语句

    CREATE TABLE `video_learn_detail` (
      `userid` INT(11) NOT NULL DEFAULT '0',
      `cwareid` INT(11) NOT NULL DEFAULT '0',
      `videoid` INT(11) NOT NULL DEFAULT '0',
      `totaltime` BIGINT(20) DEFAULT NULL,
      `effecttime` BIGINT(20) DEFAULT NULL,
      `completetime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`userid`,`cwareid`,`videoid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `chapter_learn_detail` (
      `chapterid` INT(11) NOT NULL DEFAULT '0',
      `totaltime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`chapterid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `cwareid_learn_detail` (
      `cwareid` INT(11) NOT NULL DEFAULT '0',
      `totaltime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`cwareid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    
    CREATE TABLE `edutype_learn_detail` (
      `edutypeid` INT(11) NOT NULL DEFAULT '0',
      `totaltime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`edutypeid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `sourcetype_learn_detail` (
      `sourcetype_learn` VARCHAR(10) NOT NULL DEFAULT '',
      `totaltime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`sourcetype_learn`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `subject_learn_detail` (
      `subjectid` INT(11) NOT NULL DEFAULT '0',
      `totaltime` BIGINT(20) DEFAULT NULL,
      PRIMARY KEY (`subjectid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
    
    CREATE TABLE `video_interval` (
      `userid` INT(11) NOT NULL DEFAULT '0',
      `cwareid` INT(11) NOT NULL DEFAULT '0',
      `videoid` INT(11) NOT NULL DEFAULT '0',
      `play_interval` TEXT,
      PRIMARY KEY (`userid`,`cwareid`,`videoid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;

    2.8.2表结构说明

    表名:

    video_learn_detail

    主键:

    `userid`,`cwareid`,`videoid`

    字段名

    字段说明

    1

    userid

    用户id

    2

    cwareid

    课件id

    3

    videoid

    视频id

    4

    totaltime

    播放总时长:(te-ts)/1000

    5

    effecttime

    有效总时长:[((te-ts)/1000)/(pe-ps)] * complete_duration

    6

    completetime

    完成总时长:(pe-ps)需要对历史数据进行对比并去重

    表名:

    chapter_learn_detail

    主键:

    chapterid

    字段名

    字段说明

    1

    chapterid

    章节id

    2

    totaltime

    统计总时长

    表名:

    cwareid_learn_detail

    主键:

    cwareid

    字段名

    字段说明

    1

    cwareid

    课件id

    2

    totaltime

    统计总时长

    表名:

    edutype_learn_detail

    主键:

    edutypeid

    字段名

    字段说明

    1

    edutypeid

    辅导id

    2

    totaltime

    统计总时长

    表名:

    sourcetype_learn_detail

    主键:

    sourcetype

    字段名

    字段说明

    1

    sourcetype

    播放设备来源类型

    2

    totaltime

    统计总时长

    表名:

    subject_learn_detail

    主键:

    subjectid

    字段名

    字段说明

    1

    subjectid

    主题id

    2

    totaltime

    统计总时长

    表名:

    video_interval

    主键:

    `userid`,`cwareid`,`videoid`

    字段名

    字段说明

    1

    userid

    用户id

    2

    cwareid

    课件id

    3

    videoid

    视频id

    4

    play_interval

    播放历史区间

    2.8.3业务流程说明

      用户在线播放视频进行学习课程,后台记录视频播放开始区间和结束区间,及播放开始时间和播放结束时间,后台手机数据传输kafka需要计算用户播放视频总时长、有效时长、完成时长,及各维度总播放时长。

    2.8.4需求说明

      需求1:计算各章节下的播放总时长(chapterid聚合统计播放总时长)

      需求2:计算各课件下的播放总时长(cwareid聚合统计播放总时长)

      需求3:计算各辅导下的播放总时长(edutypeid聚合统计播放总时长)

      需求4:计算各播放平台下的播放总时长(sourcetype聚合统计播放总时长)

      需求5:计算各科目下的播放总时长(subjectid聚合统计播放总时长)

      需求6:计算用户学习视频的播放总时长、有效时长、完成时长,需求记录视频播历史区间,对于用户多次学习的播放区间不累计有效时长和完成时长。

      播放总时长计算:(te-ts)/1000  向下取整  单位:秒

      完成时长计算:根据pe-ps 计算 需要对历史数据进行去重处理

      有效时长计算:根据te-ts 除以pe-ps 先计算出播放每一区间需要的实际时长 * 完成时长

    3章 思考

      (1Spark Streaming下每个stage的耗时由什么决定

      (2Spark Streaming task发生数据倾斜如何解决

      (3Spark Streaming操作MySQL时,相同维度的数据如何保证线程安全问题

      (4)如何保证kill Spark Streaming任务的时候不丢失数据

      (5)如何保证Spark Streaming的第一次启动和kill后第二次启动时据不丢失数据

      (6Spark Streaming下如何正确操作MySQL(如何正确使用连接)

      (7MySQL建表时 注意索引问题

      (8)代码地址:https://gitee.com/LzMingYueShanPao/online-education.git

  • 相关阅读:
    OpenCV——Skewing
    OpenCV——PS滤镜算法之Spherize 球面化(凸出效果)
    机器学习 scikit-learn 图谱
    机器视觉 Histogram of oriented gradients
    Python: scikit-image canny 边缘检测
    机器视觉 Local Binary Pattern (LBP)
    Ice php配置
    Windows7下的免费虚拟机(微软官方虚拟机)
    经常使用的webservice接口
    怎样衡量一个公司是否靠谱
  • 原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/15156211.html
Copyright © 2011-2022 走看看