zoukankan      html  css  js  c++  java
  • Flink学习(十) Sink到Redis

    添加依赖

            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>

    编写代码

    package com.wyh.streamingApi.sink
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    object Sink2Redis {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //Source操作
        val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        val conf = new FlinkJedisPoolConfig.Builder()
            .setHost("localhost")
            .setPort(6379)
            .build()
    
    
    
        //Sink操作
        dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper()))
    
    
        env.execute("redis sink test")
      }
    
    }
    
    class MyRedisMapper() extends RedisMapper[SensorReading]{
    
      //定义保存数据到Redis的命令
      override def getCommandDescription: RedisCommandDescription = {
        //把传感器id和温度值保存成 Hash表 HSET key field value
        new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature")
      }
    
      //定义保存到redis的key
      override def getKeyFromData(t: SensorReading): String = {
        t.id
      }
    
      //定义保存到redis的value
      override def getValueFromData(t: SensorReading): String = {
        t.temperature.toString
      }
    }

  • 相关阅读:
    深入分析Redis的主从复制机制
    Arctan的快速近似算法
    德布鲁因序列与indexing 1
    损失函数是学习的指挥棒—记一次实践经历
    二叉树的遍历回顾
    从卷积拆分和分组的角度看CNN模型的演化
    Inception系列回顾
    通俗易懂DenseNet
    ResNet详解与分析
    理解numpy中ndarray的内存布局和设计哲学
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12920415.html
Copyright © 2011-2022 走看看