zoukankan      html  css  js  c++  java
  • Flink学习(十) Sink到Redis

    添加依赖

            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>

    编写代码

    package com.wyh.streamingApi.sink
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    object Sink2Redis {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //Source操作
        val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        val conf = new FlinkJedisPoolConfig.Builder()
            .setHost("localhost")
            .setPort(6379)
            .build()
    
    
    
        //Sink操作
        dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper()))
    
    
        env.execute("redis sink test")
      }
    
    }
    
    class MyRedisMapper() extends RedisMapper[SensorReading]{
    
      //定义保存数据到Redis的命令
      override def getCommandDescription: RedisCommandDescription = {
        //把传感器id和温度值保存成 Hash表 HSET key field value
        new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature")
      }
    
      //定义保存到redis的key
      override def getKeyFromData(t: SensorReading): String = {
        t.id
      }
    
      //定义保存到redis的value
      override def getValueFromData(t: SensorReading): String = {
        t.temperature.toString
      }
    }

  • 相关阅读:
    快速排序就这么简单
    Shiro入门这篇就够了【Shiro的基础知识、回顾URL拦截】
    SpringDataJPA入门就这么简单
    递归就这么简单
    SpringBoot就是这么简单
    Activiti就是这么简单
    Lucene就是这么简单
    过来人告诉你,去工作前最好还是学学Git
    给女朋友讲解什么是Git
    我终于看懂了HBase,太不容易了...
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12920415.html
Copyright © 2011-2022 走看看