1 工程目录结构
pom.xml
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.9.2</version> </dependency>
2 flink 读取MySQL
1) 通过自定义 Source 读取
MySQLSource
package com.atguigu.flink.source

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
 * Flink source that runs one fixed query against MySQL and emits every
 * returned row as a [[SensorReading]].
 *
 * Lifecycle:
 *  - `open`   acquires the JDBC connection and prepares the statement;
 *  - `run`    executes the query and emits the rows;
 *  - `cancel` only asks `run` to stop;
 *  - `close`  releases the JDBC resources.
 */
class MySQLSource extends RichSourceFunction[SensorReading] {

  var conn: Connection = null
  var ps: PreparedStatement = null

  // Cleared by cancel(). Volatile because Flink may call cancel() from a
  // different thread than the one executing run().
  @volatile private var running: Boolean = true

  /** Loads the MySQL driver, opens the connection and prepares the query. */
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    ps = conn.prepareStatement("select * from sensor limit 5")
  }

  /**
   * Executes the prepared query and emits one [[SensorReading]] per row until
   * the result set is exhausted or the source is cancelled.
   *
   * SQL errors are NOT swallowed: they propagate so that the job fails visibly
   * instead of finishing "successfully" with missing data.
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val resultSet: ResultSet = ps.executeQuery()
    try {
      while (running && resultSet.next()) {
        val id = resultSet.getString("id")
        val curTime = resultSet.getLong("timestamp")
        val timepreture = resultSet.getDouble("timepreture")
        sourceContext.collect(SensorReading(id, curTime, timepreture))
      }
    } finally {
      resultSet.close()
    }
  }

  /** Requests `run` to stop; resources are released in `close`, not here. */
  override def cancel(): Unit = {
    running = false
  }

  /** Releases the statement first, then the connection that owns it. */
  override def close(): Unit = {
    closeQuietly(ps)
    closeQuietly(conn)
  }

  // Best-effort close: a failure while releasing one resource is reported but
  // must not prevent the remaining resources from being released.
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: Exception => System.err.println(s"MySQLSource: failed to close JDBC resource: $e")
      }
    }
  }
}
主程序入口 MySQLSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._

/**
 * Streaming entry point: reads sensor rows through [[MySQLSource]], writes
 * them through [[MySQLSink]] and also prints them.
 */
object MySQLSourceSinkApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Input side of the job: the custom MySQL source.
    val readings = env.addSource(new MySQLSource)

    // Output side of the job: the custom MySQL sink, plus a print of the same stream.
    readings.addSink(new MySQLSink)
    readings.print()

    // Nothing runs until the job is submitted.
    env.execute()
  }
}
2) 通过 JDBCInputFormat方式
// SQL query that selects at most five rows from the `sensor` table.
val sql_read = "select * from sensor limit 5"
/**
 * Reads the result of `sql` from MySQL through `JDBCInputFormat` and converts
 * every row into a [[SensorReading]].
 *
 * @param env    batch execution environment used to create the input
 * @param url    JDBC URL of the database
 * @param driver fully-qualified JDBC driver class name
 * @param user   database user
 * @param pwd    database password
 * @param sql    query to execute; its columns are read by position as
 *               (String, Long, Double)
 * @return the query result as a data set of sensor readings
 */
def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
  // Declared types of the three result columns, in select order.
  val rowTypes = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.LONG_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO)

  val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername(driver)
    .setDBUrl(url)
    .setUsername(user)
    .setPassword(pwd)
    .setQuery(sql)
    .setRowTypeInfo(rowTypes)
    .finish()

  val rows: DataSet[Row] = env.createInput(inputFormat)

  // Convert the generic rows into the domain type.
  rows.map { row =>
    val id = row.getField(0).asInstanceOf[String]
    val timestamp = row.getField(1).asInstanceOf[Long]
    val timepreture = row.getField(2).asInstanceOf[Double]
    SensorReading(id, timestamp, timepreture)
  }
}
主程序入口MySQLSourceSinkApp2
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch entry point (read-only variant): reads sensor rows from MySQL through
 * `JDBCInputFormat` and prints them.
 */
object MySQLSourceSinkApp2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"

    /**
     * Reads the result of `sql` from MySQL and converts every row into a
     * [[SensorReading]]. Columns are read by position as (String, Long, Double).
     */
    def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
      val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

      // Convert the generic rows into the domain type.
      dataResult.map { x =>
        val id = x.getField(0).asInstanceOf[String]
        val timestamp = x.getField(1).asInstanceOf[Long]
        val timepreture = x.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      }
    }

    // Read the data from MySQL.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // A DataSet program is lazy: without a sink nothing is ever executed.
    // print() both triggers the job and shows the rows that were read.
    readStream.print()
  }
}
3 flink 写入 MySQL
1) 通过自定义Sink提交
MySQLSink
package com.atguigu.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
 * Flink sink that inserts every incoming [[SensorReading]] into the MySQL
 * `sensor` table, one row per record.
 *
 * `open` acquires the JDBC resources and `close` releases them.
 */
class MySQLSink extends RichSinkFunction[SensorReading] {

  var conn: Connection = null
  var ps: PreparedStatement = null

  val INSERT_CASE: String = "INSERT INTO sensor (id, timestamp,timepreture) " + "VALUES (?, ?, ?) "

  /** Loads the MySQL driver, opens the connection and prepares the insert. */
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    ps = conn.prepareStatement(INSERT_CASE)
  }

  /**
   * Inserts one record.
   *
   * SQL errors are NOT swallowed: they propagate so that a failed write fails
   * the job instead of silently dropping the record.
   */
  override def invoke(value: SensorReading): Unit = {
    ps.setString(1, value.id)
    ps.setLong(2, value.timestamp)
    ps.setDouble(3, value.timepreture)
    // A single-row insert needs no batch: execute it directly.
    ps.executeUpdate()
  }

  /** Releases the statement first, then the connection that owns it. */
  override def close(): Unit = {
    closeQuietly(ps)
    closeQuietly(conn)
  }

  // Best-effort close: a failure while releasing one resource is reported but
  // must not prevent the remaining resources from being released.
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: Exception => System.err.println(s"MySQLSink: failed to close JDBC resource: $e")
      }
    }
  }
}
主程序入口MySQLSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._

/**
 * Streaming entry point: reads sensor rows through [[MySQLSource]], writes
 * them through [[MySQLSink]] and also prints them.
 */
object MySQLSourceSinkApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Input side of the job: the custom MySQL source.
    val readings = env.addSource(new MySQLSource)

    // Output side of the job: the custom MySQL sink, plus a print of the same stream.
    readings.addSink(new MySQLSink)
    readings.print()

    // Nothing runs until the job is submitted.
    env.execute()
  }
}
2) 通过JDBCOutputFormat
Flink 中没有现成的用于写入 MySQL 的 sink,但提供了 JDBCOutputFormat 这个类:只要提供 JDBC 驱动(driver),就可以把它当作 sink 使用。
JDBCOutputFormat 其实属于 Flink 的批处理(batch)API,但也可以在流处理(stream)API 中使用,社区也推荐采用这种方式。
// Parameterized INSERT for the `sensor` table; the three placeholders are
// filled from the fields of each output Row, in order.
val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

/**
 * Writes `outputData` to MySQL through `JDBCOutputFormat` and runs the job.
 *
 * @param env        batch execution environment that owns `outputData`
 * @param outputData rows to insert; each row supplies the values for the
 *                   placeholders of `sql`, in order
 * @param url        JDBC URL of the database
 * @param user       database user
 * @param pwd        database password
 * @param sql        parameterized insert statement
 * @param driver     fully-qualified JDBC driver class name; defaults to the
 *                   MySQL driver so existing callers keep working
 */
def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String, driver: String = "com.mysql.jdbc.Driver"): Unit = {
  val outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername(driver)
    .setDBUrl(url)
    .setUsername(user)
    .setPassword(pwd)
    .setQuery(sql)
    .finish()

  outputData.output(outputFormat)

  // The output is only declared above; execute() actually runs the job.
  env.execute("insert data to mysql")
  print("data write successfully")
}
主程序入口MySQLSourceSinkApp2
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch entry point: reads sensor rows from MySQL through `JDBCInputFormat`
 * and inserts them back into the `sensor` table through `JDBCOutputFormat`.
 */
object MySQLSourceSinkApp2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"
    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    /**
     * Reads the result of `sql` from MySQL and converts every row into a
     * [[SensorReading]]. Columns are read by position as (String, Long, Double).
     */
    def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
      val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

      // Convert the generic rows into the domain type.
      dataResult.map { x =>
        val id = x.getField(0).asInstanceOf[String]
        val timestamp = x.getField(1).asInstanceOf[Long]
        val timepreture = x.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      }
    }

    /**
     * Writes `outputData` to MySQL through `JDBCOutputFormat` and runs the job.
     * `driver` defaults to the MySQL driver class.
     */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String, driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .finish())

      // The output is only declared above; execute() actually runs the job.
      env.execute("insert data to mysql")
      print("data write successfully")
    }

    // Read the data from MySQL.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // Convert the readings into the Row format that JDBCOutputFormat accepts;
    // field order matches the placeholders of sql_write.
    val outputData = readStream.map { x =>
      val row = new Row(3)
      row.setField(0, x.id)
      row.setField(1, x.timestamp)
      row.setField(2, x.timepreture)
      row
    }

    // Insert the data into MySQL, using the same driver as the read side.
    writeMysql(env, outputData, url, username, password, sql_write, driver = driver)
  }
}
4 scala读取 MySQL
MysqlUtil
package com.atguigu.flink.utils

import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}

import com.alibaba.fastjson.JSONObject

import scala.collection.mutable.ListBuffer

/** Plain-JDBC helper for running ad-hoc queries against the MySQL database. */
object MysqlUtil {

  /** Demo: prints the first five rows of the `sensor` table. */
  def main(args: Array[String]): Unit = {
    val list: List[JSONObject] = queryList("select * from sensor limit 5")
    println(list)
  }

  /**
   * Executes `sql` and returns every row as a JSON object keyed by column name.
   *
   * The result set, statement and connection are always released, even when
   * the query or the row conversion throws.
   *
   * NOTE(review): `sql` is executed verbatim, so it must come from a trusted
   * source; it must never be assembled from untrusted input.
   *
   * @param sql the query to execute
   * @return one JSON object per result row, in result-set order
   */
  def queryList(sql: String): List[JSONObject] = {
    // Load the JDBC driver.
    Class.forName("com.mysql.jdbc.Driver")

    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    try {
      val stat: Statement = conn.createStatement
      try {
        val rs: ResultSet = stat.executeQuery(sql)
        try {
          val md: ResultSetMetaData = rs.getMetaData
          // The column count is fixed for the whole result set; read it once.
          val columnCount = md.getColumnCount
          val resultList = new ListBuffer[JSONObject]()
          while (rs.next) {
            val rowData = new JSONObject()
            // JDBC column indexes are 1-based.
            for (i <- 1 to columnCount) {
              rowData.put(md.getColumnName(i), rs.getObject(i))
            }
            resultList += rowData
          }
          resultList.toList
        } finally {
          rs.close()
        }
      } finally {
        stat.close()
      }
    } finally {
      conn.close()
    }
  }
}