zoukankan      html  css  js  c++  java
  • Flink 连接器 - 流处理 - 读写 Redis

    写入 Redis

    // Attach the Redis sink to the result stream. The diamond operator replaces the raw
    // RedisSink type so the sink's element type is inferred from MyRedisMapper and checked
    // at compile time instead of producing an unchecked raw-type call.
    resultStream.addSink(new RedisSink<>(FlinkUtils.getRedisSinkConfig(parameters), new MyRedisMapper()));
    
    

    getRedisSinkConfig

     /**
      * Builds the Redis Sentinel connection configuration from the job parameters.
      *
      * <p>The sentinel addresses are read from {@code PropertiesUtil.REDIS_HOSTS} as a
      * comma-separated list. Whitespace around each entry is trimmed and blank entries
      * are dropped, so values such as {@code "a:26379, b:26379"} are accepted.
      *
      * @param parameterTool job parameters holding the Redis settings
      * @return the sentinel configuration built from those parameters
      * @throws RuntimeException if {@code PropertiesUtil.REDIS_HOSTS} is not configured
      * @throws IllegalArgumentException if the host list contains no usable address
      */
     public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool) {
         // getRequired reports the missing key by name; the previous get(...) returned null
         // and the following split(...) failed with an uninformative NullPointerException.
         String redisHosts = parameterTool.getRequired(PropertiesUtil.REDIS_HOSTS);

         Set<String> hosts = new HashSet<String>();
         for (String rawHost : redisHosts.split(",")) {
             String host = rawHost.trim();
             if (!host.isEmpty()) {
                 hosts.add(host);
             }
         }
         if (hosts.isEmpty()) {
             throw new IllegalArgumentException(
                     "No Redis sentinel address configured under key '" + PropertiesUtil.REDIS_HOSTS + "'");
         }

         return new FlinkJedisSentinelConfig.Builder()
                 .setSentinels(hosts)
                 .setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
                 .setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
                 .setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
                 .setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
                 .setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT))
                 .build();
     }
    

    MyRedisMapper

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * Maps {@code Tuple2<String, String>} records to the Redis key and value written by the sink.
     *
     * <p>Field {@code f0} of each tuple supplies the key and field {@code f1} supplies the value.
     *
     * @author WeiJiQian
     */
    public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        /**
         * Describes the Redis command used for every record.
         *
         * @return a descriptor for {@code SETEX} built with {@code Constant.REDIS_KEY_TTL}
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            RedisCommand command = RedisCommand.SETEX;
            return new RedisCommandDescription(command, Constant.REDIS_KEY_TTL);
        }

        /**
         * Extracts the Redis key from a record.
         *
         * @param entry the record being written
         * @return the tuple's first field, {@code f0}
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> entry) {
            return entry.f0;
        }

        /**
         * Extracts the Redis value from a record.
         *
         * @param entry the record being written
         * @return the tuple's second field, {@code f1}
         */
        @Override
        public String getValueFromData(Tuple2<String, String> entry) {
            return entry.f1;
        }

    }
    
    
  • 相关阅读:
    Codeforces Round #564(div2)
    714
    1471
    UVa 11134
    UVa 1152 -4 Values whose Sum is 0—[哈希表实现]
    UVa 1374
    UVA 1343
    UVa 12325
    Yet Another Number Sequence——[矩阵快速幂]
    River Hopscotch-[二分查找、贪心]
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034971.html
Copyright © 2011-2022 走看看