zoukankan      html  css  js  c++  java
  • Flink从socket读取数据sink到redis

    package com.lin.flink.stream.customPartition;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class StreamingDemoToRedis {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> text = env.socketTextStream("node1", 9000, "
    ");
    
            //lpsuh l_words word
    
            //对数据进行组装,把string转化为tuple2<String,String>
            DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    return new Tuple2<String, String>("l_words", value);
                }
            });
    
            //创建redis的配置
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build();
    
            //创建redissink
            RedisSink<Tuple2<String, String>> redisSink = new RedisSink<Tuple2<String, String>>(conf, new MyRedisMapper());
    
            l_wordsData.addSink(redisSink);
    
            env.execute("StreamingDemoToRedis");
        }
    
        public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
            //表示从接收的数据中获取需要操作的redis key
            @Override
            public String getKeyFromData(Tuple2<String, String> data) {
                return data.f0;
            }
            //表示从接收的数据中获取需要操作的redis value
            @Override
            public String getValueFromData(Tuple2<String, String> data) {
                return data.f1;
            }
    
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }
        }
    }
  • 相关阅读:
    div里面的内容超出自身高度时,显示省略号
    CSS文本超出2行就隐藏并且显示省略号
    CSS中可以和不可以继承的属性
    return false
    CSS position: absolute、relative定位问题详解
    逆FizzBuzz问题求最短序列
    HTTP协议篇(一):多路复用、数据流
    PHP正则式PCRE
    Docker笔记三:基于LVS DR模式构建WEB服务集群
    架构设计之防止或缓解雪崩效应
  • 原文地址:https://www.cnblogs.com/linkmust/p/10933536.html
Copyright © 2011-2022 走看看