-- Reference article: https://www.cnblogs.com/ywjfx/p/14722302.html
-- Create the t_unity_auth table with daily range partitions that are
-- maintained automatically (dynamic partitioning).
--
-- Model:        duplicate-key table; the four columns form the sort key.
-- Partitioning: one range partition per day on create_time. p20211101 is the
--               seed partition and holds every row dated before 2021-11-02.
-- Distribution: hashed on create_time and account into 32 buckets. Every
--               distribution column must be a column of the table; within a
--               daily partition the rows share one create_time, so account is
--               what actually spreads the rows across buckets.
-- Dynamic partition properties:
--   enable    = true : the scheduler creates partitions automatically
--   time_unit = DAY  : one partition per day
--   end       = 3    : partitions are pre-created up to 3 days ahead
--   prefix    = p    : generated partition names start with "p"
--   buckets   = 32   : bucket count for automatically created partitions
-- replication_num = 1 keeps a single replica of each tablet.
CREATE TABLE IF NOT EXISTS t_unity_auth (
    `create_time` DATE NOT NULL COMMENT '创建时间',
    `route_start` BIGINT(20) NOT NULL COMMENT '请求发起时间',
    `route_end` BIGINT(20) NOT NULL COMMENT '请求结束时间',
    `account` VARCHAR(32) NOT NULL COMMENT '账户'
)
DUPLICATE KEY(create_time, route_start, route_end, account)
PARTITION BY RANGE(create_time) (
    PARTITION p20211101 VALUES LESS THAN ("2021-11-02")
)
DISTRIBUTED BY HASH(create_time, account) BUCKETS 32
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32",
    "replication_num" = "1"
);
-- Manually add one daily partition per day for the following year.
--
-- Partitions cannot be added by hand while dynamic partitioning is enabled on
-- the table, so it is switched off for the batch add and back on afterwards.
-- START is inclusive and END is exclusive. The range begins at 2021-11-02 so
-- that it continues directly after partition p20211101
-- (VALUES LESS THAN "2021-11-02") and no date is left without a partition.
ALTER TABLE t_unity_auth SET ("dynamic_partition.enable" = "false");

ALTER TABLE t_unity_auth ADD
    PARTITIONS START ("2021-11-02") END ("2022-11-03") EVERY (INTERVAL 1 DAY);

ALTER TABLE t_unity_auth SET ("dynamic_partition.enable" = "true");
-- List the partitions of t_unity_auth.
SHOW PARTITIONS
FROM t_unity_auth;
-- List the indexes defined on t_unity_auth.
SHOW INDEX
FROM t_unity_auth;
-- Create the Routine Load job TOPIC_UNITY_AUTH, which continuously consumes
-- the Kafka topic TOPIC_DEMO from broker localhost:9095 into t_unity_auth.
--
-- The messages are JSON ("format" = "json"), so no column separator clause is
-- used; the names in COLUMNS identify the fields taken from each message.
-- The list must supply every NOT NULL column of t_unity_auth:
-- create_time, route_start, route_end and account.
-- NOTE(review): assumes the message field for the request start time is named
-- route_start, like the table column. If the producer sends it as route_id,
-- list route_id here and add the mapping "route_start = route_id" - confirm
-- against the producer.
-- NOTE(review): content_id, product_code, all_pass, result and request_body
-- are not columns of t_unity_auth; presumably they are read and discarded -
-- confirm, or drop them from the list.
--
-- Job properties:
--   desired_concurrent_number = 3         : desired number of parallel tasks
--   max_batch_interval        = 20        : maximum run time of one task, in seconds
--   max_batch_rows            = 300000    : maximum rows read by one task
--   max_batch_size            = 209715200 : maximum bytes read by one task (200 MiB)
--   strict_mode               = false     : strict mode is off
CREATE ROUTINE LOAD TOPIC_UNITY_AUTH ON t_unity_auth
COLUMNS (
    create_time,
    route_start,
    route_end,
    account,
    content_id,
    product_code,
    all_pass,
    result,
    request_body
)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "localhost:9095",
    "kafka_topic" = "TOPIC_DEMO"
);
-- Show the Routine Load jobs and their current state.
SHOW ROUTINE LOAD;
-- Stop the Routine Load job. The statement takes the job name
-- (TOPIC_UNITY_AUTH), not the Kafka topic name (TOPIC_DEMO).
-- STOP ends the job permanently: a stopped job cannot be resumed and has to
-- be created again. Use PAUSE ROUTINE LOAD instead when the job should be
-- continued later.
STOP ROUTINE LOAD FOR TOPIC_UNITY_AUTH;
-- Resume the Routine Load job. The statement takes the job name
-- (TOPIC_UNITY_AUTH), not the Kafka topic name (TOPIC_DEMO).
-- RESUME only restarts a job that is in the PAUSED state; a job that was ended
-- with STOP must be recreated with CREATE ROUTINE LOAD.
RESUME ROUTINE LOAD FOR TOPIC_UNITY_AUTH;