添加依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
编写代码
package com.wyh.streamingApi.sink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
 * Flink streaming job that reads sensor readings from a text file and writes
 * them to Redis through the Bahir `RedisSink`.
 *
 * Each input line is expected to be comma-separated as `id,timestamp,temperature`.
 */
object Sink2Redis {

  /**
   * Job entry point: builds the source -> transform -> sink pipeline and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: read the sensor file line by line.
    // Backslashes must be escaped in a regular Scala string literal; written as
    // single backslashes, "\f" becomes a form feed and "\w", "\d", "\s" are
    // invalid escape sequences that fail to compile.
    val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    // Transform: parse every CSV line into a SensorReading.
    // A line with fewer than three fields or a non-numeric timestamp/temperature
    // will throw here and fail the job.
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    // Connection settings for the Redis instance the sink writes to.
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // Sink: write every reading to Redis using MyRedisMapper.
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper()))

    env.execute("redis sink test")
  }
}
/**
 * Tells the `RedisSink` how to store a [[SensorReading]]: the sensor id and its
 * temperature are saved into a Redis hash (`HSET key field value`).
 */
class MyRedisMapper() extends RedisMapper[SensorReading] {

  /** Redis command issued for every record: `HSET`, with "sensor_temperature" as the additional key. */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  /** Key written to Redis for a reading: the sensor id. */
  override def getKeyFromData(t: SensorReading): String = t.id

  /** Value written to Redis for a reading: the temperature rendered as a string. */
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}