zoukankan      html  css  js  c++  java
  • 7-Flink的分布式缓存

    分布式缓存

    Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。 当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

    示例

    在ExecutionEnvironment中注册一个文件:

    //获取运行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    //1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
    env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
    
    复制代码

    在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

    DataSet<String> result = data.map(new RichMapFunction<String, String>() {
                private ArrayList<String> dataList = new ArrayList<String>();
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //2:使用文件
                    File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                    List<String> lines = FileUtils.readLines(myFile);
                    for (String line : lines) {
                        this.dataList.add(line);
                        System.err.println("分布式缓存为:" + line);
                    }
                }
    
                @Override
                public String map(String value) throws Exception {
                    //在这里就可以使用dataList
                    System.err.println("使用datalist:" + dataList + "------------" +value);
                    //业务逻辑
                    return dataList +":" +  value;
                }
            });
    
            result.printToErr();
        }
    复制代码

    完整代码如下,仔细看注释:

    
    public class DisCacheTest {
    
        public static void main(String[] args) throws Exception{
    
            //获取运行环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            //1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
          //text 中有4个单词:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
    
            DataSource<String> data = env.fromElements("a", "b", "c", "d");
    
            DataSet<String> result = data.map(new RichMapFunction<String, String>() {
                private ArrayList<String> dataList = new ArrayList<String>();
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //2:使用文件
                    File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                    List<String> lines = FileUtils.readLines(myFile);
                    for (String line : lines) {
                        this.dataList.add(line);
                        System.err.println("分布式缓存为:" + line);
                    }
                }
    
                @Override
                public String map(String value) throws Exception {
                    //在这里就可以使用dataList
                    System.err.println("使用datalist:" + dataList + "------------" +value);
                    //业务逻辑
                    return dataList +":" +  value;
                }
            });
    
            result.printToErr();
        }
    }//
    
    复制代码

    输出结果如下:

    [hello, flink, hello, FLINK]:a
    [hello, flink, hello, FLINK]:b
    [hello, flink, hello, FLINK]:c
    [hello, flink, hello, FLINK]:d
    复制代码

    公众号推荐

    • 全网唯一一个从0开始帮助Java开发者转做大数据领域的公众号~
    • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
    • 更多大数据技术欢迎和作者一起探讨~

    作者:王知无
    链接:https://juejin.im/post/5c769927f265da2d905849a0
    来源:掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    mac访达中的位置
    ResponseEntity和@ResponseBody以及@ResponseStatus区别
    Spring Boot中进行Junit测试
    添加数据库时出现time zone无法识别问题
    HTTPS
    表达式求值
    《进击JavaScript核心》学习笔记
    GitLab领取任务+建立分支+本地修改+上传分支+合并分支详细步骤
    黑苹果使用感受和常见问题注意事项!
    JS进阶练习
  • 原文地址:https://www.cnblogs.com/importbigdata/p/10747070.html
Copyright © 2011-2022 走看看