zoukankan      html  css  js  c++  java
  • 项目实战 从 0 到 1 学习之Flink(25)Flink从redis中获取数据作为source源

    redis中的数据:
    在这里插入图片描述
    需要实现SourceFunction接口,指定泛型<>,也就是获取redis里的数据,处理完后的数据输入的数据类型 这里我们需要的是
    (我们需要返回kv对的,就要考虑HashMap)
    pom.xml

     <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
                <dependency>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                    <version>2.9.3</version>
                </dependency>

    Java代码:

    package ryx.source;
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.exceptions.JedisConnectionException;
    
    import javax.swing.plaf.TableHeaderUI;
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     *
     * 在redis中保存的有国家和大区的关系
     * hset  areas AREA_US US
     * hset  areas AREA_CT TW,HK
     * hset  areas AREA_AR PK,KW,SA
     * hset  areas AREA_IN IN
     *./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic allDataClean--from-beginning
     *
     * 我们需要返回kv对的,就要考虑HashMap
     */
    public class MyRedisSource implements SourceFunction<HashMap<String,String>> {
        private Logger logger= LoggerFactory.getLogger(MyRedisSource.class);
        private boolean isRunning =true;
        private Jedis jedis=null;
        private final long SLEEP_MILLION=5000;
        public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
            this.jedis = new Jedis("hadoop01", 6379);
            HashMap<String, String> kVMap = new HashMap<String, String>();
            while(isRunning){
                try{
                    kVMap.clear();
                    Map<String, String> areas = jedis.hgetAll("areas");
                    for(Map.Entry<String,String> entry:areas.entrySet()){
                        // key :大区 value:国家
                        String key = entry.getKey();
                        String value = entry.getValue();
                        String[] splits = value.split(",");
                        System.out.println("key:"+key+",--value:"+value);
                        for (String split:splits){
                            // key :国家value:大区
                            kVMap.put(split, key);
                        }
                    }
                    if(kVMap.size()>0){
                        ctx.collect(kVMap);
                    }else {
                        logger.warn("从redis中获取的数据为空");
                    }
                    Thread.sleep(SLEEP_MILLION);
                }catch (JedisConnectionException e){
                    logger.warn("redis连接异常,需要重新连接",e.getCause());
                    jedis = new Jedis("hadoop01", 6379);
                }catch (Exception e){
                    logger.warn(" source 数据源异常",e.getCause());
                }
            }
        }
    
        public void cancel() {
            isRunning=false;
            while(jedis!=null){
                jedis.close();
            }
        }
    }

    结果为:
    key:AREA_US,–value:US
    key:AREA_CT,–value:TW,HK
    key:AREA_AR,–value:PK,KW,SA
    key:AREA_IN,–value:IN

    接着将value数据进行分割单个的单词,和key进行进行组合装到HashMap中,通过Run方法的SourceContext对象,作为Source源进行输出!

    作者:大码王

    -------------------------------------------

    个性签名:独学而无友,则孤陋而寡闻。做一个灵魂有趣的人!

    如果觉得这篇文章对你有小小的帮助的话,记得在右下角点个“推荐”哦,博主在此感谢!

    万水千山总是情,打赏一分行不行,所以如果你心情还比较高兴,也是可以扫码打赏博主,哈哈哈(っ•?ω•?)っ???!

  • 相关阅读:
    数据库02
    MySQL1
    GIL 死锁 递归锁 event 信号量 线程Queue
    小脚本 暴力删除文件 刷屏
    常见web攻击 及基础 回顾(杂记)
    接口中的简单异步 async
    python协程 示例
    python 利用jinja2模板生成html
    python 调用webservices 接口
    python 进程 进程池 进程间通信
  • 原文地址:https://www.cnblogs.com/huanghanyu/p/13633151.html
Copyright © 2011-2022 走看看