zoukankan      html  css  js  c++  java
  • FLINK实例(10):CONNECTORS(9)redis 读写

    1 工程目录

     pom.xml

                <!-- Jedis client library; RedisUtil imports Jedis, JedisPool and JedisPoolConfig from it. -->
                <!-- NOTE(review): the job also imports org.apache.flink.streaming.connectors.redis.*, which is not part of this artifact; confirm the Flink Redis connector is declared elsewhere in the pom. -->
                <dependency>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                    <version>2.8.1</version>
                </dependency>

    config.properties

    # Kafka configuration: comma-separated list of broker host:port pairs
    kafka.broker.list=192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092
    # Redis configuration: server host and port
    redis.host=192.168.1.122
    redis.port=6379

    2 Flink 读取 Redis

    RedisUtil(单点/scala读写redis)

    package com.atguigu.flink.utils
    
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
    
    /**
     * Helper that lazily builds one shared [[JedisPool]] from `config.properties`
     * (keys `redis.host` and `redis.port`) and hands out connections borrowed from it.
     *
     * NOTE(review): the lazy initialisation is not synchronised; confirm that
     * `getJedisClient` is never first called from several threads at once.
     */
    object RedisUtil {
      // Shared pool; null until the first getJedisClient call and again after closeJedisClient.
      var jedisPool:JedisPool=null

      /** Smoke test: sets key "hello" to "world", then shuts the pool down. */
      def main(args: Array[String]): Unit = {
        try {
          val jedis_conn = getJedisClient
          try {
            jedis_conn.set("hello","world")
          } finally {
            // Hand the borrowed connection back before the pool itself is closed.
            jedis_conn.close()
          }
        } finally {
          closeJedisClient
        }
      }

      /**
       * Borrows a connection from the shared pool, creating the pool on first use.
       *
       * Callers should call `close()` on the returned [[Jedis]] when done so that
       * the connection goes back to the pool.
       *
       * @return a connection borrowed from the pool
       * @throws IllegalArgumentException if `redis.host` or `redis.port` is missing from the configuration
       */
      def getJedisClient:Jedis = {
        if(jedisPool == null){
          val config = PropertiesUtil.load("config.properties")
          val host = config.getProperty("redis.host")
          val port = config.getProperty("redis.port")
          // Fail with a readable message instead of an NPE on port.toInt.
          require(host != null && port != null,
            "config.properties must define redis.host and redis.port")

          val jedisPoolConfig = new JedisPoolConfig()
          jedisPoolConfig.setMaxTotal(10)  // maximum number of connections
          jedisPoolConfig.setMaxIdle(4)   // maximum idle connections
          jedisPoolConfig.setMinIdle(4)     // minimum idle connections
          jedisPoolConfig.setBlockWhenExhausted(true)  // wait when the pool is exhausted
          jedisPoolConfig.setMaxWaitMillis(5000) // how long to wait when exhausted, in milliseconds
          jedisPoolConfig.setTestOnBorrow(true) // test each connection when it is borrowed

          jedisPool=new JedisPool(jedisPoolConfig,host,port.toInt)
        }

        jedisPool.getResource

      }

      /**
       * Closes the shared pool if one exists and clears the reference, so that a
       * later `getJedisClient` builds a fresh pool instead of using a closed one.
       */
      def closeJedisClient: Unit = {
        print("--------关闭redis连接--------")
        if (jedisPool != null) {
          jedisPool.close()
          jedisPool = null
        }
      }

    }

    PropertiesUtil

    package com.atguigu.flink.utils
    
    import java.io.InputStreamReader
    import java.util.Properties
    
    /** Loads `.properties` files from the classpath. */
    object PropertiesUtil {
      /** Manual check: prints the `kafka.broker.list` entry of `config.properties`. */
      def main(args: Array[String]): Unit = {
        val properties: Properties = PropertiesUtil.load("config.properties")
        println(properties.getProperty("kafka.broker.list"))

      }

      /**
       * Reads a properties resource as UTF-8 through the current thread's context class loader.
       *
       * @param propertieName resource name relative to the classpath root
       * @return the loaded properties
       * @throws IllegalArgumentException if the resource cannot be found
       */
      def load(propertieName:String) : Properties ={
        val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
        // getResourceAsStream signals "not found" with null; report it clearly instead of an NPE.
        if (stream == null) {
          throw new IllegalArgumentException(s"Properties resource not found on classpath: $propertieName")
        }
        val prop=new Properties()
        val reader = new InputStreamReader(stream, "UTF-8")
        try {
          prop.load(reader)
        } finally {
          // Closing the reader also closes the underlying stream.
          reader.close()
        }
        prop
      }
    }

    RedisSource

    package com.atguigu.flink.source
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.utils.RedisUtil
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import redis.clients.jedis.Jedis
    
    /**
     * Source that reads the Redis hash "jedis_test" once and emits one
     * SensorReading per field. Each field value is parsed as
     * "<long>,<double>" and the field name is passed as the first argument.
     */
    class RedisSource extends RichSourceFunction[SensorReading]{
      // Connection borrowed in open() and handed back in close().
      var jedis_conn:Jedis = null
      // Cleared by cancel() so that run() stops emitting.
      @volatile private var isRunning = true

      /** Borrows a Redis connection from RedisUtil. */
      override def open(parameters: Configuration): Unit ={
        jedis_conn = RedisUtil.getJedisClient
      }

      /**
       * Emits every entry of the "jedis_test" hash, then returns.
       *
       * The whole hash is fetched in a single HGETALL call, which avoids one
       * round trip per field and a null value when a field disappears between
       * listing the keys and reading it.
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val entries = jedis_conn.hgetAll("jedis_test").entrySet().iterator()
        while(isRunning && entries.hasNext){
          val entry = entries.next()
          // Split once; field 0 is parsed as Long, field 1 as Double.
          val fields = entry.getValue.split(",")
          sourceContext.collect(SensorReading(entry.getKey, fields(0).toLong, fields(1).toDouble))
        }
      }

      /** Asks run() to stop; the connection itself is released in close(). */
      override def cancel(): Unit = {
        isRunning = false
        print("close redis")
      }

      /** Hands the borrowed connection back once the function is shut down. */
      override def close(): Unit = {
        if (jedis_conn != null) {
          jedis_conn.close()
          jedis_conn = null
        }
      }

    }

    主程序 RedisSourceSinkApp

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.MyRedisSink
    import com.atguigu.flink.source.RedisSource
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    
    /**
     * Job entry point: reads SensorReading records with RedisSource, writes
     * them to Redis through RedisSink/MyRedisSink and prints the stream.
     */
    object RedisSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Use addSource to attach RedisSource as the input of the job.
        val stream: scala.DataStream[SensorReading] = env.addSource(new RedisSource)

        // Redis connection settings for the sink. They come from the same
        // config.properties that RedisUtil reads, so source and sink cannot
        // drift apart; the defaults are the previously hard-coded host and port 6379.
        val props = com.atguigu.flink.utils.PropertiesUtil.load("config.properties")
        val conf = new FlinkJedisPoolConfig.Builder()
          .setHost(props.getProperty("redis.host", "192.168.1.122"))
          .setPort(props.getProperty("redis.port", "6379").toInt)
          .build()

        // Use addSink to attach the Redis sink as the output of the job.
        stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))

        // Print the stream.
        stream.print()

        // Run the job.
        env.execute()
      }

    }

    3 Flink 写入 Redis

    MyRedisSink

    package com.atguigu.flink.sink
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    /**
     * RedisMapper for SensorReading records: issues HSET with "sensor" as the
     * additional key, the record's `id` as key and its `timepreture` as value.
     */
    class MyRedisSink extends RedisMapper[SensorReading]{

      /** Redis operation to perform: HSET with the additional key "sensor". */
      override def getCommandDescription: RedisCommandDescription = {
        val command = RedisCommand.HSET
        new RedisCommandDescription(command, "sensor")
      }

      /** Key taken from the record: its `id`. */
      override def getKeyFromData(t: SensorReading): String = {
        t.id
      }

      /** Value taken from the record: `timepreture` rendered with `toString`. */
      override def getValueFromData(t: SensorReading): String = {
        val reading = t.timepreture
        reading.toString
      }

    }

    主程序 RedisSourceSinkApp(与第 2 节中的主程序相同,此处重复列出)

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.MyRedisSink
    import com.atguigu.flink.source.RedisSource
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    
    /**
     * Job entry point: reads SensorReading records with RedisSource, writes
     * them to Redis through RedisSink/MyRedisSink and prints the stream.
     */
    object RedisSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Use addSource to attach RedisSource as the input of the job.
        val stream: scala.DataStream[SensorReading] = env.addSource(new RedisSource)

        // Redis connection settings for the sink. They come from the same
        // config.properties that RedisUtil reads, so source and sink cannot
        // drift apart; the defaults are the previously hard-coded host and port 6379.
        val props = com.atguigu.flink.utils.PropertiesUtil.load("config.properties")
        val conf = new FlinkJedisPoolConfig.Builder()
          .setHost(props.getProperty("redis.host", "192.168.1.122"))
          .setPort(props.getProperty("redis.port", "6379").toInt)
          .build()

        // Use addSink to attach the Redis sink as the output of the job.
        stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))

        // Print the stream.
        stream.print()

        // Run the job.
        env.execute()
      }

    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680652.html

  • 相关阅读:
    分母为0一定会抛异常吗?
    [译]Zookeeper的优点与局限性
    明明有class为什么还是报ClassNotFoundException?
    广告倒排索引架构与优化
    KafkaProducer源码分析
    Kafka服务端之网络连接源码分析
    Sublime常用快捷键
    sublime主题设置
    Sublime前端插件
    安装软件,更新软件,删除软件
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13680652.html
Copyright © 2011-2022 走看看