zoukankan      html  css  js  c++  java
  • Flink读写Redis(一)-写入Redis

    项目pom文件

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.jike.flink</groupId>
        <artifactId>flink-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <flink.version>1.10.0</flink.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- flink 11中需要手动添加
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.11.2</version>
            </dependency>
            -->
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.1.5</version>
                <scope>system</scope>
                <systemPath>${basedir}/lib/flink-connector-redis_2.11-1.1.5.jar</systemPath>
            </dependency>
    
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.8.0</version>
                <scope>compile</scope>
            </dependency>
    
        </dependencies>
    
    </project>
    
    

    实现flink写入redis

    实现wordcount功能,并将结果实时写入redis,这里使用了第三方依赖flink-connector-redis_2.11,该依赖提供了RedisSink可以直接使用,具体代码如下:

    代码

    首先定义数据源处理实现类LineSplitter,该类将一行数据分词,输出<单词,1>元祖

    package com.jike.flink.examples.redis;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = s.toLowerCase().split("\W+");
            for(String token : tokens){
                if(token.length() > 0){
                    collector.collect(new Tuple2<String,Integer>(token,1));
                }
            }
        }
    }
    
    

    然后定义数据写入Redis的配置类,这里面将统计后的所有信息词频写入一个哈希表,哈希表的key为"flink",作为测试使用,哈希表中每个元素key为单词,value为词频

    package com.jike.flink.examples.redis;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class SinkRedisMapper implements RedisMapper<Tuple2<String,Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //hset
            return new RedisCommandDescription(RedisCommand.HSET,"flink");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return stringIntegerTuple2.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return stringIntegerTuple2.f1.toString();
        }
    }
    
    

    最后编写主程序类,该类中使用了socketTextStream数据源,通过前面定义LineSplitter完成解析,然后根据单词进行分组统计,最后写入redis

    
    package com.jike.flink.examples.redis;
    
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    
    public class Sink2Redis {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("实际IP",12345);
            DataStream<Tuple2<String,Integer>> counts = dataStreamSource.flatMap(new LineSplitter()).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2.f0;
                }
            }).sum(1);
             //控制台打印
            counts.print().setParallelism(1);
            //定义redis服务器信息
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("redis服务器ip").setPort(redis服务端口).setPassword("redis服务密码").build();
            counts.addSink(new RedisSink<>(conf,new SinkRedisMapper()));
            executionEnvironment.execute();
        }
    }
    
    

    运行效果

    通过nc -l 12345,命令模拟数据源,并输入一些数据

    IDEA中查看打印记录

    查看redis

    可以发现数据已写入redis

    总结

    flink-connector-redis_2.11中提供了RedisSink类,该类实现了RichSinkFunction,可以直接使用,如果有特殊需求,可以自定义Sink类,继承RichSinkFunction,实现特殊处理。flink-connector-redis_2.11的源码比较简洁,下一篇打算分析学习下。

  • 相关阅读:
    KVM---利用 libvirt+qemu-kvm 创建虚拟机
    docker---安装docker
    Ubuntu---VIM 常用命令
    Ubuntu--- 安装VMware 报错 Build enviroment error!
    Ubuntu---不能打开 exfat 文件系统格式的 U盘解决方法
    Ubuntu---gedit 打开windows 下 .txt 文件乱码的解决方法
    MCS-51单片机的串行口及串行通信技术
    MCS-51单片机的定时器/计数器
    MCS-51单片机的中断系统
    计算机网络——网络层
  • 原文地址:https://www.cnblogs.com/darange/p/13881460.html
Copyright © 2011-2022 走看看