zoukankan      html  css  js  c++  java
  • 大数据开发实战:Stream SQL实时开发二

       1、介绍

          本节主要利用Stream SQL进行实时开发实战,回顾Beam的API和Hadoop MapReduce的API,会发现Google将实际业务对数据的各种操作进行了抽象,多变的数据需求抽象为三类:

        离线的Map、Shuffle、Reduce以及

          实时的ParDo、GroupByKey、Combine,这些抽象其实也对应了SQL的操作。SQL开发有如下几类:

        select操作:包括过滤、投影、表达式等。

        join操作:关联操作,包括和维度表关联以及窗口操作等。

        聚合操作:全局group by语句以及窗口操作等。

        以及上面三类的组合。

      2、select操作

          select操作是实时开发的基础,也是后续join操作和聚合操作的基础。

          另外,select操作也经常在实时开发中用于简单的数据map操作,即对某个数据源头做过滤,对源头字段执行各种转换(Json解析、类型转换、特征处理、大字段解析等),并将中间

        结果写到结果表中。

          如果select操作(如过滤、各种转换等)比较复杂,可以通过建立一个临时表(及view)暂存中间结果,这样既便于逻辑处理,也为代码可读性已经后续维护带来便捷。

          下面是一个select操作的实例,其中包括源头过滤、JSON解析、类型转换、特征处理等典型操作,为了处理便捷,中间试验了临时表,最后的结果写入RDS表供下游用户使用。

          ---从源头接收订单实时流

          create table test_order_stream (

            gmt_create varchar,

            gmt_modifed varchar,

            order_id  bigint,

            buyer_id bigint,  

            selller_id bigint,

            item_id bigint,

            json_object varchar,

            order_type varchar,

            category_name varchar,

            sub_categroy_name varchar

          ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = 'your_project',  

            topic = 'test_topic_1',

            accessId = 'your_accessId',

            accessKey = 'your_acccessKey',

            startTime = '2018-08-08 00:00:00'

            );

          ---创建一个临时表,完成各种过滤、字段重命名、类型转换、json解析、特征处理等。

          create view temp_order as

            select order_id,

            gmt_create as order_create_time,   

            buyer_id,

            seller_id,

            item_id,

            cast(order_type as bigint) as order_type,

            JSON_VALUE(json_object, '$.mobileType') as mobile_type,

            category_name,

            if (sub_category_name='iphone', 1, 0) as is_phone

          from test_order_stream

          where category_name='手机';

        --- 定义rds结果表

        create table rds_mobile_orders (

          order_id int,

          order_create_time varchar,

          buyer_id int,

          seller_id int,

          item_id int,

          order_type int,

          mobile_type varchar,

          category_name varchar,

          is_iphone int,

          primary key (order_id)

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

        );

        

        ---将手机订单明细写入rds结果表,供下游用户使用

        insert into rds_mobile_orders

        select 

          order_id,

          order_create_time,

          buyer_id,

          selller_id,

          item_id,

          ordery_type,

          mobile_type,

          category_name,

          is_iphone

        from tmp_order;

      

      3、join操作

        3.1、join维度表操作

          实际的业务开发中,最为经常的场景是通过关联相关的维度表扩展源头数据,以便于各种分析和统计。

          举个实例就是,比如比如上例中源头订单流仅包含了buyer_id, 分析买家数据后发现,仅有其id显然远远不够,实际业务场景肯定还需要地域、年龄、星级、注册时间等各种业务属性,

        才有实际的分析意义,这就是join维度表操作的含义。

          需要注意的是join维度表的触发,维度表在实际中也会被实时更新,但是如果将一个Stream SQL 表声明为维度表,那么此维度表的更新不会触发数据流的下发,比如join 上例中的

        order 流和买家维度表,那么只会order 流中数据关联买家维度表,然后order流带着这些关联的买家属性继续下流,但是买家的更新不会触发任何的数据下发。

          join 维度表的例子如下, 下面实例将join买家维度表以获取买家的所在省份、年龄、星级并最终将这些数据写入rds结果表中。

          

        ---从源头接收订单实时流

          create table test_order_stream (

            gmt_create varchar,

            gmt_modifed varchar,

            order_id  bigint,

            buyer_id bigint,  

            selller_id bigint,

            item_id bigint,

            json_object varchar,

            order_type varchar,

            category_name varchar,

            sub_categroy_name varchar

          ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = 'your_project',  

            topic = 'test_topic_1',

            accessId = 'your_accessId',

            accessKey = 'your_acccessKey',

            startTime = '2018-08-08 00:00:00'

            );

         

        --- 定义rds买家维度表

        create table rds_dim_buyer (

          buyer_id int,

          age int,

          province varchar,

          star_level varchar,

          primary key (buyer_id)

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password',

          PERIOD FOR SYSTEM_TIME  ---定义了维度表的变化周期,即表明该表是一张会变化的表

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

        );

        

         ---创建一个临时表关联买家维度表并过滤非手机订单。

          create view temp_order as

            select  ord.order_id,

            ord.gmt_create as order_create_time,   

            ord.buyer_id,

            ord.age,

            ord.province,

            ord.star_level

          from test_order_stream as ord

          left join rds_dim_buyer for system_time as of proctime() as byr

          on ord.buyer_id = byr.buyer_id

          where ord.category_name = '手机';

         

        --- 定义rds结果表

        create table rds_mobile_orders(

          order_id int,

          order_create_time varchar,

          buyer_id int,

          age int,

          province varchar,

          star_level varchar,

          primary key (order_id )

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password',

          PERIOD FOR SYSTEM_TIME  ---定义了维度表的变化周期,即表明该表是一张会变化的表

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

        );

        ----将手机订单以及关联的买家属性写入rds结果表

        insert into rds_mobile_orders

        select order_id,

           order_create_time,

           buyer_id,

           age,

           province,

           star_level

        from tmp_order;

        3.2、双流join操作

          不同于join维度表,双流join的含义是两个流做实时join,其中任何一个流的数据流入都会触发数据的下发。

          下面的例子和上面的join维度表类似,但是不同之处在于买家表不是一个维度表,而是一个datahub源头数据流,所以order流和买家流的任何一个流的更新都会触发数据下发。

          还需要注意的是,双流 join无限流的join,彼此会关联对方截止目前的所有数据,所以这一操作可能会导致大量数据堆积并影响性能,实际业务中请评估场景谨慎使用。

          

          ---从源头接收订单实时流

          create table test_order_stream (

            gmt_create varchar,

            gmt_modifed varchar,

            order_id  bigint,

            buyer_id bigint,  

            selller_id bigint,

            item_id bigint,

            json_object varchar,

            order_type varchar,

            category_name varchar,

            sub_categroy_name varchar

          ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = 'your_project',  

            topic = 'test_topic_1',

            accessId = 'your_accessId',

            accessKey = 'your_acccessKey',

            startTime = '2018-08-08 00:00:00'

            );

          

        --- 从源头接收买家实时流

        create table test_buyer_Stream (

          buyer_id int,

          age int,

          province varchar,

          star_level varchar,

          primary key (buyer_id)

        ) with (

            type = 'datahub',

            endpoint = 'http://dh-et2.aliyun-inc.com',

            project = 'your_project',  

            topic = 'test_topic_1',

            accessId = 'your_accessId',

            accessKey = 'your_acccessKey',

            startTime = '2018-08-08 00:00:00'

         );

        

        --创建一个临时双流join订单流和买家流并过滤非手机订单。

          create view temp_order as

            select  ord.order_id,

            ord.gmt_create as order_create_time,   

            ord.buyer_id,

            ord.age,

            ord.province,

            ord.star_level

          from test_order_stream as ord

          left join test_buyer_stream as byr

          on ord.buyer_id = byr.buyer_id

          where order.category_name = '手机';

        

        --- 定义rds结果表

        create table rds_mobile_orders(

          order_id int,

          order_create_time varchar,

          buyer_id int,

          age int,

          province varchar,

          star_level varchar,

          primary key (order_id )

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password',

          PERIOD FOR SYSTEM_TIME  ---定义了维度表的变化周期,即表明该表是一张会变化的表

        ) with (

          type = 'rds',

          url = 'your_mySql_url',

          tableName = 'your_table_name',

          userName = 'your_user_name',

          password = 'your_password'

        );

         

        ----将手机订单以及关联的买家属性写入rds结果表

        insert into rds_mobile_orders

        select order_id,

           order_create_time,

           buyer_id,

           age,

           province,

           star_level

        from tmp_order;

        参考资料:《离线和实时大数据开发实战》

  • 相关阅读:
    log4j1修改DailyRollingFileAppender支持日志最大数量
    log4j1 修改FileAppender解决当天的文件没有日期后缀
    log4j生成有日期的日志文件名
    Java删除List和Set集合中元素
    Java并发编程:并发容器之ConcurrentHashMap
    Java并发编程:并发容器之CopyOnWriteArrayList
    java.util.ConcurrentModificationException解决详解
    Handshake failed due to invalid Upgrade header: null 解决方案
    web项目Log4j日志输出路径配置问题
    log4j.properties 的使用详解
  • 原文地址:https://www.cnblogs.com/shaosks/p/9593018.html
Copyright © 2011-2022 走看看