注: 对应的sql_lib依赖jar,在参考博客的留言下面有
1、运行f'link sql
1、首先进入flink目录,启动flink:bin/start-cluster.sh
2、其次启动Flink SQL Client:bin/sql-client.sh embedded -l sql_lib
2、启动界面
3、测试demo
DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( id INT, name STRING, create_time TIMESTAMP(3) ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); CREATE TABLE user_info ( id INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
4、执行sql,查看结果
select u.id,u.name,u.create_time,ui.id,ui.username,ui.password
from `user` as u
left join user_info as ui
on u.id = ui.id;
===============实现Datax效果========================
DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( id INT, name STRING, create_time TIMESTAMP(3) ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); DROP TABLE IF EXISTS `user_info`; CREATE TABLE user_info ( id INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); DROP TABLE IF EXISTS `user_all`; CREATE TABLE user_all ( id INT, name STRING, create_time TIMESTAMP(3), bid INT, username STRING, password STRING ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/test', 'connector.table' = 'user_all', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); insert into user_all select u.id,u.name,u.create_time,ui.id as bid,ui.username,ui.password from `user` as u left join user_info as ui on u.id = ui.id group by u.id,u.name,u.create_time,ui.id,ui.username,ui.password ;