zoukankan      html  css  js  c++  java
  • Flink从socket读取数据sink到redis

    package com.lin.flink.stream.customPartition;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class StreamingDemoToRedis {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> text = env.socketTextStream("node1", 9000, "
    ");
    
            //lpsuh l_words word
    
            //对数据进行组装,把string转化为tuple2<String,String>
            DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    return new Tuple2<String, String>("l_words", value);
                }
            });
    
            //创建redis的配置
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build();
    
            //创建redissink
            RedisSink<Tuple2<String, String>> redisSink = new RedisSink<Tuple2<String, String>>(conf, new MyRedisMapper());
    
            l_wordsData.addSink(redisSink);
    
            env.execute("StreamingDemoToRedis");
        }
    
        public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
            //表示从接收的数据中获取需要操作的redis key
            @Override
            public String getKeyFromData(Tuple2<String, String> data) {
                return data.f0;
            }
            //表示从接收的数据中获取需要操作的redis value
            @Override
            public String getValueFromData(Tuple2<String, String> data) {
                return data.f1;
            }
    
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }
        }
    }
  • 相关阅读:
    Django_jinja2
    css画太极
    python 自己实现map
    python 比赛 组合问题
    python 找素数
    如何快速掌握一门新技术/语言/框架
    jQuery常用事件-思维导图
    jQuery常用函数-思维导图
    jQuery选择器汇总-思维导图
    3.git版本控制-管理修改、撤销、删除
  • 原文地址:https://www.cnblogs.com/linkmust/p/10933536.html
Copyright © 2011-2022 走看看