zoukankan      html  css  js  c++  java
  • 使用 Spark 中的共享变量

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;

    import java.util.Arrays;
    import java.util.List;

    public class BroadcastVariable {
    public static void main(String[] args) {

    SparkConf conf = new SparkConf()
    .setAppName("BroadcastVariable")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1,2,3,4,5);
    JavaRDD<Integer> rdd = sc.parallelize(numbers);

    final int factor = 3;

    JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer v1) throws Exception {
    return v1 * factor;
    }
    });

    newNumbers.foreach(new VoidFunction<Integer>() {
    public void call(Integer number) throws Exception {
    System.out.println(number);
    }
    });
    }
    }

    如上代码在 Driver 端定义了一个变量 factor，并在传给 map 的函数中使用了这个 factor。实际执行时会发生什么呢？
    Spark 会把这个函数（连同它引用的 factor）序列化，随每个 task 一起发送到 Executor。假设一个节点上运行了 100 个 task，那么该节点的内存中就会有 100 份 factor 的副本。
    但其实函数只是读取了这个变量的值参与计算，完全没有必要复制 100 份。只需要在每个 Executor 中保留一份，让该 Executor 上的所有 task 都读取这一份数据就足够了。
    设想一下，如果要共享的是一个很大的变量（例如一张很大的查找表），为每个 task 都复制一份无疑会消耗大量的网络带宽和节点内存，这是非常不合理的。

    针对这种情况，可以使用广播变量（Broadcast Variable）：Driver 通过 sc.broadcast() 将变量广播出去后，每个 Executor 只会保存一份只读副本，供其上运行的所有 task 共享读取。
    package com.rabbit;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;

    import java.util.Arrays;
    import java.util.List;

    public class BroadcastVariable {
    public static void main(String[] args) {

    SparkConf conf = new SparkConf()
    .setAppName("BroadcastVariable")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1,2,3,4,5);
    JavaRDD<Integer> rdd = sc.parallelize(numbers);

    final int factor = 3;
    //将factor转为广播变量
    final Broadcast<Integer> broadcastFactor = sc.broadcast(factor);
    JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer v1) throws Exception {
    //使用广播变量时,调用 value()方法获得其内部封装的值
    int factor = broadcastFactor.value();
    return v1 * factor;
    }
    });

    newNumbers.foreach(new VoidFunction<Integer>() {
    public void call(Integer number) throws Exception {
    System.out.println(number);
    }
    });
    }
    }

    Scala 版本:
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Scala counterpart of the Java broadcast example: multiplies each element of a
     * small RDD by a broadcast factor and prints the results.
     */
    object BroadcastVariable {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("BroadcastVariable")
          .setMaster("local")

        val sc = new SparkContext(conf)
        try {
          val arr = Array(1, 2, 3, 4, 5)
          val numbers = sc.parallelize(arr)
          val factor = 3
          // The closure below references only the broadcast handle, read via .value.
          val broadcastFactor = sc.broadcast(factor)

          val newNumbers = numbers.map(number => number * broadcastFactor.value)

          newNumbers.foreach(number => println(number))
        } finally {
          // Always release the context, even if the job fails.
          sc.stop()
        }
      }
    }
     
  • 相关阅读:
    (转)修改Android解锁界面
    linux terminal 快捷键
    (转)Android蓝牙开发浅析
    (转)android 编译单个模块
    (转)Android关机AppWidget的实现
    MyEclipse修改页面模板(JSP和HTML等) 分类: WEB项目应用 20100131 00:03 698人阅读 评论(1) 收藏
    定时任务:Timer类、TimerTask类 分类: java 20100317 22:01 551人阅读 评论(0) 收藏
    Socket读取输入流 分类: java 20100322 17:38 3330人阅读 评论(0) 收藏
    [Microsoft][SQLServer 2000 Driver for JDBC]Object has been closed. 分类: 开发常见问题解决方案 20100318 22:05 1106人阅读 评论(0) 收藏
    3/17/10 9:55:59 AM CST: [INFO] User settings file does not exist C:/Documents and Settings/Administrator/.m2/settings.xml 分类: 开发常见问题解决方案 20100317 10:20 3448人阅读 评论(4) 收藏
  • 原文地址:https://www.cnblogs.com/rabbit624/p/10664567.html
Copyright © 2011-2022 走看看