zoukankan      html  css  js  c++  java
  • Flink批处理读取Hive写入MySql

    把hive 表stu77 的数据写入 mysql 表test_stu 中.

    中间可以加自己的逻辑.

    
    
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    
    /**
     * @Auther WeiJiQian
     * @描述  可行
     */
    public class FlinkReadHiveAndWriteMysql {
    
    
        public static void main(String[] args) throws Exception {
    
            EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .build();
    
            TableEnvironment tableEnv = TableEnvironment.create(settings);
            String name = "myhive";      // Catalog名称,定义一个唯一的名称表示
            String defaultDatabase = "test";  // 默认数据库名称
            String hiveConfDir = "/data/apache-hive-2.3.6-bin/conf";  // hive-site.xml路径
            String version = "2.3.6";       // Hive版本号
    
            HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
            StatementSet statementSet = tableEnv.createStatementSet();
    
            tableEnv.registerCatalog(name, hive);
            tableEnv.useCatalog(name);
    
            Table sqlResult = tableEnv.sqlQuery("select name,age from test.stu77");
    
            String sql =
                    "create table testOut ( " +
                    "name varchar(20) not null, "+
                    "age varchar(20) not null "+
                    ") with ( "+
                    "'connector.type' = 'jdbc',"+
                    "'connector.url' = 'jdbc:mysql://192.168.1.1:3306/jeecg_boot?characterEncoding=UTF-8',"+
                    "'connector.table' = 'test_stu',"+
                    "'connector.driver' = 'com.mysql.jdbc.Driver',"+
                    "'connector.username' = 'root',"+
                    "'connector.password' = '123456')";
             tableEnv.executeSql(sql);
            statementSet.addInsert("testOut",sqlResult);
    
            statementSet.execute();
    
        }
    }
    
    
  • 相关阅读:
    向量、矩阵常用范数
    关于HP M451网络连接的资料
    pyqt程序最小化到系统托盘(未测试)
    博客园美化大集合2020最新!不用担心失效问题!
    添加QQ聊天
    关于加密
    python抓取谷歌学术关键词下文章题目
    如何确定网站可否可爬取
    灵狐浏览器
    利用beautifulsoup进行对标签的二次查找-以打印网易云歌单内容为例
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14088161.html
Copyright © 2011-2022 走看看