背景
map和flatmap,从字面意思或者官网介绍,可能会给一些人在理解上造成困扰【包括本人】,所以今天专门花时间来分析,现整理如下:
首先做一下名词解释------------------------------------------------
我的理解
map:map方法返回的是一个object,map将流中的当前元素替换为此返回值;
flatMap:flatMap方法返回的是一个stream,flatMap将流中的当前元素替换为此返回流拆解的流元素;
官方解释
map:Returns a stream consisting of the results of applying the given function to the elements of this stream.
返回一个流,包含给定函数应用在流中每一个元素后的结果
flatmap:Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element.
返回一个流,包含将此流中的每个元素替换为通过给定函数映射应用于每个元素而生成的映射流的内容
举例说明
有二箱鸡蛋,每箱5个,现在要把鸡蛋加工成煎蛋,然后分给学生。
map做的事情:把二箱鸡蛋分别加工成煎蛋,还是放成原来的两箱,分给2组学生;
flatMap做的事情:把二箱鸡蛋分别加工成煎蛋,然后放到一起【10个煎蛋】,分给10个学生;
完整测试代码如下:
1 public class Map_FlatMap {
2
3
4 List<String[]> eggs = new ArrayList<>();
5
6 @Before
7 public void init() {
8 // 第一箱鸡蛋
9 eggs.add(new String[]{"鸡蛋_1", "鸡蛋_1", "鸡蛋_1", "鸡蛋_1", "鸡蛋_1"});
10 // 第二箱鸡蛋
11 eggs.add(new String[]{"鸡蛋_2", "鸡蛋_2", "鸡蛋_2", "鸡蛋_2", "鸡蛋_2"});
12 }
13
14 // 自增生成组编号
15 static int group = 1;
16 // 自增生成学生编号
17 static int student = 1;
18
19 /**
20 * 把二箱鸡蛋分别加工成煎蛋,还是放在原来的两箱,分给2组学生
21 */
22 @Test
23 public void map() {
24 eggs.stream()
25 .map(x -> Arrays.stream(x).map(y -> y.replace("鸡", "煎")))
26 .forEach(x -> System.out.println("组" + group++ + ":" + Arrays.toString(x.toArray())));
27 /*
28 控制台打印:------------
29 组1:[煎蛋_1, 煎蛋_1, 煎蛋_1, 煎蛋_1, 煎蛋_1]
30 组2:[煎蛋_2, 煎蛋_2, 煎蛋_2, 煎蛋_2, 煎蛋_2]
31 */
32 }
33
34 /**
35 * 把二箱鸡蛋分别加工成煎蛋,然后放到一起【10个煎蛋】,分给10个学生
36 */
37 @Test
38 public void flatMap() {
39 eggs.stream()
40 .flatMap(x -> Arrays.stream(x).map(y -> y.replace("鸡", "煎")))
41 .forEach(x -> System.out.println("学生" + student++ + ":" + x));
42 /*
43 控制台打印:------------
44 学生1:煎蛋_1
45 学生2:煎蛋_1
46 学生3:煎蛋_1
47 学生4:煎蛋_1
48 学生5:煎蛋_1
49 学生6:煎蛋_2
50 学生7:煎蛋_2
51 学生8:煎蛋_2
52 学生9:煎蛋_2
53 学生10:煎蛋_2
54 */
55 }
56
57 }
主要区别:
map是对rdd中的每一个元素进行操作;
mapPartitions则是对rdd中的每个分区的迭代器进行操作
MapPartitions的优点:
如果是普通的map,比如一个partition中有1万条数据。ok,那么你的function要执行和计算1万次。
使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有
的partition数据。只要执行一次就可以了,性能比较高。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
的partition数据。只要执行一次就可以了,性能比较高。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
SparkSql或DataFrame默认会对程序进行mapPartition的优化。
MapPartitions的缺点:
如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下, 比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。
所以说普通的map操作通常不会导致内存的OOM异常。
所以说普通的map操作通常不会导致内存的OOM异常。
但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,
一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。
一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。
实现将每个数字变成原来的2倍的功能
def main(args: Array[String]): Unit = {
var conf = new SparkConf().setMaster("local[*]").setAppName("partitions")
var sc = new SparkContext(conf)
println("1.map--------------------------------")
var aa = sc.parallelize(1 to 9, 3)
def doubleMap(a:Int) : (Int, Int) = { (a, a*2) }
val aa_res = aa.map(doubleMap)
println(aa.getNumPartitions)
println(aa_res.collect().mkString)
println("2.mapPartitions-------------------")
val bb = sc.parallelize(1 to 9, 3)
def doubleMapPartition( iter : Iterator[Int]) : Iterator[ (Int, Int) ] = {
var res = List[(Int,Int)]()
while (iter.hasNext){
val cur = iter.next()
res .::= (cur, cur*2)
}
res.iterator
}
val bb_res = bb.mapPartitions(doubleMapPartition)
println(bb_res.collect().mkString)
println("3.mapPartitions-------------------")
var cc = sc.makeRDD(1 to 5, 2)
var cc_ref = cc.mapPartitions( x => {
var result = List[Int]()
var i = 0
while(x.hasNext){
val cur = x.next()
result.::= (cur*2)
}
result.iterator
})
cc_ref.foreach(println)
}
运行结果:
1.map-------------------------------- 3 (1,2)(2,4)(3,6)(4,8)(5,10)(6,12)(7,14)(8,16)(9,18) 2.mapPartitions------------------- (3,6)(2,4)(1,2)(6,12)(5,10)(4,8)(9,18)(8,16)(7,14) 3.mapPartitions------------------- 4 2 10 8 6
