思路
- 从一个文件中导入表结构(Structure)(常用于批计算)(静态)
package com.kaikeba.mysql.demo import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.io.jdbc.JDBCInputFormat import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.types.Row object Flink2Mysql { def main(args: Array[String]): Unit = { //设定执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env) //通过创建JDBCInputFormat读取JDBC数据源 val jdbcDataSet: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false") .setUsername("root") .setPassword("Chen1227+") .setQuery("select * from filter") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) .finish() ) //将DataSet注册为表 tEnv.registerDataSet("tb", jdbcDataSet) //执行查询操作 val table = tEnv.sqlQuery("select * from tb") //把table转为DataSet tEnv.toDataSet[Row](table).print() } }
参考
Flink 读写 Mysql
https://blog.csdn.net/Android_xue/article/details/102705711
https://blog.csdn.net/ranyizhang/article/details/103759251
https://www.cnblogs.com/Gxiaobai/p/12645497.html
Flink流处理访问MySQL
https://blog.csdn.net/u012447842/article/details/89175772
Flink实例
https://blog.csdn.net/xianpanjia4616/article/details/98318750