  • Accumulators, Broadcast Variables, and the Distributed Cache

    1. Accumulators

      • An Accumulator aggregates data across a distributed job; its final result can only be retrieved after the job has finished.

      • Counters are the concrete accumulator implementations: IntCounter, LongCounter, and DoubleCounter.

      • Notes:

        • Create the accumulator object inside the operator, as a field of the user function.
        • Register the accumulator, typically in the open method of a Rich function, giving it a name.
        • The accumulator can then be used anywhere inside that operator.
        • The accumulator's value can only be retrieved after the job has finished, via the JobExecutionResult object returned by env.execute(...).
      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.flink.api.common.JobExecutionResult;
        import org.apache.flink.api.common.accumulators.IntCounter;
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.MapOperator;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.core.fs.FileSystem;
        
        public class AccumulatorTest {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Read the data source
                DataSet<String> text = env.readTextFile("data/textFile");
        
                MapOperator<String, String> hello = text.map(new RichMapFunction<String, String>() {
                    IntCounter intCounter = new IntCounter();
        
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        getRuntimeContext().addAccumulator("my-accumulator", intCounter);
                    }
        
                    @Override
                    public String map(String value) throws Exception {
                        if (value.contains("hello")) {
                            intCounter.add(1);
                        }
                        return value;
                    }
                });
                hello.writeAsText("data/my.txt", FileSystem.WriteMode.OVERWRITE);
        
                JobExecutionResult counter = env.execute("counter");
        
                Integer result = counter.getAccumulatorResult("my-accumulator");
        
                System.out.println(result);
            }
        }
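
      • The reason the result is only available after env.execute() is that each parallel subtask keeps its own local counter, and Flink merges them when the job completes. A minimal plain-Java sketch of that merge step (no Flink dependency; MyIntCounter is a hypothetical stand-in for Flink's IntCounter, not the real class):

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulatorMergeSketch {

    // Hypothetical stand-in for Flink's IntCounter: add locally, merge globally.
    static class MyIntCounter {
        private int localValue = 0;

        void add(int v) { localValue += v; }

        void merge(MyIntCounter other) { localValue += other.localValue; }

        int getLocalValue() { return localValue; }
    }

    public static void main(String[] args) {
        // Each parallel subtask owns its own counter instance.
        List<MyIntCounter> subtaskCounters = new ArrayList<>();
        int[] hellosPerSubtask = {3, 5, 2};
        for (int hellos : hellosPerSubtask) {
            MyIntCounter counter = new MyIntCounter();
            for (int i = 0; i < hellos; i++) {
                counter.add(1); // what map() does each time it sees "hello"
            }
            subtaskCounters.add(counter);
        }

        // At job completion the per-subtask counters are merged; only this
        // merged value is what getAccumulatorResult() exposes.
        MyIntCounter finalResult = new MyIntCounter();
        for (MyIntCounter c : subtaskCounters) {
            finalResult.merge(c);
        }
        System.out.println(finalResult.getLocalValue()); // 10
    }
}
```

        This also explains why reading intCounter inside map() mid-job would only show one subtask's partial count.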
        
        
    2. Broadcast Variables

      • Broadcasting: a DataSet is broadcast to an operator via withBroadcastSet.

      • Access: inside a Rich function it can be read via getRuntimeContext().getBroadcastVariable.

      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.flink.api.common.functions.RichFilterFunction;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.DataSource;
        import org.apache.flink.api.java.operators.FilterOperator;
        import org.apache.flink.configuration.Configuration;
        
        import java.util.ArrayList;
        import java.util.List;
        
        /**
         * A broadcast variable is attached to an operator by calling
         * .withBroadcastSet(whiteDs, "white-name") on it; only that operator
         * can access the broadcast variable, other operators cannot.
         */
        public class BroadCastTest {
            public static void main(String[] args) throws Exception {
        
                ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        
                DataSource<String> textFile = environment.readTextFile("data/textfile");
        
                List<String> list = new ArrayList<String>();
        
                list.add("rust");
                list.add("swift");
        
                DataSource<String> whiteDs = environment.fromCollection(list);
        
                FilterOperator<String> f1 = textFile.filter(new RichFilterFunction<String>() {

                    List<String> whiteNames = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        whiteNames = getRuntimeContext().getBroadcastVariable("white-name");
                    }

                    @Override
                    public boolean filter(String value) throws Exception {
                        for (String whiteName : whiteNames) {
                            if (value.contains(whiteName)) {
                                return true;
                            }
                        }
                        return false;
                    }
                });

                // The f1 operator can access the broadcast variable.
                // Note: the name passed here must match the one used in getBroadcastVariable.
                FilterOperator<String> f2 = f1.withBroadcastSet(whiteDs, "white-name");
        
                f2.print();
            }
        }
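
      • The predicate inside filter() above does not depend on Flink and can be exercised on its own. A small sketch (containsAny is a hypothetical helper name, not a Flink API):

```java
import java.util.Arrays;
import java.util.List;

public class WhiteListPredicate {

    // Returns true if the value contains any of the broadcast white-list
    // entries, mirroring the filter() logic in BroadCastTest.
    static boolean containsAny(String value, List<String> whiteNames) {
        for (String whiteName : whiteNames) {
            if (value.contains(whiteName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> whiteNames = Arrays.asList("rust", "swift");
        System.out.println(containsAny("rust is fast", whiteNames));    // true
        System.out.println(containsAny("java is verbose", whiteNames)); // false
    }
}
```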
        
        
    3. Distributed Cache

      • Flink provides a distributed cache, similar to Apache Hadoop's.

      • When a program is executed, Flink automatically copies the registered files or directories to the local file system of every worker.

      • A user function can look up a file or directory under its registered name and access it from the worker's local file system.

      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.commons.io.FileUtils;
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.DataSource;
        import org.apache.flink.api.java.operators.MapOperator;
        import org.apache.flink.configuration.Configuration;
        
        import java.io.File;
        import java.util.List;
        
        public class DistributedCacheTest {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        
                // Current project directory
                String project_path = System.getProperty("user.dir");
        
                // Can be a local file or an HDFS file; HDFS paths start with hdfs://
                // The registered name must match the one passed to getFile() below.
                environment.registerCachedFile("file:///" + project_path + "/data/textfile", "myFile");
        
                DataSource<String> elements = environment.fromElements("hadoop", "flink", "spark", "hbase");
        
                MapOperator<String, String> map = elements.map(new RichMapFunction<String, String>() {
        
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Retrieve the cached file on the worker
                        File myFile = getRuntimeContext().getDistributedCache().getFile("myFile");
                        // Read the cached file
                        List<String> list = FileUtils.readLines(myFile, "UTF-8");
                        for (String line : list) {
                            System.out.println("[" + line + "]");
                        }
                    }
        
                    @Override
                    public String map(String value) throws Exception {
        
                        return value;
                    }
                });
                map.print();
            }
        
        }
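
      • The file-reading step in open() above can be reproduced with the JDK alone. A sketch using java.nio.file in place of commons-io (a temp file stands in for the cached file Flink copies to the worker):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class CachedFileReadSketch {

    // Reads all lines of a file, as FileUtils.readLines does in open().
    static List<String> readCachedFile(Path file) throws IOException {
        return Files.readAllLines(file);
    }

    public static void main(String[] args) throws IOException {
        // A temp file stands in for the cached file delivered to the worker.
        Path myFile = Files.createTempFile("myFile", ".txt");
        Files.write(myFile, List.of("hadoop", "flink", "spark"));

        for (String line : readCachedFile(myFile)) {
            System.out.println("[" + line + "]");
        }
        Files.deleteIfExists(myFile);
    }
}
```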
        
        
  • Original post: https://www.cnblogs.com/ronnieyuan/p/11849564.html