zoukankan      html  css  js  c++  java
  • flink连接器-流处理-读写redis

    写入redis

    resultStream.addSink(new RedisSink(FlinkUtils.getRedisSinkConfig(parameters),new MyRedisMapper()));
    
    

    getRedisSinkConfig

     public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool){
    
    
            String redisHosts = parameterTool.get(PropertiesUtil.REDIS_HOSTS);
            Set<String> hosts = new HashSet<String>(Arrays.asList(redisHosts.split(",")));
            FlinkJedisSentinelConfig redisProduceConfig = new FlinkJedisSentinelConfig.Builder()
                    .setSentinels(hosts)
                    .setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
                    .setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
                    .setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
                    .setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
                    .setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT)).build();
                return redisProduceConfig;
    
        }
    

    MyRedisMapper

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * @Auther WeiJiQian
     * @描述 Redis 存储的key和value
     */
    public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
    
        /**
         * 设置使用的redis数据结构类型,和key的名词
         * 通过RedisCommand设置数据结构类型
         * Returns descriptor which defines data type.
         *
         * @return data type descriptor
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL);
        }
    
        /**
         * 设置value中的键值对 key的值
         * Extracts key from data.
         *
         * @return key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
            return stringStringTuple2.f0;
        }
    
        /**
         * 设置value中的键值对 value的值
         * Extracts value from data.
         *
         * @return value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> tuple2) {
            return tuple2.f1;
        }
    
    
    }
    
    
  • 相关阅读:
    [Android 4.4.4] 泛泰A850 三版通刷 Mokee4.4.4 KTU84P 20140626 RC2.2 by syhost
    YUV12(420) (from)to RGB24
    Python图像处理(16):图像金字塔
    内存管理笔记(分页,分段,逻辑地址,物理地址)【转】
    Linux内核分析--内核中的数据结构双向链表【转】
    标准IO与文件IO 的区别【转】
    Linux中设备号及设备文件【转】
    静态编译和动态编译的区别【转】
    嵌入式系统 Boot Loader 技术内幕【转】
    理解 Linux 的硬链接与软链接【转】
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034971.html
Copyright © 2011-2022 走看看