MySQL source (custom Flink source reading the `flight` table)
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * Demo job: reads all rows of the MySQL table `test_db.flight` through a custom
 * [[RichSourceFunction]] and prints a running count of the records received.
 */
object FlinkDemo06_CustomSource_mysql {

  def main(args: Array[String]): Unit = {
    // 1. Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Source stream (the Scala API import supplies the implicit TypeInformation instances)
    import org.apache.flink.api.scala._
    val dStream: DataStream[Flight] = env.addSource(new MySqlSource)
    // 3. Computation: map every record to (1, 1), key everything under the constant 1,
    //    and keep a running sum of the second field, i.e. a running record count.
    //    A key selector replaces the deprecated positional keyBy(0); the result is the same.
    dStream.map(_ => (1, 1)).keyBy(_._1).sum(1).print()
    // 4. Execute the job
    env.execute("custom mysql source")
  }

  /**
   * Source that runs one SELECT against `flight` and emits one [[Flight]] per row,
   * then returns (the query is executed once, so the source is finite).
   *
   * Connection settings (URL, user, password, driver class) are hard-coded for the demo.
   */
  class MySqlSource extends RichSourceFunction[Flight] {
    private var connection: Connection = _
    private var ps: PreparedStatement = _
    // Cleared by cancel(); checked by the emit loop in run(). @volatile because cancel()
    // is not guaranteed to run on the same thread as run() (see the SourceFunction Javadoc).
    @volatile private var running: Boolean = true

    /** Loads the JDBC driver, opens the connection and prepares the SELECT statement. */
    override def open(parameters: Configuration): Unit = {
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/test_db"
      val username = "root"
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, "123456")
      val sql = "select id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID from flight"
      ps = connection.prepareStatement(sql)
    }

    /**
     * Releases JDBC resources. The statement is closed before the connection that owns it,
     * and the `finally` guarantees the connection is closed even if closing the statement fails.
     */
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
    }

    /**
     * Executes the query and emits one record per row until the result set is exhausted
     * or [[cancel]] is called. The result set is always closed, even if emitting fails.
     */
    override def run(ctx: SourceFunction.SourceContext[Flight]): Unit = {
      val rs = ps.executeQuery()
      try {
        // Check `running` first so a cancelled source does not advance the cursor again.
        while (running && rs.next()) {
          val flight = Flight(
            rs.getFloat("avgTicketPrice"),
            rs.getString("cancelled"),
            rs.getString("carrier"),
            rs.getString("dest"),
            rs.getString("destAirportID"),
            rs.getString("origin"),
            rs.getString("originAirportID"))
          ctx.collect(flight)
        }
      } finally {
        rs.close()
      }
    }

    /** Signals [[run]] to stop emitting; run exits at its next loop check. */
    override def cancel(): Unit = {
      running = false
    }
  }

  /** One row of `flight` (the `id` column is selected but not carried into the record). */
  final case class Flight(avgTicketPrice: Float, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)
}
数据准备 (Data preparation: load sample rows into the `flight` table)
import java.io.{BufferedReader, FileReader}
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

/**
 * Test-data loader: reads comma-separated flight records from a local file and inserts
 * them one at a time into the MySQL table `test_db.flight` (schema in the comment below).
 * It sleeps after every inserted row, presumably so a reader of the table sees rows arrive
 * gradually.
 */
object FlinkDemo06_DataHelp {
  val logger = LoggerFactory.getLogger(FlinkDemo06_DataHelp.getClass)

  val url = "jdbc:mysql://localhost:3306/test_db"
  val driver = "com.mysql.jdbc.Driver"
  val username = "root"

  // Backslashes must be escaped in a normal Scala string literal: a single "\t" would become
  // a TAB and "\p"/"\d" are invalid escape sequences.
  private val DataFile = "I:\\projectImplement\\dataWareHouse\\test-es\\data\\630data.csv"
  // Pause after each inserted row, in milliseconds.
  private val InsertIntervalMs = 10000L
  // Number of CSV fields consumed per line (avgTicketPrice .. originAirportID).
  private val FieldCount = 7

  /** Record shape of one CSV line. Not referenced within this object. */
  case class Flight(avgTicketPrice: String, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)

  /**
   * Inserts every well-formed line of [[DataFile]] into `flight`.
   * Blank lines are ignored; lines with fewer than [[FieldCount]] fields are skipped with a
   * warning. Any other failure is logged (with its stack trace) and ends the load. All JDBC
   * and file resources are closed in every case.
   */
  def main(args: Array[String]): Unit = {
    Class.forName(driver)
    var conn: Connection = null
    var stat: PreparedStatement = null
    var br: BufferedReader = null
    try {
      br = new BufferedReader(new FileReader(DataFile))
      conn = DriverManager.getConnection(url, username, "123456")
      val sql =
        """
          |insert into flight
          | (id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID)
          |values
          | (null,?,?,?,?,?,?,?)
          |""".stripMargin
      stat = conn.prepareStatement(sql)

      // `(line = br.readLine()) != null` does not work in Scala: an assignment has type Unit,
      // so that condition is always true and the loop would NPE at end of file.
      Iterator
        .continually(br.readLine())
        .takeWhile(_ != null)
        .filter(_.trim.nonEmpty)
        .foreach { line =>
          // The sample data pads some fields with spaces (e.g. "true ", "Kibana Airlines ").
          val vals = line.split(",").map(_.trim)
          if (vals.length < FieldCount) {
            logger.warn(s"skipping malformed line: $line")
          } else {
            stat.setFloat(1, vals(0).toFloat)
            stat.setString(2, vals(1))
            stat.setString(3, vals(2))
            stat.setString(4, vals(3))
            stat.setString(5, vals(4))
            stat.setString(6, vals(5))
            stat.setString(7, vals(6))
            val count = stat.executeUpdate()
            if (count != 1) logger.error("插入失败")
            stat.clearParameters()

            Thread.sleep(InsertIntervalMs)
          }
        }
    } catch {
      case NonFatal(e) => logger.error(e.toString, e)
    } finally {
      // Close each resource independently so a failing close() cannot leak the others.
      closeQuietly(br)
      closeQuietly(stat)
      closeQuietly(conn)
    }
  }

  /** Closes `resource` if non-null, logging (not rethrowing) any non-fatal failure. */
  private def closeQuietly(resource: AutoCloseable): Unit =
    if (resource != null) {
      try resource.close()
      catch {
        case NonFatal(e) => logger.info(e.toString)
      }
    }
}

/*
create table flight (
  id int auto_increment,
  avgTicketPrice float,
  cancelled varchar(5),
  carrier varchar(100),
  dest varchar(100),
  destAirportID varchar(100),
  origin varchar(100),
  originAirportID varchar(100),
  primary key(id)
);
*/
642.5951538085938 ,false,Logstash Airways,OR Tambo International Airport,JNB,Al Maktoum International Airport ,DWC
328.17108154296875,false,ES-Air,Licenciado Benito Juarez International Airport ,AICM ,Copenhagen Kastrup Airport,CPH
774.8056030273438 ,false,Kibana Airlines ,Stockholm-Arlanda Airport,ARN,San Diego International Airport,SAN
650.2644653320312 ,false,JetBeats,Ukrainka Air Base ,XHBU ,Adolfo Suarez Madrid— Barajas Airport ,MAD
227.354248046875,false,Kibana Airlines ,Winnipeg / James Armstrong Richardson International Airport,YWG,Winnipeg / James Armstrong Richardson International Airport ,YWG
575.6364135742188 ,false,Logstash Airways,Gimpo International Airport ,GMP,Genoa Cristoforo Colombo Airport ,GE01
619.7527465820312 ,true ,ES-Air,Tokyo Haneda International Airport ,HND,Shanghai Hongqiao International Airport ,SHA
612.101318359375,false,Logstash Airways,Reno Tahoe International Airport,RNO,Dubai International Airport,DXB
998.6478881835938 ,false,Kibana Airlines ,Xi'an Xianyang International Airport ,XIY,Kempegowda International Airport ,BLR
573.07666015625,false,Kibana Airlines ,Kansai International Airport,KIX,Beijing Capital International Airport ,PEK