zoukankan      html  css  js  c++  java
  • 【Spark学习笔记】06-Spark常用算子

    一、Tranformation算子

    Transformations类算子叫做转换算子,该类算子是延迟加载,也叫懒加载,必须有action类算子才会触发。

    1.1 filter

    保留符合条件的数据,类似于SQL中的where子句。true保留,false过滤掉

    val rdd = sc.makeRDD(List(1, 1, 3, 4, 5, 6, 7, 8))
    rdd.filter(_ % 2 == 0).foreach(println)
    
    // 输出:
    4
    6
    8
    

    1.2 filterByRange

    作用KV格式的RDD上,保留Key值符合范围的数据

    val rdd1 = sc.parallelize(List(("a", 2), ("b", 1), ("d", 4), ("c", 1), ("e", 4))) 
    // key值符合b-d范围的
    rdd1.filterByRange("b", "d").foreach(println)
    
    // 输出:
    (b,1)
    (d,4)
    (c,1)
    

    1.3 map

    指定一个函数,将给定数据集中的记录一条一条的输入该函数处理以得到新的记录。一对一

    val words = sc.parallelize(List("spark", "flink", "java", "python"))
    words.map((_, 1)).foreach(println)
    
    // 输出:
    (spark,1)
    (flink,1)
    (java,1)
    (python,1)
    

    1.4 flapMap

    是对map算子的一个简单扩展,将RDD中的每一条数据映射多个输出项,flap映射要求输出的是一个可迭代类型一对多

    val rdd = sc.parallelize(List(("Hello Java"), ("Flink"), ("Hello Spark")))
    rdd.flatMap(_.split(" ")).foreach(println)
    
    // 输出:
    Hello
    Java
    Flink
    Hello
    Spark
    

    1.5 flapMapValues

    作用在KV格式的RDD上,对value进行处理。一对多

    val rdd = sc.parallelize(List(("a", "2 2"), ("b", "1 5"), ("d", "4 6")))
    val rdd2 = rdd1.flatMapValues(_.split(" ")) 
    
    // 输出:
    (a,2)
    (a,2)
    (b,1)
    (b,5)
    (d,4)
    (d,6)
    

    1.6 mapValues

    作用在KV格式的RDD上,对value进行处理,一对一

    val rdd = sc.parallelize(List(("a", "2 2"), ("b", "1 5"), ("d", "4 6")))
    rdd.mapValues(_ + "-hello").foreach(println)
    
    // 输出:
    (a,2 2-hello)
    (b,1 5-hello)
    (d,4 6-hello)
    

    1.7 mapPartitions

    与map类似,遍历的单位是每个partition上的数据。在将数据写入外部数据库(如mysql、redis)的场景时,建议使用mapPartitions代替map,避免频繁的创建关闭连接,性能更高。func函数输入输出必须都是可迭代类型

    // 指定两个分区
    val rdd: RDD[Int] = sc.parallelize(List[Int](1, 2, 3, 4, 5), 2)
    rdd.mapPartitions(iter => {
          val list = new ListBuffer[Int]
          println("创建连接")
          while(iter.hasNext) {
            list.append(iter.next())
          }
          println("数据插入 " + list.toString())
          println("关闭连接")
          list.iterator
        }).count()
    
    // 输出结果:
    创建连接
    数据插入 ListBuffer(1, 2)
    关闭连接
    创建连接
    数据插入 ListBuffer(3, 4, 5)
    关闭连接
    

    1.8 mapPartitionsWithIndex

    与mapPartitions类似,遍历的单位是一个partition上的数据。除此之外,还会携带分区的编号

    val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val result: RDD[String] = rdd.mapPartitionsWithIndex((index, iter) => {
          val list = new ListBuffer[String]
          iter.foreach(value => list.append("rdd index【" + index + "】,value【" + value + 		"】"))
          list.iterator
        })
    result.foreach(println)
    
    // 输出结果:
    rdd index【0】,value【1】
    rdd index【0】,value【2】
    rdd index【0】,value【3】
    rdd index【1】,value【4】
    rdd index【1】,value【5】
    rdd index【1】,value【6】
    

    1.9 sample

    随机抽样。三个参数:是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed)。seed为可选参数,当传入seed值时,每次抽样的结果相同

    val rdd = sc.makeRDD(List(1, 1, 3, 4, 5, 6, 7, 8))
    rdd.sample(true,0.5)foreach(println)
    
    // 输出结果(每次结果不同):
    1
    1
    4
    5
    7
    

    1.10 reduceByKey

    作用在KV格式RDD上,将相同的key的数据根据相应的逻辑进行处理。

    val rdd = sc.parallelize(List(("Spark", 1), ("Java", 1), ("Flink", 1),("Spark", 1)))
    rdd.reduceByKey(_ + _).foreach(println)
    
    // 输出:
    (Flink,1)
    (Spark,2)
    (Java,1)
    

    1.11 aggregateByKey

    作用在KV格式RDD上,先在分区内对key相同的数据进行聚合处理,然后对这些不同分区的小聚合结果进行大聚合。aggregateByKey(zeroValue)(seqOp, combOp):zeroValue表示每个组的初始值,seqOp是分区内小聚合逻辑,combOp是不同分区对小聚合结果进行大聚合的逻辑。

    val rdd1 = sc.parallelize(List(("dog", 2), ("cat", 1), ("mouse", 4), ("cat", 1), ("mouse", 4), ("dog", 2)),2) 
    val result = rdd1.aggregateByKey(0)(_ + _, _ + _) 
    
    // 输出结果:
    (dog,4)
    (cat,2)
    (mouse,8)
    

    1.12 groupBy

    根据指定元素分组,返回(K,Iterable)。

    val rdd: RDD[(String, String, Int)] = sc.parallelize(List(("Spark", "汪老师", 2), ("Java", "张老师", 1), ("Spark", "李老师", 4), ("Flink", "赵老师", 1)))
    rdd.groupBy(_._1).foreach(println)
    
    // 输出结果:
    (Flink,CompactBuffer((Flink,赵老师,1)))
    (Spark,CompactBuffer((Spark,汪老师,2), (Spark,李老师,4)))
    (Java,CompactBuffer((Java,张老师,1)))
    

    1.13 groupByKey

    作用在KV格式RDD上,根据Key进行分组。返回(K,Iterable

    val rdd: RDD[(String, Int)] = sc.parallelize(List(("Spark", 2), ("Java", 1), ("Spark",4), ("Flink", 1)))
    rdd.groupByKey().foreach(println)
    
    // 输出结果:
    (Flink,CompactBuffer(1))
    (Spark,CompactBuffer(2, 4))
    (Java,CompactBuffer(1))
    
  • 相关阅读:
    dapper 批量删除、新增、修改说明
    android 加载assets目录下的静态html文件案例
    webstorm中使用git提交代码时出现unversioned files错误
    windows server 2008 R2 x64 部署.net core 3.1项目
    asp.net core 项目添加nlog日志(loggerFactor.AddNLog 过时处理(.net core 3.1))
    机器学习笔记之一步步教你轻松学主成分分析PCA降维算法
    机器学习笔记之类别特征处理
    机器学习笔记之range, numpy.arange 和 numpy.linspace的区别
    机器学习笔记之Numpy的random函数
    机器学习笔记之矩阵分解 SVD奇异值分解
  • 原文地址:https://www.cnblogs.com/yangyh11/p/13599449.html
Copyright © 2011-2022 走看看