zoukankan      html  css  js  c++  java
  • spark map flatmap collect方法

    spark rdd的转化方法

    rdd作为抽象分布式数据集,有常见的转化函数,比如map,flatmap,collect

    map和flatMap方法区别

    flatmap返回的是扁平化的数值,返回的更多。
    map返回的T数目跟原来是一样的,对原来的数据做了处理仍然封装了在一起

    collect返回

    collect返回的是一个rdd数据List,打印出来就是数组

    例子

    package com.learn.hadoop.spark.doc.analysis.chpater.rdd;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * map与flatmap的区别
     * map
     * [hello, spark, world]
     * [hello, java, world]
     * [hello, python, world]
     *
     * flatmap
     * hello
     * spark
     * world
     * hello
     * java
     * world
     * hello
     * python
     * world
     *
     * 测试collect方法
     * [[hello, spark, world], [hello, java, world], [hello, python, world]]
     */
    public class RddTest02 {
        public static void main(String[] args) {
            //SparkConf sparkConf =new SparkConf().setMaster("local").setAppName("wordCount");
            //System.setProperty("hadoop.home.dir","D:\work\tools\hadoop-3.0.0");
            SparkConf sparkConf =new SparkConf().setMaster("local").setAppName("RddTest01");
            JavaSparkContext sc =new JavaSparkContext(sparkConf);
            JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello spark world ","hello java world","hello python world"));
            
            JavaRDD<String> flatmap =rdd.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            System.out.println("flat map:");
            flatmap.foreach(s -> System.out.println(s));
            System.out.println("map");
            JavaRDD<String> rddmap =rdd.map(new Function<String, String>() {
                @Override
                public String call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).toString();
                }
            });
            rddmap.foreach(s -> System.out.println(s));
    
            //返回的是一个List,但打印来就是一个数组
            System.out.println("test collect");
            System.out.println(rddmap.collect());
    
        }
    }
    
    
  • 相关阅读:
    [JavaEE] Hibernate连接池配置测试
    restful--参考文档
    webapi--参考文档
    注册中心--参考文档
    .NET Core中文智能辅助提示设置
    .NET Core+web服务器在本地机房用几台电脑实现负载均衡+集群
    调式、跟踪
    微服务架构:介绍、分布式与集群、架构四要素、设计模式、架构说明、项目结构说明、通讯方式、架构演进
    异步:Task、Async、await、委托异步(Invoke、BeginInvoke、EndInvoke)
    IO使用解析
  • 原文地址:https://www.cnblogs.com/JuncaiF/p/12400965.html
Copyright © 2011-2022 走看看