Spark Accumulator usage examples

    Official documentation

    http://spark.apache.org/docs/2.3.1/rdd-programming-guide.html#accumulators

    http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.util.AccumulatorV2

    Accumulator is the accumulator type that Spark provides. A common use is counting events during job execution while debugging. Only the driver can read an accumulator's value (by calling the value method); tasks can only add to it. An accumulator can also be given a name (not supported in Python), which makes it visible in the Spark web UI and helps you understand how the program is running.

    Numeric accumulators are created with SparkContext.longAccumulator() or SparkContext.doubleAccumulator(), for accumulating Long or Double values respectively. Tasks running on the cluster can then add to one with the add method, but they cannot read its value; only the driver program can read the accumulator's value, using the value method.
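
    As a minimal sketch of both constructors, accumulators can also be created with a name so they appear in the Spark web UI (this assumes a JavaSparkContext jsc as in the full example below, plus an import of org.apache.spark.util.DoubleAccumulator; the names "eventCounter" and "halfSum" are just illustrative):

    // Named accumulators show up on the stage detail page of the Spark web UI
    LongAccumulator longAccum = jsc.sc().longAccumulator("eventCounter");
    DoubleAccumulator doubleAccum = jsc.sc().doubleAccumulator("halfSum");
    
    jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> {
        longAccum.add(x);          // tasks can only add
        doubleAccum.add(x / 2.0);
    });
    
    // only the driver can read the values back
    System.out.println(longAccum.value() + " " + doubleAccum.value());  // 6 3.0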

    Built-in Spark accumulator usage example

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.util.LongAccumulator;
    
    import java.util.Arrays;
    
    /**
     * Spark ships with built-in numeric accumulators, such as LongAccumulator and DoubleAccumulator.
     */
    public class TestAccumulator {
    
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            SparkSession spark = SparkSession
                    .builder()
                    .appName("gxl")
                    .master("local")
                    .config(conf)
                    .enableHiveSupport()
                    .getOrCreate();
    
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    
            LongAccumulator accum = jsc.sc().longAccumulator();
            JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3));
    
            // Uncomment one of the demos below to run it:
    //        onlyMapOperator(accum,javaRDD);
    //        accumInActionOperator(accum,javaRDD);
    //        accumExecuteRepeatedly(accum,javaRDD);
        }
    
    
        private static void accumInActionOperator(LongAccumulator accum, JavaRDD<Integer> javaRDD){
            // foreach is an action, so the accumulator updates actually execute
            javaRDD.foreach(x -> accum.add(x));
            System.out.println(accum.value());   // 6
        }
    
        private static void onlyMapOperator(LongAccumulator accum, JavaRDD<Integer> javaRDD){
            // Transformations are lazy: with only a map and no action, the add calls never run
            javaRDD.map((Function<Integer, Integer>) v1 -> {
                accum.add(v1);
                return v1;
            });
            System.out.println(accum.value());   // prints 0
        }
    
        private static void accumExecuteRepeatedly(LongAccumulator accum, JavaRDD<Integer> javaRDD){
            JavaRDD<Integer> map = javaRDD.map((Function<Integer, Integer>) v1 -> {
                accum.add(v1);
                return v1;
            });
    //        // Without cache(), every action re-runs the map and the accumulator double-counts:
    //        map.count();
    //        System.out.println(accum.value());   // 6
    //        map.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
    //        System.out.println(accum.value());   // 12

            // Cache the mapped RDD so later actions read cached data instead of re-running the map
            JavaRDD<Integer> cache = map.cache();
            cache.count();
            System.out.println(accum.value());   // 6
            cache.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
            System.out.println(accum.value());   // still 6: the map was not re-executed
        }
    
    }
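
    Taken together, the three demos show: with only a map transformation (onlyMapOperator) the accumulator never updates, because transformations are lazy; an action such as foreach (accumInActionOperator) triggers the updates; and repeating actions over an uncached RDD (accumExecuteRepeatedly) re-executes the map and double-counts, which cache() avoids.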

    Custom Spark accumulator usage example

    import org.apache.spark.util.AccumulatorV2;
    
    /**
     * Spark 2.3
     * A custom accumulator must extend AccumulatorV2 and override the methods below.
     * This one concatenates qualifying records into a single string.
     */
    public class MyAccumulator extends AccumulatorV2<String,String> {
    
        private StringBuffer stringBuffer = new StringBuffer();
        /**
         * Returns whether this accumulator holds the zero value (an empty buffer).
         */
        @Override
        public boolean isZero() {
            return stringBuffer.length() == 0;
        }
    
        /**
         * Creates a new copy of this accumulator.
         */
        @Override
        public AccumulatorV2<String,String> copy() {
            MyAccumulator newMyAccu = new MyAccumulator();
            newMyAccu.stringBuffer.append(stringBuffer);
            return newMyAccu;
        }
    
        /**
         * Resets this accumulator to its zero value.
         */
        @Override
        public void reset() {
            stringBuffer.setLength(0);
        }
    
        /**
         * Takes an input value and accumulates it.
         */
        @Override
        public void add(String input) {
            stringBuffer.append(input).append(",");
        }
    
        /**
         * Merges another accumulator of the same type into this one, updating its state.
         */
        @Override
        public void merge(AccumulatorV2<String, String> other) {
            stringBuffer.append(other.value());
        }
    
        /**
         * Returns the current value of this accumulator.
         */
        @Override
        public String value() {
            return stringBuffer.toString();
        }
    }
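
    A registered accumulator is copied for each task (via copy and reset), each task calls add on its own copy, and Spark merges the per-task copies back on the driver with merge; the driver's value() then reflects all tasks' contributions.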

    Test example

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Arrays;
    import java.util.HashSet;
    
    /**
     * Drives the custom MyAccumulator defined above.
     */
    public class TestMyAccumulator {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            SparkSession spark = SparkSession
                    .builder()
                    .appName("gxl")
                    .master("local")
                    .config(conf)
                    .enableHiveSupport()
                    .getOrCreate();
    
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            testMyAccumulator(jsc);
            
        }
    
        private static void testMyAccumulator(JavaSparkContext jsc){
            MyAccumulator myAccumulator = new MyAccumulator();
            // Custom accumulators must be registered with the SparkContext before use
            jsc.sc().register(myAccumulator,"myAccumulator");
    
            HashSet<String> blacklist = new HashSet<>();
            blacklist.add("jack");
    
            JavaRDD<String> stringJavaRDD = jsc.parallelize(Arrays.asList("jack", "kevin", "wade", "james"));
            // Keep blacklisted names in the filtered RDD; collect the rest in the accumulator
            JavaRDD<String> filter = stringJavaRDD.filter((Function<String, Boolean>) v1 -> {
                if (blacklist.contains(v1)) {
                    return true;
                } else {
                    myAccumulator.add(v1);
                    return false;
                }
            });
            filter.count();   // count() is an action, so it triggers the accumulator updates
            System.out.println(myAccumulator.value());
        }
    
    }
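
    Run locally, the blacklist keeps only "jack" in the filtered RDD while the accumulator collects the other names, so the printed value should be along the lines of kevin,wade,james, (element order can vary with partitioning).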