zoukankan      html  css  js  c++  java
  • Flink学习(十) Sink到Redis

    添加依赖

            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>

    编写代码

    package com.wyh.streamingApi.sink
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    object Sink2Redis {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //Source操作
        val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        val conf = new FlinkJedisPoolConfig.Builder()
            .setHost("localhost")
            .setPort(6379)
            .build()
    
    
    
        //Sink操作
        dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper()))
    
    
        env.execute("redis sink test")
      }
    
    }
    
    class MyRedisMapper() extends RedisMapper[SensorReading]{
    
      //定义保存数据到Redis的命令
      override def getCommandDescription: RedisCommandDescription = {
        //把传感器id和温度值保存成 Hash表 HSET key field value
        new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature")
      }
    
      //定义保存到redis的key
      override def getKeyFromData(t: SensorReading): String = {
        t.id
      }
    
      //定义保存到redis的value
      override def getValueFromData(t: SensorReading): String = {
        t.temperature.toString
      }
    }

  • 相关阅读:
    tomcat-01-配置文件组成
    Mysql的批量导入
    Mysql优化
    没有处理的问题
    Java 实现一个带提醒的定时器
    Java 端口扫描器 TCP的实现方法
    Java 多态方法构造器执行方法
    JavaWeb 基于Session的用户登陆注销实现
    Java 访问权限控制 小结
    Java FTP下载文件以及编码问题小结
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12920415.html
Copyright © 2011-2022 走看看