zoukankan      html  css  js  c++  java
  • 【Spark篇】---Spark中广播变量和累加器

    一、前述

    Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。

    累机器相当于统筹大变量,常用于计数,统计。

    二、具体原理

    1、广播变量

    • 广播变量理解图

     

    • 注意事项

    1、能不能将一个RDD使用广播变量广播出去?

           不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

    2、 广播变量只能在Driver端定义不能在Executor端定义。

    3、 Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

    4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

    5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

     

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("brocast")
    val sc = new SparkContext(conf)
    val list = List("hello xasxt")
    val broadCast = sc.broadcast(list)
    val lineRDD = sc.textFile("./words.txt")
    lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
    sc.stop()
    

     

     2、累加器

    • 累加器理解图

     Scala代码:

    import org.apache.spark.{SparkConf, SparkContext}
    
    object AccumulatorOperator {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("accumulator")
        val sc = new SparkContext(conf)
        val accumulator = sc.accumulator(0)
        sc.textFile("./records.txt",2).foreach {//两个变量
          x =>{accumulator.add(1)
          println(accumulator)}}
        println(accumulator.value)
        sc.stop()
      }
    }
    

     java代码:

    package com.spark.spark.others;
    
    import org.apache.spark.Accumulator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    /**
     * 累加器在Driver端定义赋初始值和读取,在Executor端累加。
     * @author root
     *
     */
    public class AccumulatorOperator {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("accumulator");
            JavaSparkContext sc = new JavaSparkContext(conf);
            final Accumulator<Integer> accumulator = sc.accumulator(0);
    //        accumulator.setValue(1000);
            sc.textFile("./words.txt",2).foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String t) throws Exception {
                    accumulator.add(1);
    //                System.out.println(accumulator.value());
                    System.out.println(accumulator);
                }
            });
            System.out.println(accumulator.value());
            sc.stop();
            
        }
    }

     结果:

    • 注意事项

                   累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

     

  • 相关阅读:
    MongoDB的固定集合
    MongoDB的导入导出
    MongoDB的数据备份与恢复
    MongoDB的索引
    MongoDB简单CRUD场景
    MongoDB入门
    NOSQL概念入门
    Java静态代理和动态代理
    a=a+1背后的内存模型和CPU高速缓存
    SpringCloud的学习记录(6)
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8424681.html
Copyright © 2011-2022 走看看