zoukankan      html  css  js  c++  java
  • Flink输出到Redis

       1.代码

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

    //温度传感器读取样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object RedisSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //source
    val inputStream = env.readTextFile("sensor1.txt")

    //transform
    import org.apache.flink.api.scala._
    val dataStream = inputStream.map(x => {
    val arr = x.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    //sink
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink test")
    }
    }

    //定义一个redis的mapper类,用于定义保存到redis时调用的命令
    class MyRedisMapper extends RedisMapper[SensorReading] {
    override def getCommandDescription: RedisCommandDescription = {
    //把传感器id和温度值保存成哈希表: HSET key field value
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
    }

    //相当于是field
    override def getKeyFromData(data: SensorReading): String = {
    data.id
    }

    override def getValueFromData(data: SensorReading): String = {
    data.temperature.toString
    }

    }

    2.结果
     

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    探索Javascript 异步编程
    前端性能调优
    怎样选择前端框架
    前端地图截屏方案
    不一样的山顶角
    前后端分离场景下的另类登录认证方案
    React Diff 算法
    纯css实现Magicline Navigation(下划线动画导航菜单)
    Tinymce group plugin
    自适应process组件
  • 原文地址:https://www.cnblogs.com/wddqy/p/12175808.html
Copyright © 2011-2022 走看看