zoukankan      html  css  js  c++  java
  • spark API 之 combineByKey

    以下代码是combineByKey的一个例子,把执行过程展示出来。

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    /**
      * Created by luckuan on 16/4/19.
      */
    object TT {
      def main(args: Array[String]) {
    
        val sparkConf = new SparkConf()
        sparkConf.setMaster("local[*]")
        //    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.setAppName("ts")
        val sc: SparkContext = new SparkContext(sparkConf)
    
        val partitionNum = 2
    
        val rdd = sc.makeRDD(Seq(("a1", "11"), ("a1", "1"), ("a1", "111"), ("a2", "2"), ("a2", "22"), ("a3", "3"), ("a4", "4"), ("a1", "1111"), ("a2", "222")), partitionNum)
    
        val zero = () => scala.collection.mutable.HashSet.empty[String]
        val seq = (s: scala.collection.mutable.HashSet[String], v: String) => {
          val s_clone = s.clone() //此处clone是为了记录原有的值,否则在下次打印的时候是最终结果,不太直观,线上不需要用到clone.
          val ret = s += v
          println(s"seq-s:${s_clone}--seq-v:${v}---seq-rs:${ret}")
          ret
        }
        val comb = (u: scala.collection.mutable.HashSet[String], v: scala.collection.mutable.HashSet[String]) => {
          {
            val u_clone = u.clone()
            val ret = u ++ v
            println(s"comb-s:${u_clone}--comb-v:${v}---comb-rs:${ret}")
            ret
          }
        }
    
        val ret = rdd.combineByKey((v: String) => {
          println(s"根据[${v}]进行初始化")
          seq(zero(), v)
        }, seq, comb, new HashPartitioner(partitionNum)).collect()
    
        ret.foreach(println)
    
      }
    }
    

    RDD分区为1

    根据[11]进行初始化
    seq-s:Set()--seq-v:11---seq-rs:Set(11)
    seq-s:Set(11)--seq-v:1---seq-rs:Set(1, 11)
    seq-s:Set(1, 11)--seq-v:111---seq-rs:Set(1, 111, 11)
    根据[2]进行初始化
    seq-s:Set()--seq-v:2---seq-rs:Set(2)
    seq-s:Set(2)--seq-v:22---seq-rs:Set(2, 22)
    根据[3]进行初始化
    seq-s:Set()--seq-v:3---seq-rs:Set(3)
    根据[4]进行初始化
    seq-s:Set()--seq-v:4---seq-rs:Set(4)
    seq-s:Set(1, 111, 11)--seq-v:1111---seq-rs:Set(1, 1111, 111, 11)
    seq-s:Set(2, 22)--seq-v:222---seq-rs:Set(222, 2, 22)
    
    
    
    (a3,Set(3))
    (a4,Set(4))
    (a1,Set(1, 1111, 111, 11))
    (a2,Set(222, 2, 22))
    分区是1的情况
    首先判断当前key是否存在,如果存在,那么执行seq代码,将新值追加到已经存在的set中。如果不存在 调用zero的代码生成一个新的set
    这里没有执行comb方法,因为我们只有一个分区。
    

    RDD分区为2

    根据[11]进行初始化
    根据[22]进行初始化
    seq-s:Set()--seq-v:11---seq-rs:Set(11)
    seq-s:Set()--seq-v:22---seq-rs:Set(22)
    根据[3]进行初始化
    seq-s:Set(11)--seq-v:1---seq-rs:Set(1, 11)
    seq-s:Set()--seq-v:3---seq-rs:Set(3)
    seq-s:Set(1, 11)--seq-v:111---seq-rs:Set(1, 111, 11)
    根据[4]进行初始化
    seq-s:Set()--seq-v:4---seq-rs:Set(4)
    根据[2]进行初始化
    seq-s:Set()--seq-v:2---seq-rs:Set(2)
    根据[1111]进行初始化
    seq-s:Set()--seq-v:1111---seq-rs:Set(1111)
    seq-s:Set(22)--seq-v:222---seq-rs:Set(222, 22)
    
    
    
    comb-s:Set(2)--comb-v:Set(222, 22)---comb-rs:Set(222, 2, 22)
    comb-s:Set(1, 111, 11)--comb-v:Set(1111)---comb-rs:Set(1, 1111, 111, 11)
    
    
    (a3,Set(3))
    (a1,Set(1, 1111, 111, 11))
    (a4,Set(4))
    (a2,Set(222, 2, 22))
    
    “2”和“22”被初始化了两次,说明“a2"被初始化了2次,因为“a2"的数据划分到2个分区中,导致被初始化两次。
    
  • 相关阅读:
    window系统之mongoDB安装,启动及如何设置为windows服务(总结)
    永久激活pycharm 教程,方便,快捷,简单
    python classmethod方法 和 staticmethod
    Python 单元测试 之setUP() 和 tearDown()
    Git 自己的一些工作中的总结
    __str__ 和 __unicode__ 的区别和用法
    Bug 汇总
    如何理解API,API 是如何工作的
    AJAX
    集群配置虚拟主机及部署Hadoop集群碰到的问题
  • 原文地址:https://www.cnblogs.com/luckuan/p/5442154.html
Copyright © 2011-2022 走看看