shuffle是将Map的输出结果进行分区,排序,分组等处理之后交给Reduce进行处理的过程
Map端的shuffle
-
写入缓存
每一个Map都会被分配一个环形的缓冲区,设置一个缓冲区大小和阈值,当缓存区数据积累达到阈值时,开始向磁盘写入(写入的是序列化完的key和value),在写入的过程中Map继续输出到缓冲区,如何在此期间缓冲区满,则会阻塞Map。
设置缓冲区大小,默认是100MB
mapreduce.task.io.sort.mb
设置阈值大小,默认是80%
mapreduce.map.sort.spill.percent
-
溢写过程(分区,排序,分组)
溢写之前将会进行三个过程分别是Partitione,sort,Combine- 分区(Partitione)
默认的hash分区会根据Reducer任务数量进行分区计算,源码如下
public class HashPartitioner<K, V> extends Partitioner<K, V> { public HashPartitioner() { } // 对key进行哈希后再对任务数量进行取模 public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & 2147483647) % numReduceTasks; } }
也可以重载Partitioner进行自定义分区
-
排序(sort)
对于每个分区内的<key,value>内存会自动对其进行排序 -
合并(Combine)
排序结束后,进行可选的Combine操作,进行累加等操作
- 分区(Partitione)
-
文件合并
在Map任务结束之前会对所有的溢写文件进行归并,所谓的归并操作如图所示
如图所示
min.num.spill.for.combine设置溢写文件数量,如何超过这个限制,可以再次运行Combiner -
压缩
mapred.compress.map.output设置启动压缩,默认不启动
Reducer端的Shuffle
-
领取数据
Reducer端通过心跳机制定时通过RPC向JobTracker询问Map是否完成,一旦完成,就在Mapper机器的本地磁盘上将自己要读取的数据拿过来存放在自己的磁盘上。因为存在多个Map所以Reducer会使用多个线程从Map上领取数据。 -
数据归并
Reducer从Map上拿到数据储存在缓存中,当缓存满时同Map端一样写入磁盘中,在溢写阶段Reducer并没有开始,可以分配大量内存作为缓存。Reducer对取得的数据进行归并,通过设置io.sort.factor来设置每轮归并的文件数量。Reducer函数默认每次处理一组具有相同key的value值,也可以通过继承WritableComparator,设置job.setGroupingComparatorClass()来自定义分组。
小尾巴:要看源码
网友评论