  • spark mapPartitionsWithIndex && repartition && coalesce

    mapPartitionsWithIndex is a transformation operator: the function it takes is invoked once per partition, receiving that partition's index together with an iterator over the partition's data.

    spark.sparkContext.setLogLevel("error")
    val kzc = spark.sparkContext.parallelize(List(("hive", 8), ("apache", 8), ("hive", 30), ("hadoop", 18)), 2)
    val bd = spark.sparkContext.parallelize(List(("hive", 8), ("test", 2), ("spark", 20)), 1)
    val result = bd.union(kzc)
    // x is the partition index, y iterates over that partition's elements
    def fun(x: Int, y: Iterator[(String, Int)]): Iterator[(Int, String, Int)] = {
      val l = new scala.collection.mutable.ListBuffer[(Int, String, Int)]()
      while (y.hasNext) {
        val tmpy = y.next()
        l.append((x, tmpy._1, tmpy._2))
      }
      l.iterator
    }
    val result2 = result.mapPartitionsWithIndex(fun)
    result2.collect().foreach(println(_))
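The buffer-based `fun` above materializes each partition in memory before returning. Since `mapPartitionsWithIndex` only needs an iterator back, the same tagging can be written iterator-to-iterator and stay lazy (a sketch using the same `result` RDD as above):

```scala
// Equivalent, but without an intermediate ListBuffer: each element is
// tagged with its partition index as the iterator is consumed.
val result2 = result.mapPartitionsWithIndex { (idx, it) =>
  it.map { case (k, v) => (idx, k, v) }
}
result2.collect().foreach(println)
```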

     repartition     transformation operator that redefines the number of partitions. Note that repartition always incurs a shuffle: internally it is coalesce(numPartitions, shuffle = true), so even collapsing many partitions into fewer shuffles the data.

    spark.sparkContext.setLogLevel("error")
    val kzc = spark.sparkContext.parallelize(List(("hive", 8), ("apache", 8), ("hive", 30), ("hadoop", 18)), 2)
    val bd = spark.sparkContext.parallelize(List(("hive", 8), ("test", 2), ("spark", 20)), 1)
    val result = bd.union(kzc).repartition(4)
    def fun(x: Int, y: Iterator[(String, Int)]): Iterator[(Int, String, Int)] = {
      val l = new scala.collection.mutable.ListBuffer[(Int, String, Int)]()
      while (y.hasNext) {
        val tmpy = y.next()
        l.append((x, tmpy._1, tmpy._2))
      }
      l.iterator
    }
    val result2 = result.mapPartitionsWithIndex(fun)
    result2.collect().foreach(println(_))
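A quick way to confirm what repartition did is `getNumPartitions` (a sketch, assuming the same `bd` and `kzc` RDDs defined above):

```scala
val unioned = bd.union(kzc)
println(unioned.getNumPartitions)                  // 3: union keeps the 1 + 2 input partitions
println(unioned.repartition(4).getNumPartitions)   // 4: data was shuffled into 4 partitions
```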

    coalesce is typically used to reduce the number of partitions. Its second parameter controls whether the reduction produces a shuffle: true means shuffle, false means no shuffle; the default is false. If coalesce is asked for more partitions than the RDD already has, the second parameter set to false has no effect (the partition count stays the same); set to true, it behaves exactly like repartition.

    spark.sparkContext.setLogLevel("error")
    val kzc = spark.sparkContext.parallelize(List(("hive", 8), ("apache", 8), ("hive", 30), ("hadoop", 18)), 2)
    val bd = spark.sparkContext.parallelize(List(("hive", 8), ("test", 2), ("spark", 20)), 1)
    val result = bd.union(kzc).coalesce(2, true)
    def fun(x: Int, y: Iterator[(String, Int)]): Iterator[(Int, String, Int)] = {
      val l = new scala.collection.mutable.ListBuffer[(Int, String, Int)]()
      while (y.hasNext) {
        val tmpy = y.next()
        l.append((x, tmpy._1, tmpy._2))
      }
      l.iterator
    }
    val result2 = result.mapPartitionsWithIndex(fun)
    result2.collect().foreach(println(_))
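The rules for coalesce's second parameter can be checked with `getNumPartitions` as well (a sketch, reusing the `bd` and `kzc` RDDs from the snippet above):

```scala
val unioned = bd.union(kzc)                            // 3 partitions after the union
println(unioned.coalesce(2, false).getNumPartitions)   // 2: merged without a shuffle
println(unioned.coalesce(4, false).getNumPartitions)   // 3: cannot grow partitions without a shuffle
println(unioned.coalesce(4, true).getNumPartitions)    // 4: shuffle = true behaves like repartition(4)
```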

  • Original post: https://www.cnblogs.com/students/p/14262279.html