zoukankan      html  css  js  c++  java
  • Flink Sql Client初探

    注: 对应的sql_lib依赖jar,在参考博客的留言下面有

    1、运行f'link sql 

    1、首先进入flink目录,启动flink:bin/start-cluster.sh
    2、其次启动Flink SQL Client:bin/sql-client.sh embedded -l sql_lib

    2、启动界面

     3、测试demo

    DROP TABLE IF  EXISTS  `user`;
    
    CREATE TABLE `user` (
        id INT, 
        name STRING,
        create_time TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/test',
        'connector.table' = 'user',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'root',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    
    CREATE TABLE user_info (
        id INT,
        username STRING,
        password STRING
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/test',
        'connector.table' = 'user_info',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'root',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );

    4、执行sql,查看结果

    select u.id,u.name,u.create_time,ui.id,ui.username,ui.password  
    from `user` as u 
    left join user_info as ui 
    on u.id = ui.id;

     

     ===============实现Datax效果========================

    DROP TABLE IF  EXISTS  `user`;
    CREATE TABLE `user` (
        id INT, 
        name STRING,
        create_time TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/test',
        'connector.table' = 'user',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'root',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    
    DROP TABLE IF  EXISTS  `user_info`;
    CREATE TABLE user_info (
        id INT,
        username STRING,
        password STRING
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/test',
        'connector.table' = 'user_info',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'root',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    
    
    DROP TABLE IF  EXISTS  `user_all`;
    CREATE TABLE user_all (
        id INT,
        name STRING,
        create_time TIMESTAMP(3),
        bid INT,
        username STRING,
        password STRING
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/test',
        'connector.table' = 'user_all',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'root',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    
    
    insert into user_all
        select u.id,u.name,u.create_time,ui.id as bid,ui.username,ui.password  
        from `user` as u 
        left join user_info as ui 
        on u.id = ui.id group by u.id,u.name,u.create_time,ui.id,ui.username,ui.password  ;
  • 相关阅读:
    使用Idhttp.get('') 造成假死(堵塞),请问线程idhttp怎么才能做到不出错?
    mysql 修改字段类型
    Delphi完成的断点续传例子 转
    断点续传的例子
    甲状腺癌怎样早发现 可B超检查
    DELPHI高性能大容量SOCKET并发(九):稳定性问题解决
    百度地图信息提示框的修改 转
    delphi 调用百度地图WEBSERVICE转换GPS坐标 转
    delphi 调用百度地图api
    Gedit
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14276499.html
Copyright © 2011-2022 走看看