zoukankan      html  css  js  c++  java
  • Flink--基于mysql的sink和source

    基于 MySQL 的 source 操作

    /** Entry point that streams rows produced by [[SQL_source]] and prints them. */
    object MysqlSource {
      /**
       * Builds the streaming job: attach the custom source, print every
       * emitted [[Student]], then start execution.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val students: DataStream[Student] = streamEnv.addSource(new SQL_source)
        students.print()
        // Nothing runs until execute() is called on the environment.
        streamEnv.execute()
      }
    }
    /**
     * Source function that runs a single `select` against the `Student` table
     * and emits one [[Student]] per row.
     *
     * The JDBC connection and prepared statement are created in `open` and
     * released in `close`. `run` returns once the result set is exhausted or
     * `cancel` has been called.
     */
    class SQL_source extends RichSourceFunction[Student]{
      private var connection: Connection = null
      private var ps: PreparedStatement = null
      // Written by cancel() and read by the loop in run(); @volatile so the
      // change is visible if the two are invoked from different threads.
      @volatile private var isRunning: Boolean = true

      /**
       * Loads the JDBC driver, opens the connection and prepares the query.
       *
       * @param parameters configuration handed in by the runtime (not used)
       */
      override def open(parameters: Configuration): Unit = {
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://hadoop01:3306/test"
        val username = "root"
        val password = "root"
        Class.forName(driver)
        connection = DriverManager.getConnection(url, username, password)
        val sql = "select stuid , stuname , stuaddr , stusex from Student"
        ps = connection.prepareStatement(sql)
      }

      /**
       * Releases the JDBC resources. The statement is closed before the
       * connection that owns it, and the connection is closed even when
       * closing the statement throws.
       */
      override def close(): Unit = {
        try {
          if (ps != null) {
            ps.close()
          }
        } finally {
          if (connection != null) {
            connection.close()
          }
        }
      }


      /**
       * Executes the prepared query and forwards every row to the context.
       *
       * @param sourceContext collector that receives the emitted records
       */
      override def run(sourceContext: SourceContext[Student]): Unit = {
        val resultSet = ps.executeQuery()
        try {
          // Stop early when cancel() has flipped the flag.
          while (isRunning && resultSet.next()) {
            val stu = Student(
              resultSet.getInt("stuid"),
              resultSet.getString("stuname"),
              resultSet.getString("stuaddr"),
              resultSet.getString("stusex")
            )
            sourceContext.collect(stu)
          }
        } finally {
          // The result set was never closed before; release it on every path.
          resultSet.close()
        }
      }

      /** Asks the loop in `run` to stop after the row currently being emitted. */
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    
    
    /**
     * One row of the `Student` table.
     *
     * @param stuid   student id
     * @param stuname student name
     * @param stuaddr student address
     * @param stusex  student sex
     */
    case class Student(stuid:Int , stuname:String , stuaddr:String , stusex:String){
      /** Renders all four fields as `label:value` pairs on a single line. */
      override def toString: String =
        s"stuid:$stuid  stuname:$stuname   stuaddr:$stuaddr   stusex:$stusex"
    }

    基于 MySQL 的 sink 操作

    /** Entry point that writes a fixed set of [[Student]] records through [[StudentSinkToMysql]]. */
    object MysqlSink {
      /**
       * Builds the streaming job: create the environment, prepare the input
       * elements, attach the custom sink, then start execution.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        // Step 1: obtain the stream execution environment.
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // Step 2: prepare the input data (the extra sample rows are left disabled).
        val students:DataStream[Student] = streamEnv.fromElements(
          Student(8, "xiaoming", "beijing biejing", "female")
    //      Student(6, "daming", "tainjing tianjin", "male "),
    //      Student(7, "daqiang ", "shanghai shanghai", "female")
        )

        // Step 3: route the records into the custom sink (MySQL in this example).
        students.addSink(new StudentSinkToMysql)

        // Step 4: trigger execution of the stream job.
        streamEnv.execute()
      }
    }
    
    /**
     * Sink function that inserts every incoming [[Student]] into the `Student`
     * table through a prepared statement.
     *
     * The JDBC connection and statement are created in `open`, reused by every
     * `invoke` call, and released in `close`.
     */
    class StudentSinkToMysql extends RichSinkFunction[Student]{
      private var connection:Connection = null
      private var ps:PreparedStatement = null

      /**
       * Loads the JDBC driver, opens the connection and prepares the insert.
       *
       * @param parameters configuration handed in by the runtime (not used)
       */
      override def open(parameters: Configuration): Unit = {
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://hadoop01:3306/test"
        val username = "root"
        val password = "root"
        // 1: load the driver
        Class.forName(driver)
        // 2: create the connection
        connection = DriverManager.getConnection(url , username , password)
        val sql = "insert into Student(stuid , stuname , stuaddr , stusex) values(?,?,?,?);"
        // 3: obtain the statement to execute
        ps = connection.prepareStatement(sql)
      }

      /**
       * Releases the JDBC resources. The statement is closed before the
       * connection that owns it, and the connection is closed even when
       * closing the statement throws.
       */
      override def close(): Unit = {
        try {
          if (ps != null) {
            ps.close()
          }
        } finally {
          if (connection != null) {
            connection.close()
          }
        }
      }

      /**
       * Inserts one record; called once per incoming element.
       *
       * A failed insert is reported on stderr together with the record and the
       * stack trace, and the record is then skipped, so one bad row does not
       * fail the job. Fatal errors and interruption are not caught.
       *
       * @param stu the record to insert
       */
      override def invoke(stu: Student): Unit = {
        try{
          // 4: bind the fields and execute the insert
          ps.setInt(1, stu.stuid)
          ps.setString(2, stu.stuname)
          ps.setString(3, stu.stuaddr)
          ps.setString(4, stu.stusex)
          ps.executeUpdate()
        }catch {
          // NonFatal lets InterruptedException and VM errors propagate.
          case scala.util.control.NonFatal(e) =>
            System.err.println(s"Failed to insert $stu: ${e.getMessage}")
            e.printStackTrace()
        }
      }
    }
  • 相关阅读:
    字符串函数---atof()函数具体解释及实现(完整版)
    curl的简单使用
    [7] 算法之路
    springMVC3.0(文件上传,@RequestMapping加參数,@SessionAttributes,@ModelAttribute,转发,重定向,数值获取,传參,ajax,拦截器)
    hdu 1754 I Hate It 线段树 点改动
    经典的7种排序算法 原理C++实现
    自己定义View实现水平滚动控件
    centos编译ffmpeg x264
    工作脚本处理文本
    A*寻路算法
  • 原文地址:https://www.cnblogs.com/niutao/p/10548620.html
Copyright © 2011-2022 走看看