- map/flatMap:对数据集中的每条数据处理,有返回值
- sorted/sortedBy/sortWith:对数据集中的数据按照指定的方式进行排序
- groupBy:对数据集中的的数据按照某种方式进行分组
- reduce/fold:对数据集中的数据进行合并
- filter/filterNot:对数据集中的数据进行过滤
- foreach:对数据集中的每条数据处理,没有返回值
map和flatMap
map
//在高阶函数里面下面的写法就更加的优化,可以获取各个值
RDD.map(item=>{
val Array(name, age, sex, phone) = item.split(",")
name
})
val list = List(1, 2, 7, 8, 3, 4, 9, 10, 6, 5)
/**
* def map(f: A => B)
* map的参数是一个函数。f: A => B
* A -> 表示的是列表List中数据类型,此处是Int类型
* 此处 B属于泛型,不是Unit类型
* 作用:对集合中每一个元素进行操作,将每个元素传递到指定的函数f中,进行操作,并且有返回值
*/
val mapList: List[Double] = list.map(item => item * 2.0)
list.map(item => {
println(s"item = $item, transform = ${item * 2.0}")
item * 2.0
})
flatMap
/**
* flatMap() -> f: A => Traversable[B]
* 对List中每个元素进行操作,将每个元素传入到函数f中,进行操作,返回Traversable
* Traversable:是可遍历对象,可以认为就是一个集合
* 然后将每个Traversable中的元素映射到新的List集合(RDD)中
*/
val flatMapList= list.flatMap(item => {
val range: Inclusive = 0 to item
println(s"item = $item, range = ${range.mkString("|")}")
range.mkString("|").split("|")
}).foreach(println)
Map和flatMap的区别
// map和flatmap区别:map不会增加RDD的count,flatMap会增加RDD的count
// flatmap的f函数的返回值是Traversable,然后将Traversable中的每个元素映射到新的RDD(list)中,会增加RDD的count
// map的f函数的返回值可以是不为Unit的任意的数据类型
val lineList = List("hadoop,spark,hive,spark,hadoop,spark", "spark,spark")
lineList.map(line => line.split(",").toList.mkString(" ")).foreach(println)
lineList.flatMap(line => line.split(",")).foreach(println)
sorted/sortedBy/sortWith
-
sorted/sortBy/sortWith:都是对集合中的元素进行排序
-
只有soreBy是RDD的函数,使用的时候execute的数目只能设置为一个
val li = List(7, 8, 3, 4, 9)
val list: RDD[Int] = sc.parallelize(li)
// sorted:默认的情况下,按照数据的自然方式进行升序排序
li.sorted.foreach(item => println(item))
// def sortBy[B](f: A => B):按照函数特定的方式排序
list.sortBy(item => - item).foreach(item => println(item))
// def sortWith(lt: (A, A) => Boolean):按照比较之后结果为true的方式排序
li.sortWith((x1, x2) => {
println(s"x1 = $x1, x2 = $x2")
x1 > x2
}).foreach(item => println(item))
groupBy
/**
* def groupBy(f: A => K)
* 说明:集合中每个元素应用到函数f中,返回值k就是分组的类别
*/
val list = List(7, 8, 3, 4, 9)
val groupMap: Map[String, List[Int]] = list.groupBy(item => if(item % 2 ==0) "偶数" else "奇数")
// 列表中存在数据类型二元组
val tupleList = List(("spark", 34), ("scala", 12), ("spark", 10), ("spark", 1), ("hive", 1)
val tupleGroupMap: Map[String, List[(String, Int)]] = tupleList.groupBy(item => item._1)
tupleGroupMap.map(t2 => {
val word: String = t2._1
val list: List[(String, Int)] = t2._2
val count = list.map(item => item._2).sum
(word, count)
})
reduce/fold
-
left:有三层含义:
-
f函数的左边的参数为中间临时变量
-
集合从左边开始聚合
-
集合左边的第一个元素为中间临时变量的初始值
-
reduce和reduceLeft的区别:
-
reduceLeft中间临时变量的数据类型是集合中传入的数据类型的父集(reduceLeft的函数参数的返回值向上转型) def reduceLeft[B >: A](f: (B, A) => B)
-
reduce:中间临时变量的数据类型和传入数据中数据类型相同 def reduce(op: (A1, A1) => A1)
-
fold和foldLeft的区别:
-
foldLeft:中间临时变量的类型 是任意类型def foldLeft[B](z: B)(f: (B, A) => B)
-
fold:初始赋值的数据类型和传入数据中数据类型相同def fold(z: A1)(op: (A1, A1) => A1)
-
reduceLeft和foldLeft的区别:
-
reduceLeft:中间变量的初始值只能是集合中左边的第一个元素,数据类型为集合中元素的父集 def reduceLeft[B >: A](f: (B, A) => B)
-
foldLeft:中间变量的初始值是自己选择的任意类型 def foldLeft[B](z: B)(f: (B, A) => B)
reduce
/**
* def reduce(op: (A1, A1) => A1): A1 =reduceLeft(op)
* - reduce函数中参数的类型是一个函数,并且是两个参数的:
* op 函数的要求如下;
* 第一点:参数类型和返回值的类型是一致的
* 第二点:有两个参数,其中
* 第一个参数为“聚合”的中间临时变量,初始值为第一个元素的值
* 第二个参数是集合中除去第一个元素以外的每个元素的值
*/
list.reduce((x1, x2) => {
println(s"x1 = $x1, x2 = $x2")
x1 + x2
})
reduceLeft
/**
* def reduceLeft[B >: A](f: (B, A) => B): B
* - reduceLeft函数中参数的类型是一个函数,并且是两个参数的:
* op 函数的要求如下;
* 第一点:中间临时变量的参数类型B是集合中传入参数类型A的父集,返回值类型是中间临时变量类型B
* 第二点:有两个参数,其中
* 第一个参数B为“聚合”的中间临时变量,初始值是从左边开始选的,reduce底层就是调用reduceLeft
* 第二个参数A是集合中除去第一个元素以外的每个元素的值
*/
list.reduceLeft((tmp, item) => {
println(s"tmp = $tmp, x2 = $item")
tmp + item
})
reduceRight
/**
* def reduceRight[B >: A](op: (A, B) => B): B
* - reduceRight函数中参数的类型是一个函数,并且是两个参数的:
* op 函数的要求如下;
* 第一点:中间临时变量的参数类型B是集合中传入参数类型A的父集,返回值类型是中间临时变量类型B
* 第二点:有两个参数,其中
* 第一个参数B为“聚合”的中间临时变量,初始值是从右边开始选的
* 第二个参数A是集合中除去第一个元素以外的每个元素的值
*/
list.reduceRight((item, tmp) => {
println(s"tmp = $tmp, x2 = $item")
tmp + item
})
fold
/**
* def fold(z: A1)(op: (A1, A1) => A1): A1 = foldLeft(z)(op)
* 参数说明:
* -a. 第一个参数:z
* 表明的是Zero(初始化):中间临时变量的初始化值
* -b. 第二个参数:op: (A1, A1) => A1
* A1表示的是中间临时变量 类型
* A2 表示的是 集合个每个元素;类型
* -c. 中间临时变量。临时变量的初始值,返回值和集合中的元素都相同
*/
val list = List(1, 2, 7, 8, 3, 4, 9, 10, 6, 5)
// 求和,此处使用fold函数大材小用
list.fold(0)((tmp, item) => {
println(s"tmp = $tmp, item = $item")
tmp + item
})
foldLeft
/**
* def foldLeft[B](z: B)(f: (B, A) => B): B
* 参数说明:
* -a. 第一个参数:z
* 表明的是Zero(初始化):中间临时变量的初始化值
* -b. 第二个参数:(f: (B, A) => B
* B表示的是中间临时变量 类型
* A表示的是 集合个每个元素类型
* -c. 中间临时变量不需要和集合中的元素的值相同,可以是自己定义的任何数据类型
*/
val lst = List(11, 22, 11, 11, 22, 99, 100)
// TODO: 使用foldLeft/foldRight函数对集合中数据进行去重
import scala.collection.mutable
lst.foldLeft(mutable.Set[Int]())((set, item) => {
println(s"set = $set, item = $item")
set += item
})
foldRight
/**
* foldRight[B](z: B)(op: (A, B) => B): B
* 参数说明:
* -a. 第一个参数:z
* 表明的是Zero(初始化):中间临时变量的初始化值
* -b. 第二个参数:(op: (A, B) => B
* B表示的是中间临时变量 类型
* A表示的是 集合个每个元素类型
* -c. 中间临时变量不需要和集合中的元素的值相同,可以是自己定义的任何数据类型
*/
lst.foldRight(mutable.Set[Int]())((item, set) => {
println(s"set = $set, item = $item")
set += item
})
filter/filterNot
/**
* def filter(p: A => Boolean):
* 将集合中的每一个元素传递到函数p中,返回值是Boolean类型
* filter的结果是函数p的返回值是true的元素组成的新的集合
*/
list.filter(item => item % 2 != 0).foreach(println)
/**
* def filterNot(p: A => Boolean):
* 将集合中的每一个元素传递到函数p中,返回值是Boolean类型
* filterNot的结果是函数p的返回值是false的元素组成的新的集合
*/
list.filterNot(item => item % 2 == 0).foreach(println)
foreach
// def foreach(f: T => Unit):针对集合中每个元素进行操作,没有返回值
list.foreach(item =>item.toString)
list.foreach(item => println(s"item = $item, transform = ${item * 2.0}"))