  • Consuming Kafka Data with Flink and Writing Real-Time Results to Redis

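    1. Scenario

    In many big-data scenarios, data must be computed and stored as a continuous stream. The previous post showed how Flink consumes Kafka data to run a WordCount; this post writes the results of that real-time computation to Redis. As soon as Kafka receives data from an upstream source, the data goes to Flink for computation, and Flink writes the results to Redis, so the whole process forms a streaming data pipeline.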

    2. Code

    Add the third-party dependencies

        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.4.0</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.4.0</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.4.0</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
                <version>1.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_2.10</artifactId>
                <version>1.1.5</version>
            </dependency>
    
        </dependencies>

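    Note: it is best to use version 1.4.0 for all the Flink artifacts here and version 1.1.5 for flink-connector-redis. Lower or mismatched versions tend to cause the usual Java dependency problems: package conflicts, the same class differing between packages, or classes missing from older versions.

    Application code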

    package com.scn;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    
    import java.util.Properties;
    
    public class FilnkCostKafka {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(1000);
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
            properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
            properties.setProperty("group.id", "test");
    
            FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties);
    
            DataStream<String> stream = env.addSource(myConsumer);
            DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
    
            // Create the FlinkJedisPoolConfig that connects Flink to Redis and set the Redis host
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
            // Create the RedisSink and attach it with addSink so the results computed by Flink are written to Redis
            counts.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper()));
            env.execute("WordCount from Kafka data");
        }
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
    
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    
        // Map each Flink tuple to a Redis command: HSET into the hash "flink", with the word as the field and the count as the value
        public static final class RedisExampleMapper implements RedisMapper<Tuple2<String,Integer>>{
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET, "flink");
            }
    
            public String getKeyFromData(Tuple2<String, Integer> data) {
                return data.f0;
            }
    
            public String getValueFromData(Tuple2<String, Integer> data) {
                return data.f1.toString();
            }
        }
    }
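
    To make the counting logic easier to follow, here is a minimal plain-Java sketch (no Flink involved) of what LineSplitter followed by keyBy(0).sum(1) computes: every incoming word produces an updated running count. The names WordCountSketch, tokenize and runningCounts are hypothetical and exist only for this illustration; the real job keeps this state inside Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the word-count logic; all names here are hypothetical.
final class WordCountSketch {

    // Same rule as LineSplitter: lower-case, split on non-word characters, drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    // Emits one "word=count" update per incoming word, like a keyed running sum.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                int count = state.merge(word, 1, Integer::sum);
                updates.add(word + "=" + count);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("Hello, Flink!");
        lines.add("hello kafka");
        // prints [hello=1, flink=1, hello=2, kafka=1]
        System.out.println(runningCounts(lines));
    }
}
```

    Note that "hello" is emitted twice, once with each intermediate count. The stream carries every update, not only the final totals.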
    

    Write a test class

    package com.scn;
    
    import redis.clients.jedis.Jedis;
    
    public class RedisTest {
        public static void main(String args[]){
            Jedis jedis=new Jedis("127.0.0.1");
            System.out.println("Server is running: " + jedis.ping());
            System.out.println("result:"+jedis.hgetAll("flink"));
        }
    }
    

    3. Testing

    Start the Redis server

    redis-server

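    Run the main method of FilnkCostKafka

    If no exception is thrown, the job started successfully

    Send some data from the Kafka producer side

    Run the main method of the test class RedisTest

    It prints: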

    Server is running: PONG
    result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}

    As you can see, the data has flowed into Redis
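
    The result shows a single count per word because HSET overwrites an existing field instead of adding to it, so each update from Flink replaces the previous count for that word. The following is a minimal in-memory sketch of that behaviour, assuming a LinkedHashMap as a stand-in for the Redis hash "flink"; HashSinkSketch, hset and hgetAll are hypothetical names and are not part of Jedis or the Flink connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for the Redis hash "flink"; all names here are hypothetical.
final class HashSinkSketch {
    private final Map<String, String> hash = new LinkedHashMap<>();

    // Models HSET flink <field> <value>: an existing field is overwritten, not incremented.
    void hset(String field, String value) {
        hash.put(field, value);
    }

    // Models HGETALL flink: returns every field with its latest value.
    Map<String, String> hgetAll() {
        return hash;
    }

    public static void main(String[] args) {
        HashSinkSketch sink = new HashSinkSketch();
        // Updates arrive in stream order; the second "hello" replaces the first.
        sink.hset("hello", "1");
        sink.hset("kafka", "1");
        sink.hset("hello", "2");
        // prints result:{hello=2, kafka=1}
        System.out.println("result:" + sink.hgetAll());
    }
}
```

    Unlike this sketch, a real Redis hash does not guarantee any particular field order in the HGETALL reply.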

  • Original article: https://www.cnblogs.com/ExMan/p/11285146.html