zoukankan      html  css  js  c++  java
  • Sqoop + MySQL + Hive + Oozie 数据仓库案例

    mysql 数据库脚本为:

    /*==============================================================*/
    /* DBMS name:      MySQL 5.0                                    */
    /* Created on:     2018/11/23 1:09:10                           */
    /*==============================================================*/
    -- Source (OLTP) schema for the sales data-warehouse example: creates the
    -- mysql_sales_source database, its three tables and the seed rows for
    -- customer and product.
    -- WARNING: destructive. The whole database is dropped and recreated.
    DROP DATABASE IF EXISTS mysql_sales_source;
    CREATE DATABASE IF NOT EXISTS mysql_sales_source DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

    USE mysql_sales_source;

    -- Redundant right after DROP DATABASE; kept so the table section below can
    -- also be re-run on its own.
    DROP TABLE IF EXISTS customer;
    DROP TABLE IF EXISTS product;
    DROP TABLE IF EXISTS sales_order;

    /*==============================================================*/
    /* Table: customer                                              */
    /*==============================================================*/
    CREATE TABLE customer
    (
       customer_number      INT(11) NOT NULL AUTO_INCREMENT,
       customer_name        VARCHAR(128) NOT NULL,
       customer_street_address VARCHAR(256) NOT NULL,
       customer_zip_code    INT(11) NOT NULL,
       customer_city        VARCHAR(32) NOT NULL,
       customer_state       VARCHAR(32) NOT NULL,
       PRIMARY KEY (customer_number)
    );

    /*==============================================================*/
    /* Table: product                                               */
    /*==============================================================*/
    CREATE TABLE product
    (
       product_code         INT(11) NOT NULL AUTO_INCREMENT,
       product_name         VARCHAR(128) NOT NULL,
       product_category     VARCHAR(256) NOT NULL,
       PRIMARY KEY (product_code)
    );

    /*==============================================================*/
    /* Table: sales_order                                           */
    /*==============================================================*/
    CREATE TABLE sales_order
    (
       order_number         INT(11) NOT NULL AUTO_INCREMENT,
       customer_number      INT(11) NOT NULL,
       product_code         INT(11) NOT NULL,
       order_date           DATETIME NOT NULL,
       entry_date           DATETIME NOT NULL,
       order_amount         DECIMAL(18,2) NOT NULL,
       PRIMARY KEY (order_number)
    );

    /*==============================================================*/
    /* insert data                                                  */
    /*==============================================================*/

    -- customer_number is generated by AUTO_INCREMENT (1..6 on a fresh table).
    -- customer_zip_code is an INT column, so the values are numeric literals
    -- rather than quoted strings that rely on implicit conversion.
    INSERT INTO customer
        ( customer_name
        , customer_street_address
        , customer_zip_code
        , customer_city
        , customer_state
        )
    VALUES
          ('Big Customers',    '7500 Louise Dr.',    17050, 'Mechanicsburg', 'PA')
        , ('Small Stores',     '2500 Woodland St.',  17055, 'Pittsburgh',    'PA')
        , ('Medium Retailers', '1111 Ritter Rd.',    17055, 'Pittsburgh',    'PA')
        , ('Good Companies',   '9500 Scott St.',     17050, 'Mechanicsburg', 'PA')
        , ('Wonderful Shops',  '3333 Rossmoyne Rd.', 17050, 'Mechanicsburg', 'PA')
        , ('Loyal Clients',    '7070 Ritter Rd.',    17055, 'Pittsburgh',    'PA');

    -- product_code is generated by AUTO_INCREMENT (1..3 on a fresh table).
    INSERT INTO product
        ( product_name
        , product_category
        )
    VALUES
          ('Hard Disk',    'Storage')
        , ('Floppy Drive', 'Storage')
        , ('lcd panel',    'monitor');
    
    
    
    DROP PROCEDURE IF EXISTS usp_generate_order_data;
    DELIMITER //
    -- Regenerates sales_order with 10000 random orders.
    --   * customer_number is drawn from 1..6 and product_code from 1..3, i.e. the
    --     keys AUTO_INCREMENT assigns to the seed rows on freshly created tables.
    --   * order_date (= entry_date) is uniform between 2018-01-01 and 2018-11-23.
    --   * order_amount is a whole number in [1000, 9999].
    -- WARNING: sales_order is truncated; existing orders are lost.
    CREATE PROCEDURE usp_generate_order_data()
    BEGIN
        -- Local variables instead of @user variables, so nothing leaks into the
        -- caller's session after the procedure returns.
        DECLARE v_row_count  INT      DEFAULT 10000;
        DECLARE v_i          INT      DEFAULT 1;
        DECLARE v_start_ts   BIGINT   DEFAULT UNIX_TIMESTAMP('2018-01-01');
        DECLARE v_end_ts     BIGINT   DEFAULT UNIX_TIMESTAMP('2018-11-23');
        DECLARE v_order_date DATETIME;

        -- Staging table with the same columns as sales_order but no rows.
        -- CREATE TABLE ... AS SELECT does not copy the AUTO_INCREMENT attribute,
        -- so explicit order numbers can be written here.
        DROP TABLE IF EXISTS tmp_sales_order;
        CREATE TABLE tmp_sales_order AS SELECT * FROM sales_order WHERE 1=0;

        -- One transaction for the whole loop instead of one commit per row.
        START TRANSACTION;
        WHILE v_i <= v_row_count DO
            SET v_order_date = FROM_UNIXTIME(v_start_ts + RAND() * (v_end_ts - v_start_ts));
            INSERT INTO tmp_sales_order
                (order_number, customer_number, product_code, order_date, entry_date, order_amount)
            VALUES
                (v_i,
                 FLOOR(1 + RAND() * 6),
                 FLOOR(1 + RAND() * 3),
                 v_order_date,
                 v_order_date,
                 FLOOR(1000 + RAND() * 9000));
            SET v_i = v_i + 1;
        END WHILE;
        COMMIT;

        TRUNCATE TABLE sales_order;

        -- order_number is omitted so AUTO_INCREMENT assigns it. Sorting by
        -- order_date makes order numbers grow with the order date, which is what
        -- an incremental import keyed on order_number expects.
        INSERT INTO sales_order
            (customer_number, product_code, order_date, entry_date, order_amount)
        SELECT customer_number, product_code, order_date, entry_date, order_amount
        FROM tmp_sales_order
        ORDER BY order_date, order_number;
        COMMIT;

        DROP TABLE IF EXISTS tmp_sales_order;
    END;
    //
    DELIMITER ;
    CALL usp_generate_order_data();
    View Code

    ods脚本为:

    /*==============================================================*/
    /* DBMS name:      Hive                                         */
    /* Created on:     2018/11/23 1:09:10                           */
    /*==============================================================*/
    -- Staging layer that mirrors the MySQL source tables. The database is named
    -- rds because that is the name the Sqoop imports (--hive-table rds.*) and the
    -- daily ETL script use. Hive has no DEFAULT CHARSET / COLLATE clause, so the
    -- database is created without one.
    CREATE DATABASE IF NOT EXISTS rds;

    USE rds;

    DROP TABLE IF EXISTS rds.customer;

    DROP TABLE IF EXISTS rds.product;

    DROP TABLE IF EXISTS rds.sales_order;

    /*==============================================================*/
    /* Table: customer                                              */
    /*==============================================================*/
    CREATE TABLE rds.customer
    (
       customer_number      INT,
       customer_name        VARCHAR(128),
       customer_street_address VARCHAR(256),
       customer_zip_code    INT,
       customer_city        VARCHAR(32),
       customer_state       VARCHAR(32)
    );

    /*==============================================================*/
    /* Table: product                                               */
    /*==============================================================*/
    CREATE TABLE rds.product
    (
       product_code         INT,
       product_name         VARCHAR(128),
       product_category     VARCHAR(256)
    );

    /*==============================================================*/
    /* Table: sales_order                                           */
    /*==============================================================*/
    CREATE TABLE rds.sales_order
    (
       order_number         INT,
       customer_number      INT,
       product_code         INT,
       order_date           TIMESTAMP,
       entry_date           TIMESTAMP,
       order_amount         DECIMAL(18,2)
    );

    /*==============================================================*/
    /* Table: cdc_time                                              */
    /*==============================================================*/
    -- Load window (last_load, current_load) read and rewritten by the daily ETL
    -- script. NOTE(review): DATE is assumed because the daily ETL writes
    -- CURRENT_DATE() into current_load - confirm. The table must hold exactly one
    -- row before the first daily run; seed it with the date of the initial load.
    CREATE TABLE IF NOT EXISTS rds.cdc_time
    (
       last_load            DATE,
       current_load         DATE
    );
    View Code

    DW脚本为:

    -- Dimensional model (star schema) of the data warehouse.
    -- All tables are created in dw: without "use dw" the unqualified tables would
    -- land in the default database while dim_date alone went to dw.
    create database if not exists dw;
    use dw;

    /*==============================================================*/
    /* Table: dim_product                                           */
    /*==============================================================*/
    -- Bucketed ORC with 'transactional'='true' because the daily ETL runs
    -- UPDATE statements against this table.
    create table if not exists dim_product
    (
       product_sk           int,
       product_code         int,
       product_name         varchar(128),
       product_category     varchar(256),
       version              varchar(32),
       effective_date       date,
       expiry_date          date
    )
    clustered by (product_sk) into 8 buckets
    stored as orc tblproperties('transactional'='true');


    /*==============================================================*/
    /* Table: dim_customer                                          */
    /*==============================================================*/
    -- Transactional for the same reason: the daily ETL issues UPDATE and DELETE.
    create table if not exists dim_customer
    (
       customer_sk          int,
       customer_number      int,
       customer_name        varchar(128),
       customer_street_address varchar(256),
       customer_zip_code    int,
       customer_city        varchar(32),
       customer_state       varchar(32),
       version              varchar(32),
       effective_date       date,
       expiry_date          date
    )
    clustered by (customer_sk) into 8 buckets
    stored as orc tblproperties('transactional'='true');

    /*==============================================================*/
    /* Table: dim_date                                              */
    /*==============================================================*/
    -- Plain comma-delimited text so the CSV produced by generate_dim_date.sh can
    -- be loaded as-is. `date` is quoted because DATE is a reserved word in
    -- recent Hive versions; the quoting is harmless on older ones.
    create table if not exists dw.dim_date
    (
       date_sk              int,
       `date`               date,
       month                tinyint,
       month_name           varchar(16),
       quarter              tinyint,
       year                 int
    ) row format delimited fields terminated by ','
    stored as textfile;

    /*==============================================================*/
    /* Table: dim_order                                             */
    /*==============================================================*/
    create table if not exists dim_order
    (
       order_sk             int,
       order_number         int,
       version              varchar(32),
       effective_date       date,
       expiry_date          date
    )
    clustered by (order_sk) into 8 buckets
    stored as orc tblproperties('transactional'='true');

    /*==============================================================*/
    /* Table: fact_sales_order                                      */
    /*==============================================================*/
    -- order_date is the partition column, so every INSERT into this table needs
    -- a PARTITION clause.
    create table if not exists fact_sales_order
    (
       order_sk             int,
       customer_sk          int,
       product_sk           int,
       order_date_sk        int,
       order_amount         decimal(18,2)
    )
    partitioned by (order_date string)
    clustered by (order_sk) into 8 buckets
    stored as orc tblproperties('transactional'='true');
    View Code

    生成dim_date数据:

    #!/bin/bash
    # generate_dim_date.sh
    #
    # Writes ./dim_date.csv with one row per calendar day from START_DATE to
    # END_DATE (both inclusive):
    #     date_sk,date,month,month_name,quarter,year
    # date_sk starts at 1 on START_DATE.
    #
    # Usage: ./generate_dim_date.sh [START_DATE] [END_DATE]
    # Defaults are 2010-01-01 and 2050-01-01, i.e. 14611 rows. Requires GNU date.
    set -euo pipefail

    date1="${1:-2010-01-01}"
    date2="${2:-2050-01-01}"
    out_file=./dim_date.csv

    # Everything is computed in UTC so that daylight-saving changes can never
    # make a day shorter or longer than 86400 seconds.
    start_sec=$(date -u -d "$date1" +%s)
    end_sec=$(date -u -d "$date2" +%s)

    if [ "$end_sec" -lt "$start_sec" ]; then
        echo "generate_dim_date.sh: END_DATE ($date2) is before START_DATE ($date1)" >&2
        exit 1
    fi

    # Number of days in the inclusive range.
    max=$(( (end_sec - start_sec) / 86400 + 1 ))

    min=1
    while [ "$min" -le "$max" ]
    do
        # A single date call per row yields every calendar field at once.
        row=$(date -u -d "+$((min - 1)) day $date1" '+%F,%m,%B,%Y')
        IFS=, read -r tempdate month month_name year <<< "$row"
        # 10# forces base 10: "08" and "09" are otherwise rejected as octal.
        quarter=$(( (10#$month - 1) / 3 + 1 ))
        echo "${min},${tempdate},${month},${month_name},${quarter},${year}"
        min=$((min + 1))
    done > "$out_file"    # the redirect truncates any previous file
    View Code

    init_dw_etl.sql hive脚本:

    -- init_dw_etl.sql: initial load of the dw dimension and fact tables from the
    -- rds staging tables filled by Sqoop.
    USE dw;

    -- Empty the target tables so the initial load can be repeated.
    TRUNCATE TABLE dim_customer;
    TRUNCATE TABLE dim_product;
    TRUNCATE TABLE dim_order;
    TRUNCATE TABLE fact_sales_order;

    -- Load the customer dimension.
    -- Surrogate key = row number + current maximum key (0 on an empty table).
    INSERT INTO dim_customer (customer_sk,customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,`version`,effective_date,expiry_date)
    SELECT
        row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
        t1.customer_number,
        t1.customer_name,
        t1.customer_street_address,
        t1.customer_zip_code,
        t1.customer_city,
        t1.customer_state,
        1,
        '2016-03-01',
        '2050-01-01'
    FROM rds.customer t1
    CROSS JOIN
        (SELECT COALESCE(MAX(customer_sk),0) sk_max
        FROM dim_customer) t2;

    -- Load the product dimension.
    INSERT INTO dim_product (product_sk,product_code,product_name,product_category,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
        t1.product_code,
        t1.product_name,
        t1.product_category,
        1,
        '2016-03-01',
        '2050-01-01'
    FROM rds.product t1
    CROSS JOIN
        (SELECT COALESCE(MAX(product_sk),0) sk_max
        FROM dim_product) t2;

    -- Load the order dimension; an order becomes effective on its order date.
    INSERT INTO dim_order (order_sk,order_number,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.order_number) + t2.sk_max,
        t1.order_number,
        1,
        t1.order_date,
        '2050-01-01'
    FROM rds.sales_order t1
    CROSS JOIN
        (SELECT COALESCE(MAX(order_sk),0) sk_max
        FROM dim_order) t2;

    -- Load the sales order fact table.
    -- fact_sales_order is partitioned by order_date, so the insert uses a dynamic
    -- partition whose value is the last column of the SELECT.
    SET hive.exec.dynamic.partition = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;
    -- The generated orders span more than 300 distinct days, above the default
    -- per-node limit of 100 dynamic partitions.
    SET hive.exec.max.dynamic.partitions.pernode = 1000;

    INSERT INTO fact_sales_order PARTITION (order_date)
    SELECT b.order_sk,
        c.customer_sk,
        d.product_sk,
        e.date_sk,
        a.order_amount,
        CAST(to_date(a.order_date) AS STRING)
    FROM rds.sales_order a
    JOIN dim_order b ON a.order_number = b.order_number
    JOIN dim_customer c ON a.customer_number = c.customer_number
    JOIN dim_product d ON a.product_code = d.product_code
    -- order_date is a timestamp; compare only its date part with the dimension.
    JOIN dim_date e ON to_date(a.order_date) = e.`date`;
    View Code

    init_all_etl.sh脚本:

    #!/bin/bash
    # init_all_etl.sh - one-off initial load: (re)creates the saved Sqoop job for
    # sales_order, imports all three source tables into the rds Hive database and
    # then runs init_dw_etl.sql.
    #
    # Connection settings can be overridden through the environment.
    set -euo pipefail

    MYSQL_HOST="${MYSQL_HOST:-192.168.25.120:3306}"
    # Name of the database created by the MySQL source script.
    MYSQL_DB="${MYSQL_DB:-mysql_sales_source}"
    MYSQL_USER="${MYSQL_USER:-root}"
    # NOTE(review): a password in the script or on the command line is visible to
    # other users; prefer Sqoop's --password-file outside a sandbox.
    MYSQL_PASSWORD="${MYSQL_PASSWORD:-123456}"
    # Always quoted when used: the '?' and '&' would otherwise be interpreted by
    # the shell.
    JDBC_URL="jdbc:mysql://${MYSQL_HOST}/${MYSQL_DB}?useSSL=false"
    HIVE_URL="${HIVE_URL:-jdbc:hive2://cdh2:10000/dw}"
    JOB_NAME="rds_incremental_import_job"

    # Create the incremental Sqoop job: order_number is the check column and the
    # initial last-value is 0.
    # Deleting fails when the job does not exist yet; that is not an error here.
    sqoop job --delete "$JOB_NAME" || true
    # Credentials are embedded in the URL, as in the original job definition.
    sqoop job --create "$JOB_NAME" \
        -- \
        import \
        --connect "${JDBC_URL}&user=${MYSQL_USER}&password=${MYSQL_PASSWORD}" \
        --table sales_order \
        --columns "order_number,customer_number,product_code,order_date,entry_date,order_amount" \
        --hive-import \
        --hive-table rds.sales_order \
        --incremental append \
        --check-column order_number \
        --last-value 0

    # First extraction: full import of customer and product into rds.
    sqoop import --connect "$JDBC_URL" --username "$MYSQL_USER" --password "$MYSQL_PASSWORD" \
        --table customer --hive-import --hive-table rds.customer --hive-overwrite
    sqoop import --connect "$JDBC_URL" --username "$MYSQL_USER" --password "$MYSQL_PASSWORD" \
        --table product --hive-import --hive-table rds.product --hive-overwrite

    beeline -u "$HIVE_URL" -e "TRUNCATE TABLE rds.sales_order"

    # Run the incremental job; with last-value 0 this first run imports every row.
    sqoop job --exec "$JOB_NAME"

    # Run init_dw_etl.sql to perform the initial warehouse load.
    beeline -u "$HIVE_URL" -f init_dw_etl.sql
    View Code

    load_source_dim_date.sql脚本:

    -- load_source_dim_date.sql (MySQL): fills dim_date with one row per day.
    DROP PROCEDURE IF EXISTS USP_Load_Dim_Date;
    DELIMITER //
    -- Inserts one dim_date row for every day from dt_start to dt_end, inclusive.
    -- Nothing is inserted when dt_start is later than dt_end.
    -- NOTE(review): date_sk is not supplied, so the MySQL dim_date table is
    -- presumably defined elsewhere with an AUTO_INCREMENT key - confirm.
    CREATE PROCEDURE USP_Load_Dim_Date(dt_start DATE, dt_end DATE)
    BEGIN
        -- One transaction for the whole range instead of one commit per row.
        START TRANSACTION;
        WHILE dt_start <= dt_end DO
            INSERT INTO dim_date (`date`,`month`,`month_name`,`quarter`,`year`)
            VALUES (dt_start,MONTH(dt_start),MONTHNAME(dt_start),QUARTER(dt_start),YEAR(dt_start));
            -- Advance the loop by one day.
            SET dt_start = ADDDATE(dt_start,1);
        END WHILE;
        COMMIT;
    END;
    //
    -- Restore the default delimiter so the statements below terminate normally.
    DELIMITER ;

    CALL USP_Load_Dim_Date('2010-01-01','2050-01-01');

    SELECT * FROM dim_date;
    View Code

    schedule_daily_etl.sql 每日周期调度sql脚本:

    -- schedule_daily_etl.sql: periodic (daily) load of the dw tables.
    -- The dimension and fact tables below are referenced without a database
    -- prefix, so select dw explicitly instead of relying on the connection URL.
    USE dw;

    -- SCD effective and expiry dates. Each ${hivevar:...} reference below is
    -- replaced by the expression text assigned here, so keep comments off the
    -- SET lines.
    SET hivevar:cur_date = CURRENT_DATE();
    SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
    SET hivevar:max_date = CAST('2050-01-01' AS DATE);

    -- CDC window for this run: keep last_load and set current_load to today.
    INSERT overwrite TABLE rds.cdc_time
    SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;
    
    -- Load the customer dimension.
    -- SCD2 on customer_street_address, step 1: expire (as of yesterday) every
    -- current row whose customer was deleted from the source or whose street
    -- address changed.
    UPDATE dim_customer
    SET expiry_date = ${hivevar:pre_date}
    WHERE dim_customer.customer_sk IN (SELECT a.customer_sk
                                       FROM (SELECT customer_sk,
                                                    customer_number,
                                                    customer_street_address
                                             FROM dim_customer
                                             WHERE expiry_date = ${hivevar:max_date}) a
                                       LEFT JOIN rds.customer b ON a.customer_number = b.customer_number
                                       WHERE b.customer_number IS NULL
                                          OR a.customer_street_address <> b.customer_street_address);

    -- SCD2 step 2: insert a new current version for every customer expired above
    -- whose address changed. Nothing is inserted when the customer already has a
    -- current (unexpired) row.
    INSERT INTO dim_customer (customer_sk,customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
        t1.customer_number,
        t1.customer_name,
        t1.customer_street_address,
        t1.customer_zip_code,
        t1.customer_city,
        t1.customer_state,
        t1.version,
        t1.effective_date,
        t1.expiry_date
    FROM (SELECT
            t2.customer_number customer_number,
            t2.customer_name customer_name,
            t2.customer_street_address customer_street_address,
            t2.customer_zip_code,
            t2.customer_city,
            t2.customer_state,
            -- version is a VARCHAR column: without the cast Hive evaluates
            -- version + 1 as DOUBLE and the stored text becomes '2.0'.
            CAST(t1.version AS INT) + 1 `version`,
            ${hivevar:pre_date} effective_date,
            ${hivevar:max_date} expiry_date
        FROM dim_customer t1
        INNER JOIN rds.customer t2 ON t1.customer_number = t2.customer_number
                        AND t1.expiry_date = ${hivevar:pre_date}
        LEFT JOIN dim_customer t3 ON t1.customer_number = t3.customer_number
                        AND t3.expiry_date = ${hivevar:max_date}
        WHERE t1.customer_street_address <> t2.customer_street_address
            AND t3.customer_sk IS NULL
        ) t1
    CROSS JOIN (SELECT
                    COALESCE(MAX(customer_sk),0) sk_max
                FROM dim_customer) t2;


    -- SCD1 on customer_name (overwrite in place, no history).
    -- No UPDATE is used: every dimension row whose name differs from the source
    -- is copied into a scratch table carrying the new name, deleted from
    -- dim_customer, and inserted back from the scratch table.
    DROP TABLE IF EXISTS tmp;
    CREATE TABLE tmp AS
    SELECT a.customer_sk,
        a.customer_number,
        b.customer_name,
        a.customer_street_address,
        a.customer_zip_code,
        a.customer_city,
        a.customer_state,
        a.version,
        a.effective_date,
        a.expiry_date
    FROM dim_customer a
    JOIN rds.customer b ON a.customer_number = b.customer_number
                AND (a.customer_name <> b.customer_name);

    -- Delete the rows that carry the old name.
    DELETE FROM dim_customer
    WHERE dim_customer.customer_sk IN (SELECT customer_sk FROM tmp);

    -- Insert the same rows back with the new name.
    INSERT INTO dim_customer (customer_sk,customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,`version`,effective_date,expiry_date)
    SELECT customer_sk,
        customer_number,
        customer_name,
        customer_street_address,
        customer_zip_code,
        customer_city,
        customer_state,
        version,
        effective_date,
        expiry_date
    FROM tmp;



    -- New customers: source rows that have no dim_customer row at all.
    INSERT INTO dim_customer (customer_sk,customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
        t1.customer_number,
        t1.customer_name,
        t1.customer_street_address,
        t1.customer_zip_code,
        t1.customer_city,
        t1.customer_state,
        1,
        ${hivevar:pre_date},
        ${hivevar:max_date}
    FROM (SELECT t1.customer_number,
            t1.customer_name,
            t1.customer_street_address,
            t1.customer_zip_code,
            t1.customer_city,
            t1.customer_state
        FROM rds.customer t1
        LEFT JOIN dim_customer t2 ON t1.customer_number = t2.customer_number
        WHERE t2.customer_sk IS NULL) t1
    CROSS JOIN (SELECT
                    COALESCE(MAX(customer_sk),0) sk_max
                FROM dim_customer) t2;
    
    
    
    
    -- Load the product dimension.
    -- SCD2 on product_name and product_category, step 1: expire (as of
    -- yesterday) every current row whose product was deleted from the source or
    -- whose name or category changed.
    UPDATE dim_product
    SET expiry_date = ${hivevar:pre_date}
    WHERE dim_product.product_sk IN (SELECT a.product_sk
                                     FROM (SELECT product_sk,
                                                  product_code,
                                                  product_name,
                                                  product_category
                                           FROM dim_product
                                           WHERE expiry_date = ${hivevar:max_date}) a
                                     LEFT JOIN rds.product b ON a.product_code = b.product_code
                                     WHERE b.product_code IS NULL
                                        OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

    -- SCD2 step 2: insert a new current version for every product expired above
    -- whose name or category changed and that has no current row yet.
    INSERT INTO dim_product (product_sk,product_code,product_name,product_category,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
        t1.product_code,
        t1.product_name,
        t1.product_category,
        t1.version,
        t1.effective_date,
        t1.expiry_date
    FROM (SELECT t2.product_code product_code,
            t2.product_name product_name,
            t2.product_category product_category,
            -- version is a VARCHAR column: without the cast Hive evaluates
            -- version + 1 as DOUBLE and the stored text becomes '2.0'.
            CAST(t1.version AS INT) + 1 `version`,
            ${hivevar:pre_date} effective_date,
            ${hivevar:max_date} expiry_date
        FROM dim_product t1
        INNER JOIN rds.product t2 ON t1.product_code = t2.product_code
                        AND t1.expiry_date = ${hivevar:pre_date}
        LEFT JOIN dim_product t3 ON t1.product_code = t3.product_code
                        AND t3.expiry_date = ${hivevar:max_date}
        WHERE (t1.product_name <> t2.product_name
            OR t1.product_category <> t2.product_category)
            AND t3.product_sk IS NULL
        ) t1
    CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max
                FROM dim_product) t2;

    -- New products: source rows that have no dim_product row at all.
    INSERT INTO dim_product (product_sk,product_code,product_name,product_category,`version`,effective_date,expiry_date)
    SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
        t1.product_code,
        t1.product_name,
        t1.product_category,
        1,
        ${hivevar:pre_date},
        ${hivevar:max_date}
    FROM (SELECT t1.product_code,
            t1.product_name,
            t1.product_category
        FROM rds.product t1
        LEFT JOIN dim_product t2 ON t1.product_code = t2.product_code
        WHERE t2.product_sk IS NULL
        ) t1
    CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max
                FROM dim_product) t2;
    
    
    
    -- Load the order dimension: one version-1 row for every order entered inside
    -- the current CDC window [last_load, current_load). The surrogate key
    -- continues after the highest existing order_sk.
    INSERT INTO dim_order
    SELECT
        row_number() over (ORDER BY src.order_number) + key_base.sk_max,
        src.order_number,
        1,
        src.order_date,
        '2050-01-01'
    FROM rds.sales_order src
    CROSS JOIN rds.cdc_time win
    CROSS JOIN (
        SELECT COALESCE(MAX(order_sk),0) sk_max
        FROM dim_order
    ) key_base
    WHERE src.entry_date >= win.last_load
        AND src.entry_date < win.current_load;
    
    
    -- Load the sales order fact table with the orders entered inside the current
    -- CDC window [last_load, current_load).
    -- fact_sales_order is partitioned by order_date, so the insert uses a dynamic
    -- partition whose value is the last column of the SELECT.
    SET hive.exec.dynamic.partition = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;

    INSERT INTO fact_sales_order PARTITION (order_date)
    SELECT b.order_sk,
        c.customer_sk,
        d.product_sk,
        e.date_sk,
        a.order_amount,
        CAST(to_date(a.order_date) AS STRING)
    FROM rds.sales_order a
    JOIN dim_order b ON a.order_number = b.order_number
    JOIN dim_customer c ON a.customer_number = c.customer_number
    JOIN dim_product d ON a.product_code = d.product_code
    JOIN dim_date e ON to_date(a.order_date) = e.`date`
    CROSS JOIN rds.cdc_time f
    -- Range predicates stay in WHERE: older Hive versions accept only equality
    -- conditions in ON.
    -- Pick the customer and product versions that were valid on the order date.
    WHERE a.order_date >= c.effective_date
        AND a.order_date < c.expiry_date
        AND a.order_date >= d.effective_date
        AND a.order_date < d.expiry_date
        AND a.entry_date >= f.last_load
        AND a.entry_date < f.current_load;
    
    
    
    -- Advance the load watermark: last_load takes the value of current_load, so
    -- the next run starts where this one ended.
    INSERT OVERWRITE TABLE rds.cdc_time
    SELECT
        w.current_load,
        w.current_load
    FROM rds.cdc_time w;
    View Code

    schedule_daily.sh每日周期调度sh脚本:

    #!/bin/bash
    # schedule_daily.sh - daily load: re-imports customer and product in full,
    # runs the saved incremental Sqoop job for sales_order, then executes
    # schedule_daily_etl.sql.
    #
    # Defaults match init_all_etl.sh and can be overridden through the
    # environment.
    set -euo pipefail

    MYSQL_HOST="${MYSQL_HOST:-192.168.25.120:3306}"
    # Name of the database created by the MySQL source script.
    MYSQL_DB="${MYSQL_DB:-mysql_sales_source}"
    MYSQL_USER="${MYSQL_USER:-root}"
    # NOTE(review): prefer Sqoop's --password-file outside a sandbox.
    MYSQL_PASSWORD="${MYSQL_PASSWORD:-123456}"
    # Always quoted when used: the '?' would otherwise be a shell glob character.
    JDBC_URL="jdbc:mysql://${MYSQL_HOST}/${MYSQL_DB}?useSSL=false"
    HIVE_URL="${HIVE_URL:-jdbc:hive2://cdh2:10000/dw}"
    # Must be the job name created by init_all_etl.sh.
    JOB_NAME="rds_incremental_import_job"

    # Full refresh of the customer and product staging tables.
    sqoop import --connect "$JDBC_URL" --username "$MYSQL_USER" --password "$MYSQL_PASSWORD" \
        --table customer --hive-import --hive-table rds.customer --hive-overwrite
    sqoop import --connect "$JDBC_URL" --username "$MYSQL_USER" --password "$MYSQL_PASSWORD" \
        --table product --hive-import --hive-table rds.product --hive-overwrite

    # Incremental import of the new sales_order rows.
    sqoop job --exec "$JOB_NAME"

    # Run schedule_daily_etl.sql to perform the periodic warehouse load.
    beeline -u "$HIVE_URL" -f schedule_daily_etl.sql
    View Code

    使用案例:

    -- Mobile phone units sold per city in 2015, written twice for comparison.
    -- NOTE(review): the test and snow schemas are not defined in this document;
    -- table and column names are taken as given.

    -- Star schema: the product category is an attribute of Dim_Product.
    USE test;
    SELECT SUM(Units_Sold) AS units_sold,
        City
    FROM Fact_Sales a
    JOIN Dim_Store b ON a.Store_Id = b.id
    JOIN Dim_Date c ON a.Date_Id = c.id
    JOIN Dim_Product d ON a.Product_Id = d.id
    -- Year aligned with the stated question and with the snowflake query (was 2018).
    WHERE c.Year = 2015 AND d.Product_Category = 'mobile'
    GROUP BY City;

    -- Snowflake schema: geography and category are normalised into their own
    -- dimension tables, which costs two extra joins.
    USE snow;
    SELECT SUM(Units_Sold) AS units_sold,
        City
    FROM Fact_Sales a
    JOIN Dim_Store b ON a.Store_Id = b.id
    JOIN Dim_Geography c ON b.Geography_Id = c.id
    JOIN Dim_Product d ON a.Product_Id = d.Product_Id
    JOIN Dim_Category e ON d.Category_Id = e.Category_Id
    JOIN Dim_Date f ON a.Date_Id = f.id
    -- NOTE(review): was "Categoryt_Name", presumably a typo - confirm the column name.
    WHERE e.Category_Name = 'mobile' AND f.Year = 2015
    GROUP BY City;
  • 相关阅读:
    typedef void (*funcptr)(void) typedef void (*PFV)(); typedef int32_t (*PFI)();
    STM32 STM32F4 寄存器怎么配置不上, 无法往寄存器写入数据
    GPIO
    JSP和selevt 生命周期详解(JSP的生命周期和select很像,jsp底层就是一个selevt)
    jquery自带的排序方法(js也是)
    GET和POST是HTTP请求的两种基本方法,区别是什么!?
    springboot特性
    restful风格接口类型和优点
    提升必看!!!
    分组函数 partition by 的详解,与order by 区别
  • 原文地址:https://www.cnblogs.com/lenmom/p/10210782.html
Copyright © 2011-2022 走看看