zoukankan      html  css  js  c++  java
  • flink连接器-流处理-读写redis

    写入redis

    resultStream.addSink(new RedisSink(FlinkUtils.getRedisSinkConfig(parameters),new MyRedisMapper()));
    
    

    getRedisSinkConfig

     public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool){
    
    
            String redisHosts = parameterTool.get(PropertiesUtil.REDIS_HOSTS);
            Set<String> hosts = new HashSet<String>(Arrays.asList(redisHosts.split(",")));
            FlinkJedisSentinelConfig redisProduceConfig = new FlinkJedisSentinelConfig.Builder()
                    .setSentinels(hosts)
                    .setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
                    .setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
                    .setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
                    .setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
                    .setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT)).build();
                return redisProduceConfig;
    
        }
    

    MyRedisMapper

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * @Auther WeiJiQian
     * @描述 Redis 存储的key和value
     */
    public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
    
        /**
         * 设置使用的redis数据结构类型,和key的名词
         * 通过RedisCommand设置数据结构类型
         * Returns descriptor which defines data type.
         *
         * @return data type descriptor
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL);
        }
    
        /**
         * 设置value中的键值对 key的值
         * Extracts key from data.
         *
         * @return key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
            return stringStringTuple2.f0;
        }
    
        /**
         * 设置value中的键值对 value的值
         * Extracts value from data.
         *
         * @return value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> tuple2) {
            return tuple2.f1;
        }
    
    
    }
    
    
  • 相关阅读:
    大型网站前端使用图片格式的正确姿势
    移动端开发技术文档
    超详细的Web前端开发规范文档
    try 、catch 、finally 、throw 测试js错误
    ajax大并发问题
    jQuery之Ajax--全局Ajax事件处理器
    如何处理ajax中嵌套一个ajax
    关于for循环里面异步操作的问题
    XMLHttpRequest: 网络错误 0x2f78,…00002f78
    【转载】OGRE中用到的设计模式
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034971.html
Copyright © 2011-2022 走看看