学习参考自 http://spark-internals.books.yourtion.com/markdown/4-shuffleDetails.html
1. Shuffle read 边 fetch 边处理还是一次性 fetch 完再处理?
边 fetch 边处理。
- MapReduce
shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。
- Spark
因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。
使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 <key, value=""> record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value)
,比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value
,并将 func 的结果重新 put(key) 到 HashMap 中去。
2. Shuffle --> Merge --> Combine --> Sort
3. DAG 有向无环图
一个有向图无法从某个顶点出发经过若干条边回到该点。
4. Mapreduce 工作原理:
- Mapreduce的默认排序:
按照key值进行排序的,如果key为封装int的IntWritable类型,那么按照数字大小对key排序,如果key为封装为String的Text类型,那么按照字典顺序对字符串排序。
- 也就是在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。
reduce拿到<key,value-list>之后,将输入的key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。
- combine 分为map端和reduce端,作用是把同一个key的键值对合并在一起,可以自定义的。
combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,values2>, 将新的<key2,values2>作为输入到reduce函数中
(这个values2,表示有多个value。这个合并的目的是为了减少网络传输。 - partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。
(这里其实可以理解归类,我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝)
partition的作用就是把这些数据归类。只不过在写程序的时候,mapreduce使用哈希HashPartitioner帮我们归类了。也可以自定义。
- Map的结果,会通过partition分发到Reducer上
- Reducer做完Reduce操作后,通过OutputFormat,进行输出
- shuffle阶段的主要函数是fetchOutputs(), 这个函数的功能就是将map阶段的输出,copy到reduce 节点本地。(comibine 和partition主要使用的函数)