1 工程目录
pom.xml
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.1</version> </dependency>
config.properties
# Kafka配置
kafka.broker.list=192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092

# Redis配置
redis.host=192.168.1.122
redis.port=6379
2 flink 读取 redis
RedisUtil(单点/scala读写redis)
package com.atguigu.flink.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

/**
 * Single-node Redis helper backed by one lazily created, JVM-wide [[JedisPool]].
 *
 * The pool is built on first use from the `redis.host` / `redis.port` entries of
 * `config.properties`. Callers borrow a connection with [[getJedisClient]] and
 * must call `close()` on it when done so that it is handed back to the pool.
 */
object RedisUtil {

  /** Shared pool; `null` until the first [[getJedisClient]] call and after [[closeJedisClient]]. */
  var jedisPool: JedisPool = null

  /** Smoke test: writes `hello -> world` and then shuts the pool down. */
  def main(args: Array[String]): Unit = {
    val jedis_conn = getJedisClient
    try {
      jedis_conn.set("hello", "world")
    } finally {
      // Hand the borrowed connection back before the pool itself is closed.
      jedis_conn.close()
      closeJedisClient
    }
  }

  /**
   * Borrows a connection from the shared pool, creating the pool on first use.
   *
   * @return a pooled [[Jedis]] connection; the caller is responsible for closing it
   */
  def getJedisClient: Jedis = {
    // Only pool creation is guarded: several task threads in one JVM may ask for
    // their first connection at the same time. Borrowing happens outside the lock
    // because it can block for up to the configured max-wait.
    val pool = this.synchronized {
      if (jedisPool == null) {
        jedisPool = createPool()
      }
      jedisPool
    }
    pool.getResource
  }

  /** Closes the shared pool if one exists; a later [[getJedisClient]] call builds a new one. */
  def closeJedisClient: Unit = {
    print("--------关闭redis连接--------")
    this.synchronized {
      if (jedisPool != null) {
        jedisPool.close()
        // Drop the reference so a closed pool is never handed out again.
        jedisPool = null
      }
    }
  }

  /** Builds the pool from `config.properties` with the fixed sizing used by this project. */
  private def createPool(): JedisPool = {
    val config = PropertiesUtil.load("config.properties")
    val host = config.getProperty("redis.host")
    val port = config.getProperty("redis.port")

    val jedisPoolConfig = new JedisPoolConfig()
    jedisPoolConfig.setMaxTotal(10)             // maximum number of connections
    jedisPoolConfig.setMaxIdle(4)               // maximum idle connections
    jedisPoolConfig.setMinIdle(4)               // minimum idle connections
    jedisPoolConfig.setBlockWhenExhausted(true) // wait when the pool is exhausted
    jedisPoolConfig.setMaxWaitMillis(5000)      // how long to wait, in milliseconds
    jedisPoolConfig.setTestOnBorrow(true)       // validate each connection when borrowed

    new JedisPool(jedisPoolConfig, host, port.toInt)
  }
}
PropertiesUtil
package com.atguigu.flink.utils

import java.io.{FileNotFoundException, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Properties

/** Loads `.properties` files from the classpath, decoding them as UTF-8. */
object PropertiesUtil {

  /** Smoke test: prints the `kafka.broker.list` entry of `config.properties`. */
  def main(args: Array[String]): Unit = {
    val properties: Properties = PropertiesUtil.load("config.properties")
    println(properties.getProperty("kafka.broker.list"))
  }

  /**
   * Reads a properties file through the current thread's context class loader.
   *
   * @param propertieName classpath-relative resource name, e.g. `config.properties`
   * @return the parsed properties
   * @throws java.io.FileNotFoundException if the resource is not on the classpath
   */
  def load(propertieName: String): Properties = {
    val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
    // getResourceAsStream signals "not found" with null; fail with a message that
    // names the resource instead of an NPE from inside InputStreamReader.
    if (stream == null) {
      throw new FileNotFoundException("Resource not found on classpath: " + propertieName)
    }

    val reader = new InputStreamReader(stream, StandardCharsets.UTF_8)
    try {
      val prop = new Properties()
      prop.load(reader)
      prop
    } finally {
      // Closing the reader also closes the underlying stream.
      reader.close()
    }
  }
}
RedisSource
package com.atguigu.flink.source

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.utils.RedisUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import redis.clients.jedis.Jedis

/**
 * Flink source that emits one [[SensorReading]] per field of the Redis hash
 * `jedis_test` and then finishes.
 *
 * Each hash field name becomes the reading's first argument; each hash value is
 * split on `,` and its first two parts are parsed as a `Long` and a `Double`.
 */
class RedisSource extends RichSourceFunction[SensorReading] {

  /** Connection borrowed in [[open]] and returned to the pool in [[close]]. */
  var jedis_conn: Jedis = null

  /** Cleared by [[cancel]], which Flink invokes from a different thread than [[run]]. */
  @volatile private var running = true

  /** Borrows a Redis connection from the shared pool. */
  override def open(parameters: Configuration): Unit = {
    jedis_conn = RedisUtil.getJedisClient
  }

  /**
   * Reads every field of the `jedis_test` hash once and emits it downstream.
   *
   * @param sourceContext collector for the produced readings
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val keys_iter = jedis_conn.hkeys("jedis_test").iterator()
    while (running && keys_iter.hasNext) {
      val key = keys_iter.next()
      val value = jedis_conn.hget("jedis_test", key)
      // hget returns null when the field was removed after hkeys listed it;
      // such a field no longer exists, so there is nothing to emit for it.
      if (value != null) {
        val fields = value.split(",")
        val curr_time = fields(0).toLong
        val tempreture = fields(1).toDouble
        sourceContext.collect(SensorReading(key, curr_time, tempreture))
      }
    }
  }

  /** Asks [[run]] to stop after the element it is currently processing. */
  override def cancel(): Unit = {
    // The connection is deliberately not closed here: run() may still be using
    // it on another thread. It is released in close().
    running = false
    print("close redis")
  }

  /** Returns the borrowed connection to the pool when the task shuts down. */
  override def close(): Unit = {
    try {
      if (jedis_conn != null) {
        jedis_conn.close()
        jedis_conn = null
      }
    } finally {
      super.close()
    }
  }
}
主程序 RedisSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MyRedisSink
import com.atguigu.flink.source.RedisSource
import com.atguigu.flink.utils.PropertiesUtil
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

/**
 * Entry point: reads sensor readings with [[RedisSource]], writes them back to
 * Redis through [[MyRedisSink]], and prints the stream.
 */
object RedisSourceSinkApp {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Register the custom source as the input of the job.
    val stream: scala.DataStream[SensorReading] = env.addSource(new RedisSource)

    // Redis connection settings for the sink. They come from the same
    // config.properties that the source's connection pool reads, so source and
    // sink always target the same server; the previous literals are the defaults.
    val config = PropertiesUtil.load("config.properties")
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost(config.getProperty("redis.host", "192.168.1.122"))
      .setPort(config.getProperty("redis.port", "6379").toInt)
      .build()

    // Register the Redis sink as the output of the job.
    stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))

    // Print the stream.
    stream.print()

    // Run the job.
    env.execute()
  }
}
3 flink 写入 redis
MyRedisSink
package com.atguigu.flink.sink

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Mapper telling the Flink Redis sink how to store a [[SensorReading]]:
 * an `HSET` command with `sensor` as its additional key, the reading's `id`
 * as the key and its `timepreture` rendered as a string as the value.
 */
class MyRedisSink extends RedisMapper[SensorReading] {

  /** Key written for a reading: its `id`. */
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }

  /** Value written for a reading: its `timepreture`, converted with `toString`. */
  override def getValueFromData(t: SensorReading): String = {
    val rendered = t.timepreture.toString
    rendered
  }

  /** Redis operation to perform: `HSET` with the additional key `sensor`. */
  override def getCommandDescription: RedisCommandDescription = {
    val command = RedisCommand.HSET
    new RedisCommandDescription(command, "sensor")
  }
}
主程序 RedisSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MyRedisSink
import com.atguigu.flink.source.RedisSource
import com.atguigu.flink.utils.PropertiesUtil
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

/**
 * Entry point: reads sensor readings with [[RedisSource]], writes them back to
 * Redis through [[MyRedisSink]], and prints the stream.
 */
object RedisSourceSinkApp {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Register the custom source as the input of the job.
    val stream: scala.DataStream[SensorReading] = env.addSource(new RedisSource)

    // Redis connection settings for the sink. They come from the same
    // config.properties that the source's connection pool reads, so source and
    // sink always target the same server; the previous literals are the defaults.
    val config = PropertiesUtil.load("config.properties")
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost(config.getProperty("redis.host", "192.168.1.122"))
      .setPort(config.getProperty("redis.port", "6379").toInt)
      .build()

    // Register the Redis sink as the output of the job.
    stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))

    // Print the stream.
    stream.print()

    // Run the job.
    env.execute()
  }
}