zoukankan      html  css  js  c++  java
  • spark mapPartition方法与map方法的区别

     rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。

        两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。

        假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。

       

        //生成10个元素3个分区的rdd a,元素值为1~10的整数(1 2 3 4 5 6 7 8 9 10),sc为SparkContext对象

        val a = sc.parallelize(1 to 10, 3)

        //定义两个输入变换函数,它们的作用均是将rdd a中的元素值翻倍

        //map的输入函数,其参数e为rdd元素值   

        def myfuncPerElement(e:Int):Int = {

               println("e="+e)

               e*2

          }

         //mapPartitions的输入函数。iter是分区中元素的迭代子,返回类型也要是迭代子

        def myfuncPerPartition ( iter : Iterator [Int] ) : Iterator [Int] = {

             println("run in partition")

             var res = for (e <- iter ) yield e*2

              res

        }

        

        val b = a.map(myfuncPerElement).collect

        val c =  a.mapPartitions(myfuncPerPartition).collect

        在spark shell中运行上述代码,可看到打印了3次run in partition,打印了10次e=。

          从输入函数(myfuncPerElement、myfuncPerPartition)层面来看,map是推模式,数据被推到myfuncPerElement中;mapPartitons是拉模式,myfuncPerPartition通过迭代子从分区中拉数据。

        这两个方法的另一个区别是在大数据集情况下的资源初始化开销和批处理处理,如果在myfuncPerPartition和myfuncPerElement中都要初始化一个耗时的资源,然后使用,比如数据库连接。在上面的例子中,myfuncPerPartition只需初始化3个资源(3个分区每个1次),而myfuncPerElement要初始化10次(10个元素每个1次),显然在大数据集情况下(数据集中元素个数远大于分区数),mapPartitons的开销要小很多,也便于进行批处理操作。

        

       mapPartitionsWithIndex和mapPartitons类似,只是其参数多了个分区索引号。

    转载:http://wanshi.iteye.com/blog/2183906

  • 相关阅读:
    Docker-compose部署Elasticsearch+Kibana+Filebeat+APM(7.13.2)
    容器和镜像转化、迁移方式
    Docker部署redis主从+读写分离+哨兵
    简单的Redis及哨兵监控报警
    Prometheus监控docker容器
    Jenkins---多选参数构建
    Nginx——基于站点目录和文件的URL访问控制、禁止IP/非法域名访问
    Docker-compose构建jenkins环境
    Docker部署kafka集群
    Goreplay-使用真实流量测试
  • 原文地址:https://www.cnblogs.com/chengjunhao/p/8583219.html
Copyright © 2011-2022 走看看