zoukankan      html  css  js  c++  java
  • Flink 的三种 Sink:Redis、ES 和 JDBC

    一、redis sink

    对应jar包

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

    将文件内容写入到 Redis 的 hash 中(key 为 sensor_temperature,field 为传感器 id,value 为温度)

    代码:

    /**
     * Demo job: reads sensor readings from a text file and writes them to Redis
     * through a [[RedisSink]] configured with [[MyRedisMapper]].
     */
    object RedisSinkTest {

      /**
       * Builds and runs the streaming job.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Backslashes must be escaped in a regular Scala string literal; unescaped
        // sequences such as "\U" or "\M" are rejected by the compiler.
        val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

        // Each input line is split on commas into three fields: id, timestamp, temperature.
        val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })

        // Redis sink: connection settings plus the mapper that turns a reading into a Redis command.
        val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").setPort(6379).build()
        dataStream.addSink(new RedisSink[SensorReading](config, new MyRedisMapper))

        env.execute("redis sink test")
      }

    }
    
    /**
     * [[RedisMapper]] for [[SensorReading]]: each reading becomes one HSET on the
     * "sensor_temperature" hash, with the sensor id as the field and the
     * temperature (as a string) as the value.
     */
    class MyRedisMapper extends RedisMapper[SensorReading] {

      // Command is HSET; the additional key names the target hash.
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

      // The hash field is the sensor id.
      override def getKeyFromData(t: SensorReading): String = {
        val sensorId = t.id
        sensorId
      }

      // The hash value is the temperature rendered as a string.
      override def getValueFromData(t: SensorReading): String = {
        val reading = t.temperature
        reading.toString
      }
    }
    

    redis查看结果

    127.0.0.1:6379> hgetall sensor_temperature
    1) "sensor_1"
    2) "35.80018327300259"
    3) "sensor_6"
    4) "15.402984393403084"
    5) "sensor_10"
    6) "38.101067604893444"
    7) "sensor_7"
    8) "6.720945201171228"
    

      

    二、es sink

    对应jar包

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

    将文件内容写入到es中

    代码:

    /**
     * Demo job: reads sensor readings from a text file and indexes them into
     * Elasticsearch (index "sensor") through an [[ElasticsearchSink]].
     */
    object EsSinkTest {

      /**
       * Builds and runs the streaming job.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Backslashes must be escaped in a regular Scala string literal; unescaped
        // sequences such as "\U" or "\M" are rejected by the compiler.
        val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

        // Each input line is split on commas into three fields: id, timestamp, temperature.
        val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })

        // Elasticsearch hosts the sink connects to.
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("hadoop101",9200))

        // Builder for the ES sink; the sink function converts each reading into an index request.
        val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
          override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
            println("保存数据:" + t)

            // Wrap the reading's fields in a map that becomes the document source.
            val map = new util.HashMap[String, String]()
            map.put("sensor_id", t.id)
            map.put("temperature", t.temperature.toString)
            map.put("ts", t.timestamp.toString)

            // Create the index request for index "sensor".
            val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)

            // Hand the request to the indexer, which sends it to Elasticsearch.
            requestIndexer.add(indexRequest)

            // Printed right after the request is handed over to the indexer.
            println("保存成功")
          }
        })

        // NOTE(review): the original had a bare `esSinkBuilder` expression statement here,
        // which had no effect and was removed. It may have been a truncated builder call
        // (e.g. a bulk-flush setting) — confirm against the author's intent.

        dataStream.addSink(esSinkBuilder.build())

        // Job name fixed: the original reused "redis sink test" from the Redis example.
        env.execute("es sink test")
      }

    }
    

    es中查看结果

    {
      "took" : 148,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 6,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "QXpZnnEBUwLRQchmepbT",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_6",
              "temperature" : "15.402984393403084",
              "ts" : "1547718201"
            }
          },
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "RnpZnnEBUwLRQchme5ZS",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_7",
              "temperature" : "6.720945201171228",
              "ts" : "1547718202"
            }
          },
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "Q3pZnnEBUwLRQchmepbr",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_1",
              "temperature" : "35.80018327300259",
              "ts" : "1547718199"
            }
          },
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "QnpZnnEBUwLRQchmepbo",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_1",
              "temperature" : "30.8",
              "ts" : "1547718200"
            }
          },
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "RHpZnnEBUwLRQchmepbs",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_1",
              "temperature" : "40.8",
              "ts" : "1547718201"
            }
          },
          {
            "_index" : "sensor",
            "_type" : "redingdata",
            "_id" : "RXpZnnEBUwLRQchmepbu",
            "_score" : 1.0,
            "_source" : {
              "sensor_id" : "sensor_10",
              "temperature" : "38.101067604893444",
              "ts" : "1547718205"
            }
          }
        ]
      }
    }

     三、jdbc sink

    ①mysql驱动

    <!-- mysql sink -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>

    ②自定义 MySQL sink:继承 RichSinkFunction,重写 open(初始化连接)、invoke(执行 SQL)和 close(关闭资源)三个方法。

    /**
     * Custom JDBC sink that upserts sensor temperatures into the MySQL table
     * `temperatures(sensor, temp)`: it first tries an UPDATE for the sensor id
     * and falls back to an INSERT when the UPDATE reports zero affected rows.
     *
     * The connection and statements are created in `open` and released in `close`.
     */
    class MyJdbcSink() extends RichSinkFunction[SensorReading]{

      // JDBC connection and prepared statements; assigned in open(), so they
      // are still null if open() has not run or failed partway.
      var conn:Connection = _
      var insertStmt : PreparedStatement = _
      var updateStmt:PreparedStatement=_

      /**
       * Opens the JDBC connection and prepares the insert and update statements.
       *
       * @param parameters configuration passed through to the superclass `open`
       */
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)

        conn = DriverManager.getConnection("jdbc:mysql:///test","root","123456")

        insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)")

        updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?")
      }

      /**
       * Writes one reading: update the existing row, or insert a new one if no row was updated.
       *
       * @param value   the sensor reading to persist
       * @param context sink context (unused)
       */
      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        // Per the original author's note, the generated `super.invoke(value, context)`
        // call must not be kept here.

        // Try the update first.
        updateStmt.setDouble(1,value.temperature)
        updateStmt.setString(2,value.id)
        updateStmt.execute()

        // Nothing updated: no row for this sensor yet, so insert one.
        if (updateStmt.getUpdateCount == 0){
          insertStmt.setString(1,value.id)
          insertStmt.setDouble(2,value.temperature)
          insertStmt.execute()
        }

      }

      /**
       * Releases the statements and the connection.
       *
       * Each resource is null-checked (open() may have failed before assigning it),
       * and the closes are chained with try/finally so that a failure closing one
       * resource does not prevent the remaining ones from being closed.
       */
      override def close(): Unit = {
        try {
          Option(updateStmt).foreach(_.close())
        } finally {
          try {
            Option(insertStmt).foreach(_.close())
          } finally {
            Option(conn).foreach(_.close())
          }
        }
      }

    }
    

    ③添加自定义的mysql sink并执行

    /**
     * Demo job: reads sensor readings from a text file and writes them to MySQL
     * through the custom [[MyJdbcSink]].
     */
    object JdbcSinkTest {

      /**
       * Builds and runs the streaming job.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Backslashes must be escaped in a regular Scala string literal; unescaped
        // sequences such as "\U" or "\M" are rejected by the compiler.
        val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

        // Each input line is split on commas into three fields: id, timestamp, temperature.
        val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })

        // JDBC sink
        dataStream.addSink(new MyJdbcSink())

        env.execute("jdbc sink test")
      }

    }
    

      

  • 相关阅读:
    在web项目启动时,使用监听器来执行某个方法
    spring boot --- 初级体验
    Java字符串连接最佳实践
    JPA
    基于Spring AOP的JDK动态代理和CGLIB代理
    jQuery.validate表单校验+bootstrap
    搜索技术---solr
    最常用的缓存技术---redis入门
    内外网同时访问的路由配置
    创建 Visual Studio 2017 离线安装
  • 原文地址:https://www.cnblogs.com/noyouth/p/12749121.html
Copyright © 2011-2022 走看看