zoukankan      html  css  js  c++  java
  • java spark转换算子mapPartitions、mapPartitionsWithIndex

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/5/13 14:13
     * java map,mapfunctions
     **/
    public class spark_function {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark_java_function");
            JavaSparkContext sc = new JavaSparkContext(conf);
            List<String> list = Arrays.asList("a","b","c","d");
            JavaRDD<String> parallelize = sc.parallelize(list,3);
            parallelize
                    .mapPartitions(
                    new FlatMapFunction<Iterator<String>, String>() {
                        @Override
                        public Iterator<String> call(Iterator<String> stringIterator) throws Exception {
                            List<String> list = new ArrayList<>();
                            while (stringIterator.hasNext()){
                                String next = stringIterator.next();
                                System.out.println("mapPartitions函数的输出"+next);
                                list.add(next);
                            }
                            return list.iterator();
                        }
                    }
            )
                    .mapPartitionsWithIndex(
                    new Function2<Integer, Iterator<String>, Iterator<String>>() {
                        @Override
                        public Iterator<String> call(Integer integer, Iterator<String> stringIterator) throws Exception {
                            List<String> l = new ArrayList();
                            while (stringIterator.hasNext()){
                                String next = stringIterator.next();
                                System.out.println("分区id:"+integer+"--值:"+next);
                                l.add(next+integer);
                            }
                            return l.iterator();
                        }
                    }
            ,false).collect();
        }
    }
  • 相关阅读:
    Angular27 指令
    Angular26 ng-content和ng-container、投影的使用
    denied: requested access to the resource is denied
    kali 扫描之burp Suite学习笔记1
    VMware的包格式vmdk转换为virtualBox的ova
    面试之leetcode分治-求众数,x幂等
    C/c++语言开源项目总结
    面试之哈希表leetcode
    面试之leetcode20堆栈-字符串括号匹配,队列实现栈
    面试之leetcode链表
  • 原文地址:https://www.cnblogs.com/7749ha/p/12882276.html
Copyright © 2011-2022 走看看