json字段格式
{
"a": {
"10000": 11,
"20000": 22
},
"b": {
"10000": 14,
"20000": 255
}
}
with
q as (
select t.member_id,
replace(t.prod_code, '"') as prod_code ,
replace(replace(t.score_use, "{"), '}') as score_use
from (
select member_id,prod_code, score_use
from (select member_id, extra_json from ods_tables)
lateral view explode(
str_to_map(
substr(extra_json, 2, length(extra_json) -2)
, '},"'
)
) t as prod_code, score_use
) t
)
select q.member_id, tmp.gcode, tmp.score from q lateral view explode(str_to_map(q.score_use)) tmp as gcode,score
-- example 2 与example 1 效果相同, 把with查询使用子查询实现
-- 字表方式
-- 写数据
INSERT OVERWRITE TABLE cdp.dim_set_coupon
-- 这里由于要inner join 其他表, 因此包了一个ttt, 如果不关联可以不需要ttt的这层
select
ttt.item_name AS item_name
,ttt.gl_code as gl_code
,ttt.item_num AS item_num
,ttt.item_price AS item_price
,tt2.coupon_status AS coupon_status
,tt2.created_at AS created_at
,tt2.updated_at AS updated_at
,ttt.bu_code AS bu_code
,ttt.item_code AS item_code
,tt2.benefit_id AS benefit_id
,ttt.member_id as member_id
,ttt.trade_no as trade_no
,ttt.sub_trade_no as sub_trade_no
,ttt.status as status
,ttt.extra_json as extra_json
,ttt.pt AS pt
from
(
(
select
tt1.item_num,
tt1.trade_no,
tt1.sub_trade_no,
tt1.status,
tt1.item_name,
tt1.bu_code,
tt1.extra_json,
tt1.item_code,
tt1.member_id,
tt1.pt,
replace(tmp.gl_code, '"') as gl_code,
tmp.item_price
from (
select
tt0.trade_no AS trade_no,
tt0.sub_trade_no AS sub_trade_no,
tt0.status AS status,
tt0.item_name AS item_name,
tt0.extra_json AS extra_json,
null AS item_num,
tt0.bu_code AS bu_code,
tt0.pt,
tt0.member_id,
replace(tt0.prod_code, '"') as item_code ,
replace(replace(tt0.score_use, "{"), '}') as score_use
from (
select * --,prod_code, score_use
from (select *
from cdp.ods_aquarius_item_redeem
where extra_json is not null
-- and item_type='COUPON' and (created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
lateral view explode(
str_to_map(
substr(extra_json, 2, length(extra_json) -2)
, '},"'
)
) t as prod_code, score_use
) tt0
) tt1 lateral view explode(str_to_map(tt1.score_use)) tmp as gl_code,item_price
)
union all
-- 因为上面是 extjson not null的情况, 为了兼容老数据增加null的情况
(
select
t2.item_num,
t2.trade_no,
t2.sub_trade_no,
t2.status,
t2.item_name,
t2.bu_code,
t2.extra_json,
t2.item_code,
t2.member_id,
t2.pt,
t2.gl_code,
t2.item_price
from cdp.ods_aquarius_item_redeem t2 where t2.extra_json is null
-- and t2.item_type='COUPON' and (t2.created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or t2.updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
) ttt
INNER JOIN
(
SELECT
id,
benefit_id,
tenant_id,
shop_id,
user_id,
activity_id,
coupon_status,
used_at,
outer_code,
use_channel,
collect_type,
created_at,
updated_at,
name,
decipher(user_name,'UFlR9JGjmlZ1EnvDXfIkX5Fcj38FCyNGkzIscCz0ADvFu2NMgYx4FqPyGpCsjSW9gM1FvO48PrFvqT3VS3U3Lw==') as user_name,
expired_at,
send,
dl_auto_created_at,
dl_auto_updated_at,
cdc_hash_code,
pt
FROM cdp.ods_user_benefit_expand
) tt2
ON ttt.sub_trade_no=tt2.outer_code
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------- dim_set_gift_data_q2 --------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-- 写数据
INSERT OVERWRITE TABLE cdp.dim_set_gift_data_q2
-- 这里由于要inner join 其他表, 因此包了一个ttt, 如果不关联可以不需要ttt的这层
select
ttt.item_name AS item_name
,ttt.gl_code AS gl_code
,ttt.item_num AS item_num
,ttt.item_price AS item_price
,ttt.created_at AS created_at
,ttt.updated_at AS updated_at
,ttt.bu_code AS bu_code
,ttt.item_code AS item_code
,ttt.member_id as member_id
,ttt.trade_no as trade_no
,ttt.sub_trade_no as sub_trade_no
,ttt.status as status
,ttt.extra_json as extra_json
,ttt.pt AS pt
from
(
(
select
tt1.item_num,
tt1.trade_no,
tt1.sub_trade_no,
tt1.status,
tt1.item_name,
tt1.bu_code,
tt1.extra_json,
tt1.item_code,
tt1.member_id,
tt1.created_at,
tt1.updated_at,
tt1.pt,
replace(tmp.gl_code, '"') as gl_code,
tmp.item_price
from (
select
tt0.trade_no AS trade_no,
tt0.sub_trade_no AS sub_trade_no,
tt0.status AS status,
tt0.item_name AS item_name,
tt0.extra_json AS extra_json,
null AS item_num,
tt0.bu_code AS bu_code,
tt0.pt,
tt0.member_id,
tt0.created_at,
tt0.updated_at,
replace(tt0.prod_code, '"') as item_code ,
replace(replace(tt0.score_use, "{"), '}') as score_use
from (
select * --,prod_code, score_use
from (select *
from cdp.ods_aquarius_item_redeem
where extra_json is not null
-- and item_type='COUPON' and (created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
lateral view explode(
str_to_map(
substr(extra_json, 2, length(extra_json) -2)
, '},"'
)
) t as prod_code, score_use
) tt0
) tt1 lateral view explode(str_to_map(tt1.score_use)) tmp as gl_code,item_price
)
union all
-- 因为上面是 extjson not null的情况, 为了兼容老数据增加null的情况
(
select
t2.item_num,
t2.trade_no,
t2.sub_trade_no,
t2.status,
t2.item_name,
t2.bu_code,
t2.extra_json,
t2.item_code,
t2.member_id,
t2.created_at,
t2.updated_at,
t2.pt,
t2.gl_code,
t2.item_price
from cdp.ods_aquarius_item_redeem t2 where t2.extra_json is null
-- and t2.item_type='COUPON' and (t2.created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or t2.updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
) ttt
INSERT OVERWRITE TABLE cdp.ads_set_gift_list
SELECT
tt1.bu_code AS bu_code
,tt1.item_code AS item_code
,tt1.item_name AS item_name
,tt1.gl_code AS gl_code
,tt1.item_num AS item_num
,tt1.item_price AS item_price
,tt1.status AS status
,IF(tt1.extra_json is null, tt1.item_price,tt1.item_price*tt1.item_num) as point
,tt1.created_at AS created_at
,tt1.updated_at AS updated_at
,tt1.extra_json AS extra_json
,tt1.trade_no AS trad_no
,tt1.sub_trade_no AS sub_trade_no
,tt1.member_id AS member_id
,tt1.pt AS pt
FROM cdp.dim_set_gift_data_q2 tt1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-- with 写法
with
q as (
select
tt1.trade_no AS trade_no,
tt1.sub_trade_no AS sub_trade_no,
tt1.status AS status,
tt1.item_name AS item_name,
tt1.extra_json AS extra_json,
null AS item_num,
tt1.bu_code AS bu_code,
tt1.pt,
tt1.member_id,
replace(tt1.prod_code, '"') as item_code ,
replace(replace(tt1.score_use, "{"), '}') as score_use
from (
select * --,prod_code, score_use
from (select *
from cdp.ods_aquarius_item_redeem
where extra_json is not null
-- and item_type='COUPON' and (created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
lateral view explode(
str_to_map(
substr(extra_json, 2, length(extra_json) -2)
, '},"'
)
) t as prod_code, score_use
) tt1
)
-- 写数据
INSERT OVERWRITE TABLE cdp.dim_set_coupon
-- 这里由于要inner join 其他表, 因此包了一个ttt, 如果不关联可以不需要ttt的这层
select
ttt.item_name AS item_name
,ttt.gl_code AS gl_code
,ttt.item_num AS item_num
,ttt.item_price AS item_price
,tt2.coupon_status AS coupon_status
,tt2.created_at AS created_at
,tt2.updated_at AS updated_at
,ttt.bu_code AS bu_code
,ttt.item_code AS item_code
,tt2.benefit_id AS benefit_id
,ttt.member_id as member_id
,ttt.trade_no as trade_no
,ttt.sub_trade_no as sub_trade_no
,ttt.status as status
,ttt.extra_json as extra_json
,ttt.pt AS pt
from
(
(
select
q.item_num,
q.trade_no,
q.sub_trade_no,
q.status,
q.item_name,
q.bu_code,
q.extra_json,
q.item_code,
q.member_id,
q.pt,
replace(tmp.gl_code, '"') as gl_code,
tmp.item_price from q lateral view explode(str_to_map(q.score_use)) tmp as gl_code,item_price
)
union all
-- 因为上面是 extjson not null的情况, 为了兼容老数据增加null的情况
(
select
t2.item_num,
t2.trade_no,
t2.sub_trade_no,
t2.status,
t2.item_name,
t2.bu_code,
t2.extra_json,
t2.item_code,
t2.member_id,
t2.pt,
t2.gl_code,
t2.item_price
from ods_aquarius_item_redeem t2 where t2.extra_json is null
-- and t2.item_type='COUPON' and (t2.created_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM')
-- or t2.updated_at BETWEEN trunc(add_months(CURRENT_TIMESTAMP,-1),'MM') AND trunc(from_unixtime(unix_timestamp(),'yyyy-MM-dd') ,'MM'))
)
) ttt
INNER JOIN
(
SELECT
id,
benefit_id,
tenant_id,
shop_id,
user_id,
activity_id,
coupon_status,
used_at,
outer_code,
use_channel,
collect_type,
created_at,
updated_at,
name,
decipher(user_name,'UFlR9JGjmlZ1EnvDXfIkX5Fcj38FCyNGkzIscCz0ADvFu2NMgYx4FqPyGpCsjSW9gM1FvO48PrFvqT3VS3U3Lw==') as user_name,
expired_at,
send,
dl_auto_created_at,
dl_auto_updated_at,
cdc_hash_code,
pt
FROM cdp.ods_user_benefit_expand
) tt2
ON ttt.sub_trade_no=tt2.outer_code
-- HIVE DDL create
create table if not exists cdp.dim_set_coupon
(
item_name string comment '',
gl_code string comment '_pk',
item_num int comment '',
item_price int comment '_pk',
coupon_status tinyint comment '',
created_at string comment '',
updated_at string comment '',
bu_code string comment '',
item_code string comment '',
benefit_id bigint comment '_pk',
member_id bigint comment '_pk',
trade_no string comment '_pk',
sub_trade_no string comment '_pk',
status string comment '',
extra_json string comment '',
pt string comment '_pk'
)
using org.apache.spark.sql.cassandra options (keyspace 'cdp', table 'dim_set_coupon',confirm.truncate 'true');
create table if not exists cdp.dim_set_gift_data_q2
(
item_name string comment '',
gl_code string comment '_pk',
item_num int comment '',
item_price int comment '_pk',
created_at string comment '',
updated_at string comment '',
bu_code string comment '',
item_code string comment '',
member_id bigint comment '_pk',
trade_no string comment '_pk',
sub_trade_no string comment '_pk',
status string comment '',
extra_json string comment '',
pt string comment '_pk'
)
using org.apache.spark.sql.cassandra options (keyspace 'cdp', table 'dim_set_gift_data_q2',confirm.truncate 'true');
create table if not exists cdp.ads_set_gift_list
(
bu_code string comment '',
item_code string comment '',
item_name string comment '',
gl_code string comment '_pk',
item_num int comment '',
item_price int comment '_pk',
status string comment '',
point int comment '',
created_at string comment '',
updated_at string comment '',
extra_json string comment '',
trade_no string comment '_pk',
sub_trade_no string comment '_pk',
member_id bigint comment '_pk',
pt string comment '_pk'
)
using org.apache.spark.sql.cassandra options (keyspace 'cdp', table 'ads_set_gift_list', confirm.truncate 'true');
--RDS alter
alter TABLE `ads_set_coupon_list` add (
member_id bigint,
trade_no varchar(32) NOT NULL COMMENT '订单编号',
sub_trade_no varchar(32) NOT NULL COMMENT '子订单编号',
status varchar(16) NOT NULL COMMENT '状态 ',
extra_json text COMMENT '扩展json字段'
)
alter TABLE `ads_set_gift_list` add (
member_id bigint,
trade_no varchar(32) NOT NULL COMMENT '订单编号',
sub_trade_no varchar(32) NOT NULL COMMENT '子订单编号',
extra_json text COMMENT '扩展json字段'
)
-- 其他
-- ext1 =
[ { "buCode": "ALIPAY", "accounts": [ { "glCode": "20000", "glName": "臻选积分", "price": 30 } ] }, { "buCode": "H5", "accounts": [ { "glCode": "20000", "glName": "臻选积分", "price": 46 } ] }, { "buCode": "WECHAT", "accounts": [ { "glCode": "10000", "glName": "礼享积分", "price": 827 } ] }, { "buCode": "JD", "accounts": [ { "glCode": "10000", "glName": "礼享积分", "price": 90 } ] } ]
INSERT OVERWRITE TABLE cdp.dim_cust_aquarius_item
SELECT DISTINCT
id,
content_id,
name,
category_id,
code,
exchange_accounts,
status,
MAX(glcode) OVER(PARTITION BY id) as glcode,
MAX(glname) OVER(PARTITION BY id) as glname,
MAX(cast(exchange_score as bigint)) OVER(PARTITION BY id) as exchange_score,
self_id,
pt
FROM
(
SELECT
id,
content_id,
name,
GET_JSON_OBJECT(extra_json,'$.categoryId') as category_id,
product_code as code,
ext1 as exchange_accounts,
product_status as status,
GET_JSON_OBJECT(CONCAT('{',view_exchange_accounts,'}'),'$.glCode') as glcode,
GET_JSON_OBJECT(CONCAT('{',view_exchange_accounts,'}'),'$.glName') as glname,
GET_JSON_OBJECT(CONCAT('{',view_exchange_accounts,'}'),'$.price') as exchange_score,
1 as self_id,
CURRENT_DATE() AS pt
FROM
cdp.ods_cms_content_tag
LATERAL VIEW explode(
split(
regexp_replace(regexp_replace(GET_JSON_OBJECT(ext1, '$[*].accounts'), '\[+\[+\{', ''),
'\}+\]+\]', ''),
'\}+\]+\,+\[+\{')
) t_view as view_exchange_accounts
WHERE
is_in_sale = 1
AND type = 5
) A