  • Accumulators, Broadcast Variables, and Distributed Cache

    1. Accumulators

      • An Accumulator collects statistics across the distributed tasks of a job; its final result can only be retrieved after the job has finished.

      • Counters are the concrete accumulator implementations: IntCounter, LongCounter, and DoubleCounter (the latter two are sketched after the code below).

      • Notes:

        • Create the accumulator object inside the operator.
        • Typically, register the accumulator in the open method of a Rich function and give it a name.
        • The accumulator can then be used anywhere inside that operator.
        • The accumulator's value is only available after the job has finished, via the JobExecutionResult object returned by env.execute(xxx).
      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.flink.api.common.JobExecutionResult;
        import org.apache.flink.api.common.accumulators.IntCounter;
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.MapOperator;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.core.fs.FileSystem;
        
        public class AccumulatorTest {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
                // Read the data source
                DataSet<String> text = env.readTextFile("data/textFile");
        
                MapOperator<String, String> hello = text.map(new RichMapFunction<String, String>() {
                    IntCounter intCounter = new IntCounter();
        
                    @Override
                    public void open(Configuration parameters) throws Exception {
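                        // Register the accumulator under a unique name; the result is fetched by this name after the job finishes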
                        getRuntimeContext().addAccumulator("my-accumulator", intCounter);
                    }
        
                    @Override
                    public String map(String value) throws Exception {
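                        // Count every input line that contains "hello"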
                        if (value.contains("hello")) {
                            intCounter.add(1);
                        }
                        return value;
                    }
                });
                hello.writeAsText("data/my.txt", FileSystem.WriteMode.OVERWRITE);
        
                JobExecutionResult counter = env.execute("counter");
        
                Integer result = counter.getAccumulatorResult("my-accumulator");
        
                System.out.println(result);
            }
        }
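
      • The remaining counter types register and update the same way; LongCounter and DoubleCounter differ only in the value type they accumulate. A minimal sketch of the pieces that would change in the Rich function above (the names "my-long" and "my-double" are illustrative):

        import org.apache.flink.api.common.accumulators.DoubleCounter;
        import org.apache.flink.api.common.accumulators.LongCounter;

        LongCounter longCounter = new LongCounter();
        DoubleCounter doubleCounter = new DoubleCounter();

        // In open():
        getRuntimeContext().addAccumulator("my-long", longCounter);
        getRuntimeContext().addAccumulator("my-double", doubleCounter);

        // In map():
        longCounter.add(1L);
        doubleCounter.add(0.5);

        // After env.execute(), results are fetched by name as before, or all at once:
        // Map<String, Object> all = jobExecutionResult.getAllAccumulatorResults();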
        
        
    2. Broadcast Variables

      • Broadcast: a data set is broadcast by calling withBroadcastSet on an operator

      • Access: read it via getRuntimeContext().getBroadcastVariable (an alternative access pattern is sketched after the code below)

      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.flink.api.common.functions.RichFilterFunction;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.DataSource;
        import org.apache.flink.api.java.operators.FilterOperator;
        import org.apache.flink.configuration.Configuration;
        
        import java.util.ArrayList;
        import java.util.List;
        
        /**
         * A broadcast variable is attached by calling .withBroadcastSet(whiteDs, "white-name") on an operator;
         * only that operator can access the broadcast variable, other operators cannot.
         */
        public class BroadCastTest {
            public static void main(String[] args) throws Exception {
        
                ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        
                DataSource<String> textFile = environment.readTextFile("data/textfile");
        
                List<String> list = new ArrayList<String>();
        
                list.add("rust");
                list.add("swift");
        
                DataSource<String> whiteDs = environment.fromCollection(list);
        
                FilterOperator<String> f1 = textFile.filter(new RichFilterFunction<String>() {
        
                    List<String> whiteNames = null;
        
                    @Override
                    public void open(Configuration parameters) throws Exception {
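                        // Look up the broadcast variable by the exact name used in withBroadcastSet below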
                        whiteNames = getRuntimeContext().getBroadcastVariable("white-name");
                    }
        
                    @Override
                    public boolean filter(String value) throws Exception {
                        for (String whiteName : whiteNames) {
                            if (value.contains(whiteName)) {
                                return true;
                            }
                        }
                        return false;
                    }
                });
        
                // Attach the broadcast set to the f1 operator; the name must match the one used in getBroadcastVariable.
                FilterOperator<String> f2 = f1.withBroadcastSet(whiteDs, "white-name");
        
                f2.print();
            }
        }
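
      • Usage note: getBroadcastVariable hands the function the full broadcast list. When a large list is only used for membership tests, Flink's RuntimeContext also offers getBroadcastVariableWithInitializer, which transforms the list once into a shared data structure. A minimal sketch of the open method above rewritten this way (same "white-name" registration assumed):

        import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
        import java.util.HashSet;
        import java.util.Set;

        Set<String> whiteSet;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Build a HashSet from the broadcast list once; the transformed structure
            // can be shared by the parallel subtasks on the same TaskManager.
            whiteSet = getRuntimeContext().getBroadcastVariableWithInitializer(
                    "white-name",
                    new BroadcastVariableInitializer<String, Set<String>>() {
                        @Override
                        public Set<String> initializeBroadcastVariable(Iterable<String> data) {
                            Set<String> set = new HashSet<>();
                            for (String name : data) {
                                set.add(name);
                            }
                            return set;
                        }
                    });
        }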
        
        
    3. Distributed Cache

      • Flink provides a distributed cache similar to Apache Hadoop's.

      • When the program is executed, Flink automatically copies the registered files or directories to the local file system of every worker (an HDFS variant is sketched after the code below).

      • A user function can look up a file or directory under its registered name and access it from the worker's local file system.

      • Code:

        package com.ronnie.flink.batch;
        
        import org.apache.commons.io.FileUtils;
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.operators.DataSource;
        import org.apache.flink.api.java.operators.MapOperator;
        import org.apache.flink.configuration.Configuration;
        
        import java.io.File;
        import java.util.List;
        
        public class DistributedCacheTest {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        
                // Current project path
                String project_path = System.getProperty("user.dir");
        
                // Can be a local file or an HDFS file; HDFS paths start with hdfs://
                environment.registerCachedFile("file:///" + project_path + "/data/textfile", "myfile");
        
                DataSource<String> elements = environment.fromElements("hadoop", "flink", "spark", "hbase");
        
                MapOperator<String, String> map = elements.map(new RichMapFunction<String, String>() {
        
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Fetch the cached file on the worker; the name must match the one used in registerCachedFile
                        File myFile = getRuntimeContext().getDistributedCache().getFile("myfile");
                        // Read the cached file
                        List<String> list = FileUtils.readLines(myFile, "UTF-8");
                        for (String line : list) {
                            System.out.println("[" + line + "]");
                        }
                    }
        
                    @Override
                    public String map(String value) throws Exception {
        
                        return value;
                    }
                });
                map.print();
            }
        
        }
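
      • The same mechanism works for files on HDFS, and an overload of registerCachedFile can mark a file as executable on the workers. A minimal sketch (the HDFS host and paths are illustrative):

        // Cache a file from HDFS instead of the local file system
        environment.registerCachedFile("hdfs://namenode:9000/data/textfile", "hdfsFile");

        // A third boolean argument marks the cached file as executable
        environment.registerCachedFile("hdfs://namenode:9000/bin/some-script.sh", "script", true);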
        
        