zoukankan      html  css  js  c++  java
  • flink jdbc方式下沉到hive

    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._
    
    object HiveJdbcSink {
      def main(args: Array[String]): Unit = {
        // Create the streaming environment; parallelism 1 keeps a single
        // JDBC connection/ordering for this demo.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read raw sensor lines from a local text file.
        // FIX: backslashes must be escaped inside a Scala string literal —
        // the original "D:\ideaDemo\..." contains invalid escapes (\i, \m, \s)
        // and does not compile.
        val inputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Parse CSV records of the form "id,timestamp,temperature".
        // FIX: temperature is column 2 — the original reused arr(1) for both
        // the timestamp and the temperature.
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })

        // Write every record into Hive through the JDBC sink below.
        dataStream.addSink(new MyJdbcHiveSinkFunc())

        env.execute("JdbcHive sink test")
      }

    }
    
    
    class MyJdbcHiveSinkFunc() extends RichSinkFunction[SensorReding]{
      // JDBC connection and pre-compiled INSERT statement; both are created
      // in open() and released in close().
      // FIX: "= _" requires the space — "=_" does not lex as the default
      // initializer and fails to compile.
      var conn: Connection = _
      var insetstmt: PreparedStatement = _

      // Called once per parallel sink instance before any record arrives:
      // load the HiveServer2 JDBC driver, open the connection, and prepare
      // the insert statement once (re-used for every record).
      override def open(parameters: Configuration): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        conn = DriverManager.getConnection("jdbc:hive2://192.168.10.20:10000/hive", "root", "")
        insetstmt = conn.prepareStatement("insert into tb_tmp_001(id,x) values(?,?)")
      }

      // Invoked once per stream record: bind id and temperature, then execute.
      override def invoke(value: SensorReding): Unit = {
        insetstmt.setString(1, value.id)
        insetstmt.setString(2, value.temperature.toString)
        insetstmt.execute()
      }

      // FIX: guard against NullPointerException when open() failed before the
      // statement/connection was assigned (e.g. driver class missing), and
      // close the statement before its connection.
      override def close(): Unit = {
        if (insetstmt != null) insetstmt.close()
        if (conn != null) conn.close()
      }
    }

    需引入依赖

     <!--start   hive相关依赖-->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.2</version>
            </dependency>
            <!--end   hive相关依赖-->
    

      

    author@nohert
  • 相关阅读:
    爬虫大作业
    数据结构化与保存
    使用正则表达式,取得点击次数,函数抽离
    爬取校园新闻首页的新闻
    网络爬虫基础练习
    综合练习:词频统计
    Hadoop综合大作业
    理解MapReduce
    熟悉HBase基本操作
    熟悉常用的HBase操作
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928275.html
Copyright © 2011-2022 走看看