zoukankan      html  css  js  c++  java
  • Flink 写数据到MySql (JDBC Sink)

    POM 文件

            <!-- Flink Scala API, built for Scala 2.11 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <!-- Flink streaming Scala API (StreamExecutionEnvironment, DataStream) used by the job below -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- Kafka 0.11 connector. NOTE(review): not referenced by the code in this article, and its
                 version (1.10.1) differs from the other Flink artifacts (1.10.2) - confirm whether it is
                 needed and whether the versions should be aligned. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
            <!-- MySQL JDBC driver; required for the jdbc:mysql:// URL opened by the sink -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>

    代码:

    package com.kpwong.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    
    /** Job entry point: reads text lines from a socket and writes each one through [[MyJDBCSink]]. */
    object JDBCSinkTest {

      /**
       * Builds and runs the streaming job.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Run the whole pipeline with a parallelism of 1.
        streamEnv.setParallelism(1)

        // Source -> sink: every line read from hadoop202:9999 is handed to the JDBC sink.
        streamEnv
          .socketTextStream("hadoop202", 9999)
          .addSink(new MyJDBCSink())

        streamEnv.execute()
      }
    }
    /**
     * Sink that inserts every incoming string into the `socket` column of the
     * `flink` table over JDBC.
     *
     * Note: it must extend the rich function class, because only rich functions
     * provide the lifecycle methods (`open` / `close`) used here to manage the
     * JDBC resources.
     */
    class MyJDBCSink extends RichSinkFunction[String] {
      // JDBC connection and precompiled INSERT statement. Both are assigned in
      // open(), so they are still null if open() did not run or failed part-way.
      var conn: Connection = _
      var insertSql: PreparedStatement = _

      /**
       * Creates the connection and the prepared statement.
       *
       * @param parameters configuration passed through to the parent `open`
       */
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000")
        insertSql = conn.prepareStatement("insert into flink(socket) values (?)")
      }

      /**
       * Binds the record to the statement's single placeholder and executes the INSERT.
       *
       * @param value   the record to store
       * @param context sink context (not used)
       */
      override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
        insertSql.setString(1, value)
        insertSql.execute()
      }

      /**
       * Releases the statement and the connection.
       *
       * Each resource is closed only if it was actually created, so a failure in
       * open() is not masked by a NullPointerException here. The nested
       * try/finally guarantees the connection is still closed when closing the
       * statement throws.
       */
      override def close(): Unit = {
        try Option(insertSql).foreach(_.close())
        finally {
          try Option(conn).foreach(_.close())
          finally super.close()
        }
      }
    }

    测试:在 socket 中输入数据

     MySQL 接收数据:

  • 相关阅读:
    PyCharm不能使用Tab键进行整体向左缩进解决方法
    Python代码规范(PEP8)问题及解决
    Python学习开始
    Spring Annotation(@Autowire、@Qualifier)
    Spring自动装配
    servlet验证码
    Spring集合装配
    帐号明文传输漏洞
    java单元测试
    项目building workspace很慢,或者直接内存溢出的问题解决办法。
  • 原文地址:https://www.cnblogs.com/kpwong/p/14092893.html
Copyright © 2011-2022 走看看