zoukankan      html  css  js  c++  java
  • 寒假学习进度8

    今天继续学习spark双value算子

    (1)

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    //value,数据源类型要保持一致,拉链类型可以不一致

    //会报错,因为拉链分区数量要保持一致,并且分区中的数量要一致
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,6),2)
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 7, 8),4)

    //拉链
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
    }

    (2)partitionBy

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)

    //int类型转换成taper类型
    val maprdd: RDD[(Int, Int)] = rdd.map((_, 1))

    //partitionBy根据指定的分区规则对数据重新分区
    maprdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")


    sc.stop()
    }

    (3)reduceByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

    //reduceByKey,相同的key的数据进行value数据的聚合操作
    //[1,2]
    //[3,3]
    //[6]
    val reducerdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
    x + y
    })

    reducerdd.collect().foreach(println)


    sc.stop()
    }

    (4)groupByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

    //groupByKey,将数据源中的数据,相同的key分在一个组中,形成一个对偶元祖
    val grouprdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    grouprdd.collect().foreach(println)

    sc.stop()
    }

    (5)aggregateByKey

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)

    //aggregateByKey操作函数柯里化,2个参数列表
    //第一个参数列表,需要一个参数,表示初始值
    //主要用于第一个key时,和value进行分区计算
    //第二个参数列表需要2个参数
    //第一个参数表示分区内的计算规则
    //第二个参数表示分区间计算规则
    rdd.aggregateByKey(0)(
    (x,y)=>math.max(x,y),
    (x,y)=>x+y
    ).collect().foreach(println)

    sc.stop()
    }
  • 相关阅读:
    Android TabHost(选项卡)
    监控工具之---Prometheus查询持久性(六)
    监控工具之---Prometheus表达式promQL生产中应用(五)
    Grafana Configuration 参数详解(1)
    监控工具之---Prometheus数据可视化Grafana(七)
    监控工具之---Prometheus 安装详解(三)
    监控工具之---Prometheus 配置exporter四)
    Kubernetes容器编排技术---kubectl命令行工具用法详解(三)
    Kubernetes容器编排技术---Kubernetes基于kubeadm安装与配置(二)
    Azure Iaas基础之---创建虚拟机
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15760921.html
Copyright © 2011-2022 走看看