zoukankan      html  css  js  c++  java
  • 【Spark篇】---Spark中广播变量和累加器

    一、前述

    Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。

    累机器相当于统筹大变量,常用于计数,统计。

    二、具体原理

    1、广播变量

    • 广播变量理解图

     

    • 注意事项

    1、能不能将一个RDD使用广播变量广播出去?

           不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

    2、 广播变量只能在Driver端定义不能在Executor端定义。

    3、 Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

    4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

    5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

     

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("brocast")
    val sc = new SparkContext(conf)
    val list = List("hello xasxt")
    val broadCast = sc.broadcast(list)
    val lineRDD = sc.textFile("./words.txt")
    lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
    sc.stop()
    

     

     2、累加器

    • 累加器理解图

     Scala代码:

    import org.apache.spark.{SparkConf, SparkContext}
    
    object AccumulatorOperator {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("accumulator")
        val sc = new SparkContext(conf)
        val accumulator = sc.accumulator(0)
        sc.textFile("./records.txt",2).foreach {//两个变量
          x =>{accumulator.add(1)
          println(accumulator)}}
        println(accumulator.value)
        sc.stop()
      }
    }
    

     java代码:

    package com.spark.spark.others;
    
    import org.apache.spark.Accumulator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    /**
     * 累加器在Driver端定义赋初始值和读取,在Executor端累加。
     * @author root
     *
     */
    public class AccumulatorOperator {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("accumulator");
            JavaSparkContext sc = new JavaSparkContext(conf);
            final Accumulator<Integer> accumulator = sc.accumulator(0);
    //        accumulator.setValue(1000);
            sc.textFile("./words.txt",2).foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String t) throws Exception {
                    accumulator.add(1);
    //                System.out.println(accumulator.value());
                    System.out.println(accumulator);
                }
            });
            System.out.println(accumulator.value());
            sc.stop();
            
        }
    }

     结果:

    • 注意事项

                   累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

     

  • 相关阅读:
    再谈TextField
    IOS-TextField知多少
    leftBarButtonItems
    LeftBarButtonItems,定制导航栏返回按钮
    Apple Mach-O Linker (id) Error "_OBJC_CLASS...错误解决办法 Apple Mach-O Linker (id) Error "_OBJC_CLASS...错误解决办法
    Unrecognized Selector Sent to Instance问题之诱敌深入关门打狗解决办法
    UNRECOGNIZED SELECTOR SENT TO INSTANCE 问题快速定位的方法
    Present ViewController,模态详解
    UILABEL AUTOLAYOUT自动换行 版本区别
    iOS自动布局解决警告Automatic Preferred Max Layout Width is not available on iOS versions prior to 8.0
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8424681.html
Copyright © 2011-2022 走看看