zoukankan      html  css  js  c++  java
  • 使用 Spark 中的共享变量

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;

    import java.util.Arrays;
    import java.util.List;

    public class BroadcastVariable {
    public static void main(String[] args) {

    SparkConf conf = new SparkConf()
    .setAppName("BroadcastVariable")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1,2,3,4,5);
    JavaRDD<Integer> rdd = sc.parallelize(numbers);

    final int factor = 3;

    JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer v1) throws Exception {
    return v1 * factor;
    }
    });

    newNumbers.foreach(new VoidFunction<Integer>() {
    public void call(Integer number) throws Exception {
    System.out.println(number);
    }
    });
    }
    }

    如上代码在Driver端定义了一个变量 factor,在函数中调用这个factor。实际的执行过程中会发生什么事呢?
    假设一个节点上有100个task,那么Spark会为每个task复制一份factor变量放在内存中。
    但其实我们只是在函数中读取了这个变量的值进行了计算,完全没有必要复制100份,只需要在当前的Executor中保留一份,所有的task都来读取这一份数据就足够了。
    设想一下,如果要共享一个很大的变量,在每个task中都复制一份无疑会消耗巨大的网络带宽和节点内存,这是非常不合理的。

    基于这种情况,我们就可以使用广播变量。
    package com.rabbit;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;

    import java.util.Arrays;
    import java.util.List;

    public class BroadcastVariable {
    public static void main(String[] args) {

    SparkConf conf = new SparkConf()
    .setAppName("BroadcastVariable")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1,2,3,4,5);
    JavaRDD<Integer> rdd = sc.parallelize(numbers);

    final int factor = 3;
    //将factor转为广播变量
    final Broadcast<Integer> broadcastFactor = sc.broadcast(factor);
    JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer v1) throws Exception {
    //使用广播变量时,调用 value()方法获得其内部封装的值
    int factor = broadcastFactor.value();
    return v1 * factor;
    }
    });

    newNumbers.foreach(new VoidFunction<Integer>() {
    public void call(Integer number) throws Exception {
    System.out.println(number);
    }
    });
    }
    }

    Scala 版本:
    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastVariable {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    .setAppName("BroadcastVariable")
    .setMaster("local")

    val sc = new SparkContext(conf)

    val arr = Array(1,2,3,4,5)
    val numbers = sc.parallelize(arr)
    val factor = 3;
    val broadcastFactor = sc.broadcast(factor)

    val newNumbers = numbers.map(number => number * broadcastFactor.value)

    newNumbers.foreach(number => println(number))
    }

    }
     
  • 相关阅读:
    学习笔记 css3--选择器&新增颜色模式&文本相关
    HTML5之新增标签用途及应用场景
    js中邦定事件与解绑支持匿名函数
    常用网站开发类Firefox扩展插件 (转)
    使用Easy4net编写代码生成器
    jQuery分页插件jBootstrapPage,一个Bootstrap风格的分页插件
    使用jQuery插件jRemoteValidate进行远程ajax验证,可以自定义返回的信息
    (新)自己动手写ORM框架(1)-增删查改的使用
    (3) iOS开发之UI处理-UIView篇
    (2) iOS开发之UI处理-UILabel篇
  • 原文地址:https://www.cnblogs.com/rabbit624/p/10664567.html
Copyright © 2011-2022 走看看