mysql 数据库脚本为:
/*==============================================================*/
/* DBMS name:      MySQL 5.0                                    */
/* Created on:     2018/11/23 1:09:10                           */
/*==============================================================*/
-- Source OLTP database for the sales data-warehouse example: customer, product and
-- sales_order tables plus a procedure that fills sales_order with random orders.
DROP DATABASE IF EXISTS mysql_sales_source;
CREATE DATABASE IF NOT EXISTS mysql_sales_source DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
USE mysql_sales_source;

DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS product;
DROP TABLE IF EXISTS sales_order;

/*==============================================================*/
/* Table: customer                                              */
/*==============================================================*/
CREATE TABLE customer (
    customer_number         INT(11)      NOT NULL AUTO_INCREMENT,
    customer_name           VARCHAR(128) NOT NULL,
    customer_street_address VARCHAR(256) NOT NULL,
    customer_zip_code       INT(11)      NOT NULL,
    customer_city           VARCHAR(32)  NOT NULL,
    customer_state          VARCHAR(32)  NOT NULL,
    PRIMARY KEY (customer_number)
);

/*==============================================================*/
/* Table: product                                               */
/*==============================================================*/
CREATE TABLE product (
    product_code     INT(11)      NOT NULL AUTO_INCREMENT,
    product_name     VARCHAR(128) NOT NULL,
    product_category VARCHAR(256) NOT NULL,
    PRIMARY KEY (product_code)
);

/*==============================================================*/
/* Table: sales_order                                           */
/*==============================================================*/
CREATE TABLE sales_order (
    order_number    INT(11)       NOT NULL AUTO_INCREMENT,
    customer_number INT(11)       NOT NULL,
    product_code    INT(11)       NOT NULL,
    order_date      DATETIME      NOT NULL,
    entry_date      DATETIME      NOT NULL,
    order_amount    DECIMAL(18,2) NOT NULL,
    PRIMARY KEY (order_number)
);

/*==============================================================*/
/* insert data                                                  */
/*==============================================================*/
INSERT INTO customer
    (customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
VALUES
    ('Big Customers',    '7500 Louise Dr.',    17050, 'Mechanicsburg', 'PA'),
    ('Small Stores',     '2500 Woodland St.',  17055, 'Pittsburgh',    'PA'),
    ('Medium Retailers', '1111 Ritter Rd.',    17055, 'Pittsburgh',    'PA'),
    ('Good Companies',   '9500 Scott St.',     17050, 'Mechanicsburg', 'PA'),
    ('Wonderful Shops',  '3333 Rossmoyne Rd.', 17050, 'Mechanicsburg', 'PA'),
    ('Loyal Clients',    '7070 Ritter Rd.',    17055, 'Pittsburgh',    'PA');

INSERT INTO product (product_name, product_category)
VALUES
    ('Hard Disk',    'Storage'),
    ('Floppy Drive', 'Storage'),
    ('lcd panel',    'monitor');

DROP PROCEDURE IF EXISTS usp_generate_order_data;

DELIMITER //
-- Replaces the contents of sales_order with 10,000 random orders dated between
-- 2018-01-01 and 2018-11-23 (entry_date = order_date), amounts 1000..9999.
CREATE PROCEDURE usp_generate_order_data()
BEGIN
    DECLARE v_i            INT    DEFAULT 1;
    DECLARE v_order_count  INT    DEFAULT 10000;
    DECLARE v_start_ts     BIGINT DEFAULT UNIX_TIMESTAMP('2018-1-1');
    DECLARE v_end_ts       BIGINT DEFAULT UNIX_TIMESTAMP('2018-11-23');
    DECLARE v_max_customer INT;
    DECLARE v_max_product  INT;
    DECLARE v_order_date   DATETIME;

    -- Draw ids from the range actually present in the lookup tables rather than
    -- hard-coding the number of sample customers (6) and products (3).
    SELECT MAX(customer_number) INTO v_max_customer FROM customer;
    SELECT MAX(product_code)    INTO v_max_product  FROM product;

    -- Session-private scratch copy of sales_order's structure; dropped automatically
    -- if the session ends before the explicit DROP below.
    DROP TEMPORARY TABLE IF EXISTS tmp_sales_order;
    CREATE TEMPORARY TABLE tmp_sales_order AS SELECT * FROM sales_order WHERE 1 = 0;

    -- One transaction for the whole loop instead of an autocommit per row.
    START TRANSACTION;
    WHILE v_i <= v_order_count DO
        SET v_order_date = FROM_UNIXTIME(v_start_ts + RAND() * (v_end_ts - v_start_ts));
        INSERT INTO tmp_sales_order
            (order_number, customer_number, product_code, order_date, entry_date, order_amount)
        VALUES
            (v_i,
             FLOOR(1 + RAND() * v_max_customer),
             FLOOR(1 + RAND() * v_max_product),
             v_order_date,
             v_order_date,
             FLOOR(1000 + RAND() * 9000));
        SET v_i = v_i + 1;
    END WHILE;
    COMMIT;

    -- TRUNCATE resets AUTO_INCREMENT; inserting in order_date order makes order_number
    -- grow with order/entry date, as ids do in a live system. The incremental sqoop
    -- import checks on order_number, so this ordering matters for realistic tests.
    TRUNCATE TABLE sales_order;
    INSERT INTO sales_order (customer_number, product_code, order_date, entry_date, order_amount)
    SELECT customer_number, product_code, order_date, entry_date, order_amount
    FROM tmp_sales_order
    ORDER BY order_date;
    COMMIT;

    DROP TEMPORARY TABLE IF EXISTS tmp_sales_order;
END; //
DELIMITER ;

CALL usp_generate_order_data();
ods脚本为:
/*==============================================================*/
/* DBMS name:      Hive                                         */
/* Created on:     2018/11/23 1:09:10                           */
/*==============================================================*/
-- Staging (ODS) layer: raw copies of the MySQL source tables. They live in the `rds`
-- database because that is where the sqoop imports write (--hive-table rds.*) and
-- where the dw ETL scripts read from.
-- (The MySQL-only "DEFAULT CHARSET ... COLLATE" clause is not valid HiveQL.)
CREATE DATABASE IF NOT EXISTS rds;
USE rds;

DROP TABLE IF EXISTS rds.customer;
DROP TABLE IF EXISTS rds.product;
DROP TABLE IF EXISTS rds.sales_order;
DROP TABLE IF EXISTS rds.cdc_time;

/*==============================================================*/
/* Table: customer                                              */
/*==============================================================*/
CREATE TABLE rds.customer (
    customer_number         INT,
    customer_name           VARCHAR(128),
    customer_street_address VARCHAR(256),
    customer_zip_code       INT,
    customer_city           VARCHAR(32),
    customer_state          VARCHAR(32)
);

/*==============================================================*/
/* Table: product                                               */
/*==============================================================*/
CREATE TABLE rds.product (
    product_code     INT,
    product_name     VARCHAR(128),
    product_category VARCHAR(256)
);

/*==============================================================*/
/* Table: sales_order                                           */
/*==============================================================*/
CREATE TABLE rds.sales_order (
    order_number    INT,
    customer_number INT,
    product_code    INT,
    order_date      TIMESTAMP,
    entry_date      TIMESTAMP,
    order_amount    DECIMAL(18,2)
);

/*==============================================================*/
/* Table: cdc_time                                              */
/*==============================================================*/
-- Single-row control table for the daily incremental load: sales_order rows with
-- last_load <= entry_date < current_load are processed by schedule_daily_etl.sql.
CREATE TABLE rds.cdc_time (
    last_load    DATE,
    current_load DATE
);
DW脚本为:
-- DW layer DDL (Hive). Every table is created inside the dw database.
-- The dimension and fact tables are bucketed ORC tables with 'transactional'='true',
-- which Hive requires for UPDATE/DELETE (used by the daily SCD processing).
CREATE DATABASE IF NOT EXISTS dw;
USE dw;

/*==============================================================*/
/* Table: dim_product                                           */
/*==============================================================*/
CREATE TABLE IF NOT EXISTS dim_product (
    product_sk       INT,
    product_code     INT,
    product_name     VARCHAR(128),
    product_category VARCHAR(256),
    `version`        VARCHAR(32),
    effective_date   DATE,
    expiry_date      DATE
)
CLUSTERED BY (product_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

/*==============================================================*/
/* Table: dim_customer                                          */
/*==============================================================*/
CREATE TABLE IF NOT EXISTS dim_customer (
    customer_sk             INT,
    customer_number         INT,
    customer_name           VARCHAR(128),
    customer_street_address VARCHAR(256),
    customer_zip_code       INT,
    customer_city           VARCHAR(32),
    customer_state          VARCHAR(32),
    `version`               VARCHAR(32),
    effective_date          DATE,
    expiry_date             DATE
)
CLUSTERED BY (customer_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

/*==============================================================*/
/* Table: dim_date                                              */
/*==============================================================*/
-- Plain comma-delimited text; the column order matches the rows written by
-- generate_dim_date.sh (date_sk,date,month,month_name,quarter,year).
-- `date` is a reserved word in Hive and must be back-quoted.
CREATE TABLE IF NOT EXISTS dim_date (
    date_sk      INT,
    `date`       DATE,
    `month`      TINYINT,
    month_name   VARCHAR(16),
    `quarter`    TINYINT,
    `year`       INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

/*==============================================================*/
/* Table: dim_order                                             */
/*==============================================================*/
CREATE TABLE IF NOT EXISTS dim_order (
    order_sk       INT,
    order_number   INT,
    `version`      VARCHAR(32),
    effective_date DATE,
    expiry_date    DATE
)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

/*==============================================================*/
/* Table: fact_sales_order                                      */
/*==============================================================*/
-- Partitioned by the order's calendar day (string partition column).
CREATE TABLE IF NOT EXISTS fact_sales_order (
    order_sk      INT,
    customer_sk   INT,
    product_sk    INT,
    order_date_sk INT,
    order_amount  DECIMAL(18,2)
)
PARTITIONED BY (order_date STRING)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
生成dim_date数据:
#!/bin/bash
# generate_dim_date.sh START_DATE END_DATE
#
# Writes ./dim_date.csv with one row per calendar day from START_DATE to END_DATE
# (both inclusive), in the column order of dw.dim_date:
#     date_sk,date,month,month_name,quarter,year
# date_sk is 1 for START_DATE and increases by one per day.
# Example: ./generate_dim_date.sh 2010-01-01 2050-01-01   (14611 rows)
set -euo pipefail

if [ "$#" -ne 2 ]; then
    echo "usage: $0 START_DATE END_DATE   (e.g. $0 2010-01-01 2050-01-01)" >&2
    exit 1
fi

date1="$1"
date2="$2"
out=./dim_date.csv

# English month names regardless of the login locale (matches MySQL MONTHNAME()).
export LC_ALL=C

# All day arithmetic is done in UTC so a DST change can never produce a
# 23- or 25-hour "day" and skew the row count.
start_sec=$(date -u -d "$date1" +%s)
end_sec=$(date -u -d "$date2" +%s)
max=$(( (end_sec - start_sec) / 86400 + 1 ))

: > "$out"    # create / truncate the output file

min=1
while [ "$min" -le "$max" ]; do
    # One date(1) call per day yields the ISO date, 2-digit month, month name and year.
    IFS=, read -r day month month_name year \
        <<< "$(date -u -d "+$((min - 1)) day $date1" +%F,%m,%B,%Y)"
    # 10# forces base 10 so months 08 and 09 are not parsed as invalid octal.
    quarter=$(( (10#$month - 1) / 3 + 1 ))
    echo "${min},${day},${month},${month_name},${quarter},${year}" >> "$out"
    min=$((min + 1))
done
init_dw_etl.sql hive脚本:
-- init_dw_etl.sql: one-off initial load of the dw star schema from the rds staging
-- tables that init_all_etl.sh fills via sqoop.

-- Writing the transactional (ACID) dw tables needs the DbTxnManager; the fact insert
-- creates order_date partitions dynamically.
SET hive.support.concurrency = true;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
-- The generated sample orders span 2018-01-01..2018-11-23 (300+ days) and one writer
-- task can receive rows for many days, so the default per-node limit of 100 is too low.
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions = 10000;

-- Orders entered before this cut-off are loaded here; later ones are left to the
-- daily job, whose CDC window is seeded with the same date at the end of this script,
-- so no order is loaded twice.
SET hivevar:init_cutoff = CURRENT_DATE();
SET hivevar:max_date = CAST('2050-01-01' AS DATE);

USE dw;

-- Start from empty target tables.
TRUNCATE TABLE dim_customer;
TRUNCATE TABLE dim_product;
TRUNCATE TABLE dim_order;
TRUNCATE TABLE fact_sales_order;

-- Customer dimension: one current version per source customer.
INSERT INTO dim_customer
    (customer_sk, customer_number, customer_name, customer_street_address,
     customer_zip_code, customer_city, customer_state, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    '1',
    CAST('2016-03-01' AS DATE),
    ${hivevar:max_date}
FROM rds.customer t1
CROSS JOIN (SELECT COALESCE(MAX(customer_sk), 0) AS sk_max FROM dim_customer) t2;

-- Product dimension: one current version per source product.
INSERT INTO dim_product
    (product_sk, product_code, product_name, product_category, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    '1',
    CAST('2016-03-01' AS DATE),
    ${hivevar:max_date}
FROM rds.product t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk), 0) AS sk_max FROM dim_product) t2;

-- Order dimension: one row per order entered before the cut-off.
INSERT INTO dim_order (order_sk, order_number, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    '1',
    CAST(t1.order_date AS DATE),
    ${hivevar:max_date}
FROM rds.sales_order t1
CROSS JOIN (SELECT COALESCE(MAX(order_sk), 0) AS sk_max FROM dim_order) t2
WHERE t1.entry_date < ${hivevar:init_cutoff};

-- Sales order fact: the last SELECT column feeds the dynamic order_date partition.
-- order_date is a TIMESTAMP, so it is cast to DATE before matching dim_date.
INSERT INTO TABLE fact_sales_order PARTITION (order_date)
SELECT
    b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount,
    CAST(CAST(a.order_date AS DATE) AS STRING) AS order_date
FROM rds.sales_order a
INNER JOIN dim_order b    ON a.order_number = b.order_number
INNER JOIN dim_customer c ON a.customer_number = c.customer_number
INNER JOIN dim_product d  ON a.product_code = d.product_code
INNER JOIN dim_date e     ON CAST(a.order_date AS DATE) = e.`date`
WHERE a.entry_date < ${hivevar:init_cutoff};

-- Seed the CDC window read by schedule_daily_etl.sql: its first run will pick up
-- orders with entry_date >= the cut-off used above.
CREATE TABLE IF NOT EXISTS rds.cdc_time (last_load DATE, current_load DATE);
INSERT OVERWRITE TABLE rds.cdc_time
SELECT ${hivevar:init_cutoff}, ${hivevar:init_cutoff};
init_all_etl.sh脚本:
#!/bin/bash
# init_all_etl.sh: one-off initial load.
#   1. (Re)creates the saved sqoop job used for incremental sales_order imports,
#      checking on order_number with an initial last-value of 0.
#   2. Full-imports customer and product into rds (overwrite).
#   3. Empties rds.sales_order and runs the incremental job once; because
#      last-value starts at 0 this imports every order.
#   4. Runs init_dw_etl.sql to build the dw star schema.
#
# Connection settings can be overridden through environment variables. The defaults
# are this script's original values, except MYSQL_DB, which now names the database
# actually created by the MySQL script (mysql_sales_source).
set -euo pipefail

MYSQL_HOST="${MYSQL_HOST:-192.168.25.120}"
MYSQL_PORT="${MYSQL_PORT:-3306}"
MYSQL_DB="${MYSQL_DB:-mysql_sales_source}"
MYSQL_USER="${MYSQL_USER:-root}"
MYSQL_PASSWORD="${MYSQL_PASSWORD:-123456}"
HIVE_URL="${HIVE_URL:-jdbc:hive2://cdh2:10000/dw}"
INCR_JOB="${INCR_JOB:-rds_incremental_import_job}"

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
JDBC_URL="jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}?useSSL=false"

# The saved job carries its credentials in the connect string (as before), since a
# saved sqoop job does not keep --password unless the metastore is configured to.
# Deleting fails when the job does not exist yet (first run); that is expected.
sqoop job --delete "$INCR_JOB" || true
sqoop job --create "$INCR_JOB" -- import \
    --connect "${JDBC_URL}&user=${MYSQL_USER}&password=${MYSQL_PASSWORD}" \
    --table sales_order \
    --columns "order_number,customer_number,product_code,order_date,entry_date,order_amount" \
    --hive-import \
    --hive-table rds.sales_order \
    --incremental append \
    --check-column order_number \
    --last-value 0

# First extraction: full copy of the small master-data tables.
for tbl in customer product; do
    sqoop import \
        --connect "$JDBC_URL" \
        --username "$MYSQL_USER" \
        --password "$MYSQL_PASSWORD" \
        --table "$tbl" \
        --hive-import \
        --hive-table "rds.${tbl}" \
        --hive-overwrite
done

beeline -u "$HIVE_URL" -e "TRUNCATE TABLE rds.sales_order"

# Incremental import with last-value 0, i.e. all orders.
sqoop job --exec "$INCR_JOB"

# Initial dw load.
beeline -u "$HIVE_URL" -f "${SCRIPT_DIR}/init_dw_etl.sql"
load_source_dim_date.sql脚本:
-- load_source_dim_date.sql: fills dim_date with one row per day between two dates.
-- NOTE(review): no CREATE TABLE for this MySQL dim_date is visible here; this assumes it
-- already exists in the current database with `date`, `month`, `month_name`,
-- `quarter`, `year` columns and a date_sk that is generated automatically - TODO confirm.
DROP PROCEDURE IF EXISTS USP_Load_Dim_Date;

DELIMITER //
-- Inserts dt_start..dt_end (inclusive), one row per day.
CREATE PROCEDURE USP_Load_Dim_Date(dt_start DATE, dt_end DATE)
BEGIN
    -- One transaction for all rows instead of an autocommit per INSERT.
    START TRANSACTION;
    WHILE dt_start <= dt_end DO
        INSERT INTO dim_date (`date`, `month`, `month_name`, `quarter`, `year`)
        VALUES (dt_start, MONTH(dt_start), MONTHNAME(dt_start), QUARTER(dt_start), YEAR(dt_start));
        SET dt_start = ADDDATE(dt_start, 1);
    END WHILE;
    COMMIT;
END; //
-- Restore the normal delimiter; otherwise the CALL and SELECT below would not parse.
DELIMITER ;

CALL USP_Load_Dim_Date('2010-1-1', '2050-1-1');

-- Sanity check of what was loaded.
SELECT COUNT(*) AS row_count, MIN(`date`) AS first_date, MAX(`date`) AS last_date
FROM dim_date;
schedule_daily_etl.sql 每日周期调度sql脚本:
-- schedule_daily_etl.sql: daily incremental load of the dw star schema.
-- Expects schedule_daily.sh to have refreshed rds.customer / rds.product (full pull)
-- and appended new rows to rds.sales_order, and rds.cdc_time to hold exactly one row.

-- UPDATE/DELETE on the transactional dimension tables need the ACID transaction
-- manager; the fact insert writes order_date partitions dynamically.
SET hive.support.concurrency = true;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

USE dw;

-- SCD effective / expiry dates. hivevar values are substituted as text, so e.g.
-- ${hivevar:pre_date} expands to DATE_ADD(CURRENT_DATE(),-1) wherever it is used.
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2050-01-01' AS DATE);

-- Open the CDC window [last_load, today) for this run.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT last_load, ${hivevar:cur_date}
FROM rds.cdc_time;

-- ============================================================
-- dim_customer
-- ============================================================

-- SCD2 on customer_street_address: expire, as of yesterday, the current row of every
-- customer that was deleted from the source or whose street address changed.
UPDATE dim_customer
SET expiry_date = ${hivevar:pre_date}
WHERE dim_customer.customer_sk IN (
    SELECT a.customer_sk
    FROM (
        SELECT customer_sk, customer_number, customer_street_address
        FROM dim_customer
        WHERE expiry_date = ${hivevar:max_date}
    ) a
    LEFT JOIN rds.customer b
        ON a.customer_number = b.customer_number
    WHERE b.customer_number IS NULL
       OR a.customer_street_address <> b.customer_street_address
);

-- SCD2: add a new current version for every customer whose row was just expired because
-- its address changed, unless a current version already exists (re-run safety).
-- `version` is VARCHAR; it is cast to INT before adding so new versions read "2", "3", ...
-- instead of Hive's string + int result ("2.0").
INSERT INTO dim_customer
    (customer_sk, customer_number, customer_name, customer_street_address,
     customer_zip_code, customer_city, customer_state, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.`version`,
    t1.effective_date,
    t1.expiry_date
FROM (
    SELECT
        src.customer_number,
        src.customer_name,
        src.customer_street_address,
        src.customer_zip_code,
        src.customer_city,
        src.customer_state,
        CAST(prev.`version` AS INT) + 1 AS `version`,
        ${hivevar:pre_date} AS effective_date,
        ${hivevar:max_date} AS expiry_date
    FROM dim_customer prev
    INNER JOIN rds.customer src
        ON prev.customer_number = src.customer_number
       AND prev.expiry_date = ${hivevar:pre_date}
    LEFT JOIN dim_customer curr
        ON prev.customer_number = curr.customer_number
       AND curr.expiry_date = ${hivevar:max_date}
    WHERE prev.customer_street_address <> src.customer_street_address
      AND curr.customer_sk IS NULL
) t1
CROSS JOIN (SELECT COALESCE(MAX(customer_sk), 0) AS sk_max FROM dim_customer) t2;

-- SCD1 on customer_name: overwrite the name on every version of a customer whose name
-- changed. A Hive UPDATE cannot take values from a join, so the affected rows are copied
-- (with the new name) to a scratch table, deleted, and re-inserted.
DROP TABLE IF EXISTS tmp_dim_customer_scd1;
CREATE TABLE tmp_dim_customer_scd1 AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.`version`,
    a.effective_date,
    a.expiry_date
FROM dim_customer a
INNER JOIN rds.customer b
    ON a.customer_number = b.customer_number
WHERE a.customer_name <> b.customer_name;

DELETE FROM dim_customer
WHERE dim_customer.customer_sk IN (SELECT customer_sk FROM tmp_dim_customer_scd1);

INSERT INTO dim_customer
    (customer_sk, customer_number, customer_name, customer_street_address,
     customer_zip_code, customer_city, customer_state, `version`, effective_date, expiry_date)
SELECT
    customer_sk, customer_number, customer_name, customer_street_address,
    customer_zip_code, customer_city, customer_state, `version`, effective_date, expiry_date
FROM tmp_dim_customer_scd1;

DROP TABLE IF EXISTS tmp_dim_customer_scd1;

-- New customers: source rows with no version at all in dim_customer.
INSERT INTO dim_customer
    (customer_sk, customer_number, customer_name, customer_street_address,
     customer_zip_code, customer_city, customer_state, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    '1',
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT
        src.customer_number,
        src.customer_name,
        src.customer_street_address,
        src.customer_zip_code,
        src.customer_city,
        src.customer_state
    FROM rds.customer src
    LEFT JOIN dim_customer d
        ON src.customer_number = d.customer_number
    WHERE d.customer_sk IS NULL
) t1
CROSS JOIN (SELECT COALESCE(MAX(customer_sk), 0) AS sk_max FROM dim_customer) t2;

-- ============================================================
-- dim_product
-- ============================================================

-- SCD2 on product_name / product_category: expire, as of yesterday, the current row of
-- every product that was deleted from the source or whose name or category changed.
UPDATE dim_product
SET expiry_date = ${hivevar:pre_date}
WHERE dim_product.product_sk IN (
    SELECT a.product_sk
    FROM (
        SELECT product_sk, product_code, product_name, product_category
        FROM dim_product
        WHERE expiry_date = ${hivevar:max_date}
    ) a
    LEFT JOIN rds.product b
        ON a.product_code = b.product_code
    WHERE b.product_code IS NULL
       OR a.product_name <> b.product_name
       OR a.product_category <> b.product_category
);

-- SCD2: new current version for each product just expired because of a change
-- (same INT cast of `version` as for customers).
INSERT INTO dim_product
    (product_sk, product_code, product_name, product_category, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.`version`,
    t1.effective_date,
    t1.expiry_date
FROM (
    SELECT
        src.product_code,
        src.product_name,
        src.product_category,
        CAST(prev.`version` AS INT) + 1 AS `version`,
        ${hivevar:pre_date} AS effective_date,
        ${hivevar:max_date} AS expiry_date
    FROM dim_product prev
    INNER JOIN rds.product src
        ON prev.product_code = src.product_code
       AND prev.expiry_date = ${hivevar:pre_date}
    LEFT JOIN dim_product curr
        ON prev.product_code = curr.product_code
       AND curr.expiry_date = ${hivevar:max_date}
    WHERE (prev.product_name <> src.product_name
           OR prev.product_category <> src.product_category)
      AND curr.product_sk IS NULL
) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk), 0) AS sk_max FROM dim_product) t2;

-- New products: source rows with no version at all in dim_product.
INSERT INTO dim_product
    (product_sk, product_code, product_name, product_category, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    '1',
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM (
    SELECT src.product_code, src.product_name, src.product_category
    FROM rds.product src
    LEFT JOIN dim_product d
        ON src.product_code = d.product_code
    WHERE d.product_sk IS NULL
) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk), 0) AS sk_max FROM dim_product) t2;

-- ============================================================
-- dim_order: one row per order entered inside the CDC window
-- ============================================================
INSERT INTO dim_order (order_sk, order_number, `version`, effective_date, expiry_date)
SELECT
    row_number() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.`version`,
    t1.effective_date,
    t1.expiry_date
FROM (
    SELECT
        s.order_number,
        '1' AS `version`,
        CAST(s.order_date AS DATE) AS effective_date,
        ${hivevar:max_date} AS expiry_date
    FROM rds.sales_order s
    CROSS JOIN rds.cdc_time c
    WHERE s.entry_date >= c.last_load
      AND s.entry_date < c.current_load
) t1
CROSS JOIN (SELECT COALESCE(MAX(order_sk), 0) AS sk_max FROM dim_order) t2;

-- ============================================================
-- fact_sales_order: orders entered inside the CDC window, joined to the dimension
-- versions that were valid on the order date. The last SELECT column feeds the
-- dynamic order_date partition.
-- ============================================================
INSERT INTO TABLE fact_sales_order PARTITION (order_date)
SELECT
    b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount,
    CAST(CAST(a.order_date AS DATE) AS STRING) AS order_date
FROM rds.sales_order a
INNER JOIN dim_order b    ON a.order_number = b.order_number
INNER JOIN dim_customer c ON a.customer_number = c.customer_number
INNER JOIN dim_product d  ON a.product_code = d.product_code
INNER JOIN dim_date e     ON CAST(a.order_date AS DATE) = e.`date`
CROSS JOIN rds.cdc_time f
WHERE a.order_date >= c.effective_date
  AND a.order_date < c.expiry_date
  AND a.order_date >= d.effective_date
  AND a.order_date < d.expiry_date
  AND a.entry_date >= f.last_load
  AND a.entry_date < f.current_load;

-- Close the window: the next run starts where this one ended.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT current_load, current_load
FROM rds.cdc_time;
schedule_daily.sh每日周期调度sh脚本:
#!/bin/bash
# schedule_daily.sh: daily incremental load.
#   1. Full re-pull of customer and product into rds (overwrite).
#   2. Appends new sales_order rows by executing the saved sqoop job that
#      init_all_etl.sh creates (rds_incremental_import_job).
#   3. Runs schedule_daily_etl.sql to apply the SCD / incremental loads to dw.
#
# Connection settings can be overridden through environment variables. The defaults
# are this script's original host/credentials; MYSQL_DB now names the database the
# MySQL script actually creates.
# NOTE(review): the default host/password (cdh1 / mypassword) differ from those in
# init_all_etl.sh (192.168.25.120 / 123456); confirm which source server is correct.
set -euo pipefail

MYSQL_HOST="${MYSQL_HOST:-cdh1}"
MYSQL_PORT="${MYSQL_PORT:-3306}"
MYSQL_DB="${MYSQL_DB:-mysql_sales_source}"
MYSQL_USER="${MYSQL_USER:-root}"
MYSQL_PASSWORD="${MYSQL_PASSWORD:-mypassword}"
HIVE_URL="${HIVE_URL:-jdbc:hive2://cdh2:10000/dw}"
INCR_JOB="${INCR_JOB:-rds_incremental_import_job}"

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
JDBC_URL="jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}?useSSL=false"

# Full pull of the master-data tables.
for tbl in customer product; do
    sqoop import \
        --connect "$JDBC_URL" \
        --username "$MYSQL_USER" \
        --password "$MYSQL_PASSWORD" \
        --table "$tbl" \
        --hive-import \
        --hive-table "rds.${tbl}" \
        --hive-overwrite
done

# Incremental append of new orders (the saved job tracks its own last-value).
sqoop job --exec "$INCR_JOB"

# Periodic dw load.
beeline -u "$HIVE_URL" -f "${SCRIPT_DIR}/schedule_daily_etl.sql"
使用案例:
-- Usage examples: mobile-phone units sold per city in 2015, written once against a
-- star schema and once against a snowflake schema.

-- Star-schema version (database test): every dimension joins the fact table directly.
USE test;
SELECT
    SUM(Units_Sold) AS total_units_sold,
    City
FROM Fact_Sales a
INNER JOIN Dim_Store b   ON a.Store_Id = b.id
INNER JOIN Dim_Date c    ON a.Date_Id = c.id
INNER JOIN Dim_Product d ON a.Product_Id = d.id
WHERE c.Year = 2015
  AND d.Product_Category = 'mobile'
GROUP BY City;

-- Snowflake version (database snow): store -> geography and product -> category
-- are normalized into their own tables.
-- NOTE(review): Categoryt_Name looks like a typo for Category_Name; the schema is not
-- shown here - confirm the real column name.
USE snow;
SELECT
    SUM(Units_Sold) AS total_units_sold,
    City
FROM Fact_Sales a
INNER JOIN Dim_Store b     ON a.Store_Id = b.id
INNER JOIN Dim_Geography c ON b.Geography_Id = c.id
INNER JOIN Dim_Product d   ON a.Product_Id = d.Product_Id
INNER JOIN Dim_Category e  ON d.Category_Id = e.Category_Id
INNER JOIN Dim_Date f      ON a.Date_Id = f.id
WHERE e.Categoryt_Name = 'mobile'
  AND f.Year = 2015
GROUP BY City;