zoukankan      html  css  js  c++  java
  • java实现spark常用算子之mapPartitionsWithIndex

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
    * mapPartitionsWithIndex算子:
    * 与mapPartitions相似,可以看见使用到了哪一个partitions
    *
    * mapPartitions第二个参数preservesPartition(boolean,默认为false)的含义:
    * 此标志用于优化目的,当您不修改分区时,将它设置为false,
    * 如果您需要修改分区时,将它设置为true,这样spark可以更有效地执行操作,
    * 但如果您不告诉spark,它无法知道你的目的,也将无法达到优化的目的。
    *
    * 采用分区的话:parallelize优先级最高,其次是conf.set,最后是local[]
    */
    public class MapPartitionsWithIndexOperator {

    public static void main(String[] args){
    SparkConf conf = new SparkConf().setMaster("local").setAppName("mapPartitionsWithIndex");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<String> names = Arrays.asList("w1","w2","w3","w4","w5","W6","W7");

    //将list转为RDD并且分为2个partition
    JavaRDD<String> nameRDD = sc.parallelize(names,2);

    // Function2入参:第一个参数为partition的index,第二个为入参,第三个为返回值
    JavaRDD<String> resultRDD = nameRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer integer, Iterator<String> iterator) throws Exception {
    List<String> nameList = new ArrayList<>();
    while (iterator.hasNext()){
    nameList.add(integer+":"+iterator.next());
    }
    return nameList.iterator();
    }
    },true);

    //修改sparkRDD分区
    JavaRDD<String> repartitionRDD = resultRDD.repartition(4);
    System.err.println(repartitionRDD.partitions().size());

    repartitionRDD.foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
    System.err.println("mapPartitionsWithIndex:"+s);
    }
    });

    }
    }



    微信扫描下图二维码加入博主知识星球,获取更多大数据、人工智能、算法等免费学习资料哦!


  • 相关阅读:
    【设计模式】责任链模式
    python 获取cookie,发送请求
    python requests报错:Exception: ('Connection broken: IncompleteRead(3794 bytes read, 4398 more expect ed)', IncompleteRea
    获取二维数组 所有的组合排列
    tp3.0 验证码生成和使用
    MySQL: mysql is not running but lock exists 的解决方法
    [RxSwift]4.2.0、Observer
    [RxSwift]4.1.6、ControlEvent
    [RxSwift]4.1.5、Signal
    [RxSwift]4.1.4、Driver
  • 原文地址:https://www.cnblogs.com/guokai870510826/p/11635165.html
Copyright © 2011-2022 走看看