zoukankan      html  css  js  c++  java
  • 寒假学习进度8

    今天继续学习spark双value算子

    (1)

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    //value,数据源类型要保持一致,拉链类型可以不一致

    //会报错,因为拉链分区数量要保持一致,并且分区中的数量要一致
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,6),2)
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 7, 8),4)

    //拉链
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
    }

    (2)partitionBy

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)

    //int类型转换成taper类型
    val maprdd: RDD[(Int, Int)] = rdd.map((_, 1))

    //partitionBy根据指定的分区规则对数据重新分区
    maprdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")


    sc.stop()
    }

    (3)reduceByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

    //reduceByKey,相同的key的数据进行value数据的聚合操作
    //[1,2]
    //[3,3]
    //[6]
    val reducerdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
    x + y
    })

    reducerdd.collect().foreach(println)


    sc.stop()
    }

    (4)groupByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

    //groupByKey,将数据源中的数据,相同的key分在一个组中,形成一个对偶元祖
    val grouprdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    grouprdd.collect().foreach(println)

    sc.stop()
    }

    (5)aggregateByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)

    //aggregateByKey操作函数柯里化,2个参数列表
    //第一个参数列表,需要一个参数,表示初始值
    //主要用于第一个key时,和value进行分区计算
    //第二个参数列表需要2个参数
    //第一个参数表示分区内的计算规则
    //第二个参数表示分区间计算规则
    rdd.aggregateByKey(0)(
    (x,y)=>math.max(x,y),
    (x,y)=>x+y
    ).collect().foreach(println)

    sc.stop()
    }
  • 相关阅读:
    常见hash算法的原理
    【学习干货】给coder的10个读书建议
    htc one x刷机记录
    Linux 搭建SVN server
    javascript
    USACO comehome Dijkstra
    当设计师、产品经理和程序员去交友站点找女朋友
    Spring3.0 AOP 具体解释
    慕课网Hibernate初探之一对多映射实验及总结
    C++数组引用
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15760921.html
Copyright © 2011-2022 走看看