zoukankan      html  css  js  c++  java
  • Flink 写数据到MySql (JDBC Sink)

    POM 文件

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>

    代码:

    package com.kpwong.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    
    object JDBCSinkTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999)
    
        //sink
    
        socketDS.addSink(new MyJDBCSink())
    
    
        env.execute()
      }
    
    }
    //注意 必须继承富函数类,这样才有生命周期管理函数
    class MyJDBCSink extends  RichSinkFunction[String]{
      //定义SQL 链接 预编译器
      var conn: Connection =_
      var insertSql: PreparedStatement=_
    
      //创建链接和预编译语句
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test","root","000000")
    
        insertSql= conn.prepareStatement("insert into flink(socket) values (?)")
    
      }
    
      override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    
    //    super.invoke(value, context)
        insertSql.setString(1,value)
        insertSql.execute()
      }
    
      override def close(): Unit = {
    //    super.close()
        insertSql.close()
        conn.close()
      }
    
    }

    测试 :socket输入输入数据

     MySql 接受数据:

  • 相关阅读:
    Windows PowerShell 2.0之进程管理
    PowerShell 2.0远程管理之交互式远程线程
    PowerShell 2.0解析、格式化及显示远程输出
    PowerShell 2.0语言远程管理之理解线程配置
    PowerShell 2.0远程管理之隐式远程管理
    PowerShell 2.0如何将远程线程保存在本地
    Windows PowerShell 2.0之服务管理
    PowerShell 2.0远程管理开发使用CredSSP处理多跳授权
    通过PowerShell操作事件日志
    (译)Silverlight教程第七部分: 使用控件模板定制控件的观感
  • 原文地址:https://www.cnblogs.com/kpwong/p/14092893.html
Copyright © 2011-2022 走看看