zoukankan      html  css  js  c++  java
  • spark mapPartition方法与map方法的区别

     rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。

        两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。

        假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。

       

        //生成10个元素3个分区的rdd a,元素值为1~10的整数(1 2 3 4 5 6 7 8 9 10),sc为SparkContext对象

        val a = sc.parallelize(1 to 10, 3)

        //定义两个输入变换函数,它们的作用均是将rdd a中的元素值翻倍

        //map的输入函数,其参数e为rdd元素值   

        def myfuncPerElement(e:Int):Int = {

               println("e="+e)

               e*2

          }

         //mapPartitions的输入函数。iter是分区中元素的迭代子,返回类型也要是迭代子

        def myfuncPerPartition ( iter : Iterator [Int] ) : Iterator [Int] = {

             println("run in partition")

             var res = for (e <- iter ) yield e*2

              res

        }

        

        val b = a.map(myfuncPerElement).collect

        val c =  a.mapPartitions(myfuncPerPartition).collect

        在spark shell中运行上述代码,可看到打印了3次run in partition,打印了10次e=。

          从输入函数(myfuncPerElement、myfuncPerPartition)层面来看,map是推模式,数据被推到myfuncPerElement中;mapPartitons是拉模式,myfuncPerPartition通过迭代子从分区中拉数据。

        这两个方法的另一个区别是在大数据集情况下的资源初始化开销和批处理处理,如果在myfuncPerPartition和myfuncPerElement中都要初始化一个耗时的资源,然后使用,比如数据库连接。在上面的例子中,myfuncPerPartition只需初始化3个资源(3个分区每个1次),而myfuncPerElement要初始化10次(10个元素每个1次),显然在大数据集情况下(数据集中元素个数远大于分区数),mapPartitons的开销要小很多,也便于进行批处理操作。

        

       mapPartitionsWithIndex和mapPartitons类似,只是其参数多了个分区索引号。

    转载:http://wanshi.iteye.com/blog/2183906

  • 相关阅读:
    Linux系统教程:设置GRUB菜单密码
    vimdiff的常用命令
    Zero-Copy实现原理
    解决业务代码里的分布式事务一致性问题
    用好这6个APP,学英语SO EASY!
    线程池调优
    理解select,poll,epoll实现分析
    时序图
    性能监控-TP理解
    sshd_config OpenSSH SSH 进程配置文件配置说明
  • 原文地址:https://www.cnblogs.com/chengjunhao/p/8583219.html
Copyright © 2011-2022 走看看