Flink 读写 MySQL
如果是 Maven 项目,需要先在 pom.xml 中添加以下依赖:
<!-- Flink JDBC connector; the examples below import JDBCInputFormat / JDBCOutputFormat
     from org.apache.flink.api.java.io.jdbc. The _2.11 suffix is presumably the Scala
     binary version and should match the project's Scala version (confirm). -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-jdbc_2.11</artifactId>
  <version>1.9.2</version>
</dependency>
<!-- MySQL JDBC driver; the examples below load it as "com.mysql.jdbc.Driver". -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.24</version>
</dependency>
1、读
import java.time.LocalDateTime

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.log4j.Logger

/**
 * Batch example: reads rows from a MySQL table through Flink's JDBC input format
 * and prints them.
 */
// NOTE(review): extending Logger looks unnecessary because `log` is obtained from
// Logger.getLogger below; the parent is kept so the object's type is unchanged.
object OperatorMysql extends Logger("opeartorMysql") {

  val log = Logger.getLogger("opeartorMysql")

  /**
   * Entry point: sets up the connection settings and runs the read example.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // NOTE(review): connection settings and credentials are hard-coded sample values;
    // externalise them (arguments / configuration) before using this outside a demo.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
    val username = "root"
    val password = "123456"

    log.info("--------read mysql-----------")
    val readSql = "select relationid,year,month from reid_kequn_arave_times"
    readMysql(env, url, driver, username, password, readSql)
  }

  /**
   * Reads from MySQL: runs `sql` over JDBC and prints the first three columns of
   * every returned row as a tuple.
   *
   * The row type is declared as three string columns, so `sql` is expected to
   * select exactly three string-compatible columns.
   *
   * @param env    Flink batch execution environment used to create the input
   * @param url    JDBC connection URL
   * @param driver fully qualified JDBC driver class name
   * @param user   database user name
   * @param pwd    database password
   * @param sql    query to execute
   */
  def readMysql(env: ExecutionEnvironment,
                url: String,
                driver: String,
                user: String,
                pwd: String,
                sql: String): Unit = {
    // Column types of the result set, in select order.
    val rowTypeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)

    val dataResult: DataSet[Row] = env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(rowTypeInfo)
        .finish())

    // Per the Flink DataSet API docs, print() itself triggers execution of the job,
    // which is why there is no explicit env.execute here.
    dataResult
      .map(row => (row.getField(0), row.getField(1), row.getField(2)))
      .print()
  }
}
运行结果:
2、写
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.log4j.Logger

/**
 * AUTHOR Guozy
 * DATE 2020/3/14-10:44
 *
 * Batch example: writes one row into a MySQL table through Flink's JDBC output format.
 **/
// NOTE(review): extending Logger looks unnecessary because `log` is obtained from
// Logger.getLogger below; the parent is kept so the object's type is unchanged.
object OperatorMysql extends Logger("opeartorMysql") {

  val log = Logger.getLogger("opeartorMysql")

  /**
   * Entry point: builds a single sample row and writes it to MySQL.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // NOTE(review): connection settings and credentials are hard-coded sample values;
    // externalise them (arguments / configuration) before using this outside a demo.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
    val username = "root"
    val password = "winner@001"

    log.info("--------write mysql-----------")
    val sql_write = "insert into attention_newdata(Hostname,ChannelNum,enabled,LastTime,CreateTime,ModifyTime) values(?,?,?,?,?,?) on duplicate key update ModifyTime=NOW()"

    // Use a fixed pattern: LocalDateTime.toString omits the seconds when they are zero
    // and appends a variable number of fractional digits, so its shape is not stable.
    val timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val curTime = LocalDateTime.now().format(timestampFormat)

    // One Row field per "?" placeholder in sql_write, in the same order.
    val outputData = env.fromElements(("ttt", "0", "1", curTime, curTime, curTime))
      .map(t => Row.of(t._1, t._2, t._3, t._4, t._5, t._6))

    writeMysql(env, outputData, url, username, password, sql_write, driver)
  }

  /**
   * Writes to MySQL: executes the parameterised statement `sql` once per row of
   * `outputData`, then runs the job.
   *
   * @param env        Flink batch execution environment that runs the job
   * @param outputData rows to write; each row supplies the statement's parameters in order
   * @param url        JDBC connection URL
   * @param user       database user name
   * @param pwd        database password
   * @param sql        parameterised insert/update statement
   * @param driver     fully qualified JDBC driver class name; defaults to the MySQL
   *                   driver that was previously hard-coded here
   */
  def writeMysql(env: ExecutionEnvironment,
                 outputData: DataSet[Row],
                 url: String,
                 user: String,
                 pwd: String,
                 sql: String,
                 driver: String = "com.mysql.jdbc.Driver"): Unit = {
    val jdbcSink = JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .finish()

    // output() only attaches the sink to the plan; env.execute below runs the job.
    outputData.output(jdbcSink)
    env.execute("insert data to mysql")
    print("data write successfully")
  }
}
运行结果: