zoukankan      html  css  js  c++  java
  • 大数据开发实战:Stream SQL实时开发三

      4、聚合操作

       4.1、group by 操作

          group by操作是实际业务场景(如实时报表、实时大屏等)中使用最为频繁的操作。通常实时聚合的主要源头数据流不会包含丰富的上下文信息,而是经常需要实时关联相关

        相关的维度表,并针对这些扩展的、丰富维度属性进行各种业务的统计。

          在下面的实例中,订单流通过买家id关联了买家维度表,获取其所在省份信息,然后实时统计每天各个省份的iPhone销量信息。

          ---从源头接收订单实时流

          create table test_order_stream (

            gmt_create  varchar,

            gmt_modified  varchar,

            order_id  bigint,

            buyer_id  bigint,

            seller_id  bigint,

            item_id  bigint,

            json_object  varchar,

            order_type  varchar,

            category_name  varchar,

            sub_category_name  varchar

          ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = ' your_project',

            topic = 'test_topic_1',

            accessId = 'your_access_id',

            accessKey = 'your_access_key',

            startTime = '2018-08-08-00:00:00'

            );

         ----定义rds买家维度表

         create table  rds_dim_buyer(

          buyer_id  int,

          age  int,

          province  varchar,

          star_level  varchar,

          primary key(buyer_id),

          period for system_time    ---定义了维度表的变化周期,即是一张变化的表

         ) with (

          type = 'rds',

          url = 'your_mySQL_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

          );

         ---订单流关联买家维度表获取买家所在省份,并过滤非iPhone订单

         create view tmp_order as

         select ord.order_id,

            ord.gmt_create as order_create_time,

            ord.buyer_id,

            byr.age,

            byr.provice,

            byr.star_level

         from test_order_stream as ord

         left join rds_dim_buyer for system_time s of proctime() as byr

         --实际项目中,可能为了避免join热点,对买家维度表做了md5处理,那么join的时候也要做对应处理,

         --如,新的join条件可能会变为:

         --on concat(substr(md5(ord.buyer_id), 1, 4),  '_', ord.order_id) = byr.md5_byr_id

         on ord.buyer_id = byr.buyer_id

         where ord.category_name = '手机'

            and ord.sub_category_name='iPhone';

         ---定义rds的结果表

         create table rds_mobile_orders(

          order_create_day  varchar,

          province  varchar,

          iphone_order_count  int,

          primary key(order_create_day,province)

          ) with (

            type = 'rds',

            url = 'your_mysql_url',

            tableName = ' your_table_name',

            userName = 'your_user_name',

            password = 'your_password'

          );

         ---安装天、省份汇总每天iphone手机销量

          inert into rds_mobile_orders

          select 

            substring(order_create_time, 1, 10) as order_create_day

            province,

            count(distinct order_id) as iphone_order_count

          from tem_order

          group by substring(order_create_time, 1, 10) ,province;

        4.2、窗口 操作

          group by操作的是全局窗口,阿里云Stream SQL还支持包含滑动、滚动、session等的窗口操作,下面以event time的滑动窗口为例介绍窗口操作。

          针对event time操作必须首先定义watermark,直接在订单源头流定义即可,hop(datetime, slide, size)函数定义滑动窗口,其中datetime为时间列,slide为滑动间隔,size为窗口大小,

          HOP_START则获取到窗口的开始时间,对上述的group by操作进行改动的实例如下,其业务含义为为每一小时统计过去24小时每个省份的的iphone手机销量。

          

          ---从源头接收订单实时流

          create table test_order_stream (

            gmt_create  varchar,

            gmt_modified  varchar,

            order_id  bigint,

            buyer_id  bigint,

            seller_id  bigint,

            item_id  bigint,

            json_object  varchar,

            order_type  varchar,

            category_name  varchar,

            sub_category_name  varchar,

            WATERMARK mywatermark FOR gmt_modified as withOffset(gmt_modified,1000)

          ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = ' your_project',

            topic = 'test_topic_1',

            accessId = 'your_access_id',

            accessKey = 'your_access_key',

            startTime = '2018-08-08-00:00:00'

            );

          

          ----定义rds买家维度表

         create table  rds_dim_buyer(

          buyer_id  int,

          age  int,

          province  varchar,

          star_level  varchar,

          primary key(buyer_id),

          period for system_time    ---定义了维度表的变化周期,即是一张变化的表

         ) with (

          type = 'rds',

          url = 'your_mySQL_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

          );

        

        ---订单流关联买家维度表获取买家所在省份,并过滤非iPhone订单

         create view tmp_order as

         select ord.order_id,

            ord.gmt_create as order_create_time,

            ord.buyer_id,

            byr.age,

            byr.provice,

            byr.star_level

         from test_order_stream as ord

         left join rds_dim_buyer for system_time s of proctime() as byr

         --实际项目中,可能为了避免join热点,对买家维度表做了md5处理,那么join的时候也要做对应处理,

         --如,新的join条件可能会变为:

         --on concat(substr(md5(ord.buyer_id), 1, 4),  '_', ord.order_id) = byr.md5_byr_id

         on ord.buyer_id = byr.buyer_id

         where ord.category_name = '手机'

            and ord.sub_category_name='iPhone';

        

        ---定义rds的结果表

         create table rds_mobile_orders(

          stat_begin_time  varchar,

          province  varchar,

          iphone_order_count  int,

          primary key(stat_begin_time,province)

          ) with (

            type = 'rds',

            url = 'your_mysql_url',

            tableName = ' your_table_name',

            userName = 'your_user_name',

            password = 'your_password'

          );

        ---每一小时统计过去24小时每个省份iPhone手机销量

        insert into rds_moble_orders

        select 

          cast(HOP_START(order_modified_time interval '1'  hour,  interva  '1' day)) as  TIMESTAMP) as stat_begin_time,

          province,  

          count(distinct order_id) as ihpone_order_count

        from tmp_order

        group by HOP_START(order_modified_time interval '1'  hour,  interva  '1' day),province;

      5、撤回机制

        在某些业务场景下,必须考虑撤回,否则计算结果不准确,比如用户排队咨询的场景,如果某用户A从队列1转移到队列2,现在要统计每个队列最终承担的用户咨询量,那么

        不考虑撤回将会导致重复计算。

        阿里云Stream  SQL支持撤回的处理,具体实例如下,其业务含义为统计每个队列最终承担的用户咨询量。

        ---从源头接收咨询session粒度的实时流

        create table test_queue_stream(

          gmt_create   varchar,

          gmt_modified  varchar,

          session_id  bigint,

          queue_id  bigint,

          session_user_id  bigint,

          session_user_name  bigint

        ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = ' your_project',

            topic = 'test_topic_1',

            accessId = 'your_access_id',

            accessKey = 'your_access_key',

            startTime = '2018-08-08-00:00:00'

          );

        ----创建临时表,取每个session的最后一个queue_id,与下面的group by操作一起支持撤回

        create view tmp_queue_stream as

        select 

          session_id,

          StringLast(queue_id)

        from test_queue_stream

        group by session_id;

        -----定义rds的结果表

        create table rds_queue_result(

          queue_id  varchar,

          session_count  int,

          primary key(queue_id)

        ) with (

            type = 'rds',

            url = 'your_mysql_url',

            tableName = ' your_table_name',

            userName = 'your_user_name',

            password = 'your_password'

          )

        ---统计每个队列的排队量,如果用户有队列变更,group by时会撤回,不会重复统计

         insert into rds_queue_result

         select queue_id,

            count(distinct session_id) as session_count

         from tmp_queue_stream

         group by queue_id;

      参考资料:《离线和实时大数据开发实战》

  • 相关阅读:
    zabbix邮件报警
    简单的带权随机算法
    一、向量
    C#遍历DataSet
    旅游(二)——广州
    旅游(一)——潮州
    LoRa术语
    Linux基础(一)
    Git(二)_基本命令
    Git使用(一)——Cygwin
  • 原文地址:https://www.cnblogs.com/shaosks/p/9596674.html
Copyright © 2011-2022 走看看