zoukankan      html  css  js  c++  java
  • flink 自定义 mysql source

    mysql source

     1 import java.sql.{Connection, DriverManager, PreparedStatement}
     2 
     3 import org.apache.flink.configuration.Configuration
     4 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
     5 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
     6 
     /**
      * Flink streaming demo: reads the rows of the MySQL `flight` table through a custom
      * source function and prints a running count of the records received.
      */
     object FlinkDemo06_CustomSource_mysql {
         def main(args: Array[String]): Unit = {
             // 1. Execution environment
             val env = StreamExecutionEnvironment.getExecutionEnvironment
             // 2. Stream backed by the custom MySQL source
             //    (the import supplies the implicit TypeInformation instances)
             import org.apache.flink.api.scala._
             val dStream: DataStream[Flight] = env.addSource(new MySqlSource)
             // 3. Count the records: every element maps to the same key (1) with weight 1,
             //    so the rolling sum of field 1 is the number of records seen so far.
             //    A key-selector function is used instead of the positional keyBy(0).
             dStream.map(_ => (1, 1)).keyBy(_._1).sum(1).print()
             // 4. Submit the job
             env.execute("custom mysql source")
         }

         /**
          * Source that runs one `select` against the `flight` table and emits each row
          * as a [[Flight]]. The JDBC resources are created in `open` and released in `close`.
          */
         class MySqlSource extends RichSourceFunction[Flight] {
             private var connection: Connection = null
             private var ps: PreparedStatement = null
             // Flipped by cancel(), which may be called from another thread; hence @volatile.
             @volatile private var isRunning: Boolean = true

             /** Loads the JDBC driver, opens the connection and prepares the query. */
             override def open(parameters: Configuration): Unit = {
                 val driver = "com.mysql.jdbc.Driver"
                 val url = "jdbc:mysql://localhost:3306/test_db"
                 val username = "root"
                 Class.forName(driver)
                 // NOTE(review): credentials are hard-coded for the demo; externalise them for real use.
                 connection = DriverManager.getConnection(url, username, "123456")
                 val sql = "select id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID from flight"
                 ps = connection.prepareStatement(sql)
             }

             /**
              * Releases the JDBC resources: the statement first, then the connection that
              * owns it. The connection is closed even if closing the statement throws.
              */
             override def close(): Unit = {
                 try {
                     if (ps != null) {
                         ps.close()
                     }
                 } finally {
                     if (connection != null) {
                         connection.close()
                     }
                 }
             }

             /**
              * Executes the prepared query and emits one [[Flight]] per row until the result
              * set is exhausted or the source is cancelled. The result set is always closed.
              */
             override def run(ctx: SourceFunction.SourceContext[Flight]): Unit = {
                 val rs = ps.executeQuery()
                 try {
                     while (isRunning && rs.next()) {
                         val flight = Flight(
                             rs.getFloat("avgTicketPrice"),
                             rs.getString("cancelled"),
                             rs.getString("carrier"),
                             rs.getString("dest"),
                             rs.getString("destAirportID"),
                             rs.getString("origin"),
                             rs.getString("originAirportID"))
                         ctx.collect(flight)
                     }
                 } finally {
                     rs.close()
                 }
             }

             /** Asks the loop in `run` to stop before reading the next row. */
             override def cancel(): Unit = {
                 isRunning = false
             }
         }

         /** One row of the `flight` table (the auto-increment `id` column is not carried). */
         case class Flight(avgTicketPrice: Float, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)
     }
    View Code

    数据准备

     1 import java.io.{BufferedReader, FileReader}
     2 import java.sql.{Connection, DriverManager, PreparedStatement}
     3 
     4 import org.slf4j.LoggerFactory
     5 
     /**
      * Data-preparation helper: loads a CSV file of flight records into the MySQL
      * `flight` table, one row per CSV line.
      */
     object FlinkDemo06_DataHelp {
         val logger = LoggerFactory.getLogger(FlinkDemo06_DataHelp.getClass)

         val url = "jdbc:mysql://localhost:3306/test_db"
         val driver = "com.mysql.jdbc.Driver"
         val username = "root"

         case class Flight(avgTicketPrice: String, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)

         // Number of comma-separated fields expected on every CSV line.
         private val ExpectedFieldCount = 7

         /**
          * Reads the CSV file line by line and inserts each well-formed line into `flight`.
          *
          * Blank lines are ignored and lines with fewer than seven fields are logged and
          * skipped. Any other failure (for example a non-numeric price or a JDBC error) is
          * logged and stops the load. The reader, statement and connection are always closed.
          *
          * @param args unused
          */
         def main(args: Array[String]): Unit = {
             Class.forName(driver)
             var conn: Connection = null
             var stat: PreparedStatement = null
             // Backslashes must be escaped inside a regular Scala string literal.
             val br = new BufferedReader(new FileReader("I:\\projectImplement\\dataWareHouse\\test-es\\data\\630data.csv"))
             try {
                 // NOTE(review): credentials are hard-coded for the demo; externalise them for real use.
                 conn = DriverManager.getConnection(url, username, "123456")
                 val sql =
                     """
                       |insert into flight
                       | (id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID)
                       |values
                       | (null,?,?,?,?,?,?,?)
                       |""".stripMargin
                 val insert = conn.prepareStatement(sql)
                 stat = insert

                 // In Scala an assignment evaluates to Unit, so the Java idiom
                 // `while ((line = br.readLine()) != null)` never terminates normally.
                 // Pull lines through an iterator that stops at the end-of-file null instead.
                 Iterator.continually(br.readLine()).takeWhile(_ != null).foreach { line =>
                     // limit -1 keeps trailing empty fields; trim removes the stray
                     // whitespace around values found in the sample data.
                     val vals = line.split(",", -1).map(_.trim)
                     if (vals.length < ExpectedFieldCount) {
                         if (line.trim.nonEmpty) {
                             logger.error(s"Skipping malformed line (expected $ExpectedFieldCount fields, got ${vals.length}): $line")
                         }
                     } else {
                         insert.setFloat(1, vals(0).toFloat)
                         insert.setString(2, vals(1))
                         insert.setString(3, vals(2))
                         insert.setString(4, vals(3))
                         insert.setString(5, vals(4))
                         insert.setString(6, vals(5))
                         insert.setString(7, vals(6))
                         val count = insert.executeUpdate()
                         if (count != 1) logger.error("插入失败")
                         insert.clearParameters()

                         // Pause between inserts so rows arrive in the table gradually.
                         Thread.sleep(10000L)
                     }
                 }
             } catch {
                 case e: Exception =>
                     // Pass the exception itself so the stack trace is logged.
                     logger.error(e.toString, e)
             } finally {
                 // Close each resource independently so one failure cannot leak the others.
                 closeQuietly(br)
                 closeQuietly(stat)
                 closeQuietly(conn)
             }
         }

         /** Closes `resource` if it is non-null, logging (not propagating) any failure. */
         private def closeQuietly(resource: AutoCloseable): Unit = {
             if (resource != null) {
                 try {
                     resource.close()
                 } catch {
                     case e: Exception => logger.warn(e.toString, e)
                 }
             }
         }
     }
    68 
    69 /*
    70 create table flight (
    71 id int auto_increment,
    72 avgTicketPrice float,
    73 cancelled varchar(5),
    74 carrier varchar(100),
    75 dest varchar(100),
    76 destAirportID varchar(100),
    77 origin varchar(100),
    78 originAirportID varchar(100),
    79 primary key(id)
    80 );
    81  */
    View Code
     1 642.5951538085938 ,false,Logstash Airways,OR Tambo International Airport,JNB,Al Maktoum International Airport ,DWC
     2 328.17108154296875,false,ES-Air,Licenciado Benito Juarez International Airport ,AICM ,Copenhagen Kastrup Airport,CPH
     3 774.8056030273438 ,false,Kibana Airlines ,Stockholm-Arlanda Airport,ARN,San Diego International Airport,SAN
     4 650.2644653320312 ,false,JetBeats,Ukrainka Air Base ,XHBU ,Adolfo Suarez Madrid— Barajas Airport ,MAD
     5 227.354248046875,false,Kibana Airlines ,Winnipeg / James Armstrong Richardson International Airport,YWG,Winnipeg / James Armstrong Richardson International Airport ,YWG
     6 575.6364135742188 ,false,Logstash Airways,Gimpo International Airport ,GMP,Genoa Cristoforo Colombo Airport ,GE01 
     7 619.7527465820312 ,true ,ES-Air,Tokyo Haneda International Airport ,HND,Shanghai Hongqiao International Airport ,SHA
     8 612.101318359375,false,Logstash Airways,Reno Tahoe International Airport,RNO,Dubai International Airport,DXB
     9 998.6478881835938 ,false,Kibana Airlines ,Xi'an Xianyang International Airport ,XIY,Kempegowda International Airport ,BLR
    10 573.07666015625,false,Kibana Airlines ,Kansai International Airport,KIX,Beijing Capital International Airport ,PEK
    View Code
  • 相关阅读:
    nginx之笔记
    nginx之"/"结尾
    nginx之root和alias区别
    Python3 操作Excel--openpyxl
    python学习笔记之线程、进程和协程(第八天)
    python之堡垒机(第九天)
    python学习笔记之socket(第七天)
    python学习笔记之paramiko和sqlalchemy (第九天)
    python之在线PK游戏(第六天)
    python学习笔记之类class(第六天)
  • 原文地址:https://www.cnblogs.com/xiefeichn/p/13174977.html
Copyright © 2011-2022 走看看