zoukankan      html  css  js  c++  java
  • spark accumulator累加器

    java

     1 /**
     2  * accumulator可以让多个task共同操作一份变量,主要进行多个节点对一个变量进行共享性的操作,accumulator只提供了累加的功能
     3  * 只有driver可以获取accumulator的值
     4  * @author Tele
     5  */
     6 public class AccumulatorDemo {
     7     private static SparkConf conf = new SparkConf().setMaster("local").setAppName("AccumulatorDemo");
     8     private static JavaSparkContext jsc = new JavaSparkContext(conf);
     9 
    10     public static void main(String[] args) {
    11         List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
    12 
    13         JavaRDD<Integer> rdd = jsc.parallelize(list);
    14 
    15         /*
    16          * Accumulator<Integer> accumulator = jsc.accumulator(10);
    17          * 
    18          * rdd.foreach(new VoidFunction<Integer>() {
    19          * 
    20          * private static final long serialVersionUID = 1L;
    21          * 
    22          * @Override public void call(Integer t) throws Exception { accumulator.add(t);
    23          * } }); System.out.println(accumulator.value());
    24          */
    25 
    26         LongAccumulator la = new LongAccumulator();
    27         la.setValue(100L);
    28 
    29         jsc.sc().register(la, "数值累加器");
    30 
    31         rdd.foreach(new VoidFunction<Integer>() {
    32 
    33             private static final long serialVersionUID = 1L;
    34 
    35             @Override
    36             public void call(Integer t) throws Exception {
    37                 // 不能在算子内部获得accumulator.value()
    38                 la.add(t);
    39             }
    40         });
    41 
    42         System.out.println(la.value());
    43         jsc.close();
    44     }
    45 }

    scala

     /**
      * Demonstrates a Spark long accumulator: tasks add the elements of an RDD
      * to it, and the driver prints the accumulated total.
      */
     object AccumulatorDemo {

       /**
        * Adds the integers 1..5 to an accumulator seeded with 100 and prints
        * the result on the driver.
        *
        * @param args unused command-line arguments
        */
       def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setMaster("local").setAppName("accumulator")
         val sc = new SparkContext(conf)

         // Stop the context even if the job fails, so its resources are released.
         try {
           val arr = Array(1, 2, 3, 4, 5)
           val rdd = sc.parallelize(arr, 1)

           // longAccumulator creates the accumulator and registers it in one step.
           val accumulator = sc.longAccumulator
           // Seed the driver-side value; the tasks' contributions are added on top of it.
           accumulator.add(100L)

           // Widen explicitly so the primitive add(Long) overload is chosen unambiguously.
           rdd.foreach(n => accumulator.add(n.toLong))

           // Read the total on the driver once the action has completed.
           println(accumulator.value)
         } finally {
           sc.stop()
         }
       }
     }
  • 相关阅读:
    [NOI2014]动物园
    2018.7.15模拟赛
    2018.7.13模拟赛
    [CodeForces]920F SUM and REPLACE
    [BZOJ3211]花神游历各国
    [GSS5] Can you answer these queries V
    [SPOJ1716] GSS3
    [HNOI2012]排队
    2018.7.10模拟赛
    7.3模拟赛
  • 原文地址:https://www.cnblogs.com/tele-share/p/10279239.html
Copyright © 2011-2022 走看看