MapReduce(六):Reduce阶段

作者: b91cbec6a902 | 来源:发表于2019-06-19 17:31 被阅读5次

概述

基于Hadoop 2.x

Reduce阶段什么时候开始?

默认情况下,当Map任务完成个数达到Map任务总数的5%时,MRAppMaster开始向Yarn为Reduce申请Container,并启动Reduce进程。

此时Reduce启动后拉取每个Map的结果集中属于自己的那个分区的数据。注意这时候Reduce虽然启动了,但执行的操作只是拉取分区数据,也就是说必须所有的Map任务全部完成,所有的分区数据都已经拉取,Reduce阶段才真正开始处理数据。

这个阈值偏低,容易造成Reduce阶段早早的启动,占用了有限的资源,却没干多少活。可通过以下参数调整此阈值:

<property>
    <name>mapreduce.job.reduce.slowstart.completedmaps</name>
    <value>0.8</value>
</property>
Reduce阶段从哪里拉取Map阶段产生的数据?

通过HTTP请求从Shuffle HTTP Server拉取数据。
Shuffle HTTP Server是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NodeManagger启动前配置好,并由NodeManagger统一启动和关闭。
Map阶段将结果数据提交到Shuffle HTTP Server后Map进程就退出了,后续Reduce进程是从Shuffle HTTP Server拉取数据的。

可以从yarn-site.xml找到如下配置:

<property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
Reduce阶段的流程是怎么样的?

Reduce阶段启动的进程也是YarnChild -> org.apache.hadoop.mapred.YarnChild#main。整个Reduce阶段可以分为三个阶段:①Map结果数据拉取;②合并排序;③执行Reduce处理逻辑。

if (isMapOrReduce()) {
  copyPhase = getProgress().addPhase("copy");
  sortPhase  = getProgress().addPhase("sort");
  reducePhase = getProgress().addPhase("reduce");
}

1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main,通过启动命令传入了以下几个参数:
①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
②当前任务的TaskAttemptID。
③当前任务的JVMId。
注意:TaskAttemptID很重要,它是Reduce任务的唯一标识,也是Reduce阶段拉取Map结果数据的凭证,它决定了拉取Map结果数据的哪个分区。

class YarnChild {

    public static void main(String[] args) throws Throwable {

        String host = args[0];

        int port = Integer.parseInt(args[1]);

        final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);

        long jvmIdLong = Long.parseLong(args[3]);

        .....
    }
}

2、初始阶段跟Map阶段一样,通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。

final InetSocketAddress address =
        NetUtils.createSocketAddrForHost(host, port);

final TaskUmbilicalProtocol umbilical =
  taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
  @Override
  public TaskUmbilicalProtocol run() throws Exception {
    return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
        TaskUmbilicalProtocol.versionID, address, job);
  }
});

3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Reduce阶段获取的任务实例为org.apache.hadoop.mapred.ReduceTask,然后开始执行任务。

myTask = umbilical.getTask(context);

taskFinal.run(job, umbilical); // run the task

4、初始化Shuffle组件,开始后半段Shuffle(整个Shuffle的前半段在发生在Map阶段)。

# org.apache.hadoop.mapred.ReduceTask#run

ShuffleConsumerPlugin shuffleConsumerPlugin = null;
// 加载Shuffle组件
Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
// 创建Shuffle上下文信息
ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
// 运行Shuffle
rIter = shuffleConsumerPlugin.run();

①启动EventFetcher线程,不断从MRAppMaster获取Map任务完成的事件。以此来感知哪些Map任务完成了,也就是哪些Map阶段结果数据可以拉取了。

// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher = 
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
          maxEventsToFetch);
eventFetcher.start();
protected int getMapCompletionEvents()
    throws IOException, InterruptedException {
  
  int numNewMaps = 0;
  TaskCompletionEvent events[] = null;

  do {
    // 通过RPC请求MRAppMaster获取Map完成事件。
    MapTaskCompletionEventsUpdate update =
        umbilical.getMapCompletionEvents(
            (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
            fromEventIdx,
            maxEventsToFetch,
            (org.apache.hadoop.mapred.TaskAttemptID)reduce);
    events = update.getMapTaskCompletionEvents();
    LOG.debug("Got " + events.length + " map completion events from " +
             fromEventIdx);

    assert !update.shouldReset() : "Unexpected legacy state";

    fromEventIdx += events.length;
    // 将获取到的TaskCompletionEvent事件交给ShuffleScheduler来处理
    for (TaskCompletionEvent event : events) {
      scheduler.resolve(event);
      if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
        ++numNewMaps;
      }
    }
  } while (events.length == maxEventsToFetch);

  return numNewMaps;
}

将获取到的Map任务完成事件交给ShuffleScheduler处理,ShuffleScheduler对应的实例为ShuffleSchedulerImpl。状态为SUCCESSED的Map任务被封装成MapHost对象,放入Set<MapHost> pendingHosts 中,以备后续处理。

public void resolve(TaskCompletionEvent event) {
  switch (event.getTaskStatus()) {
  // 事件状态是成功的,代表Map任务顺利完成,将其封装成MapHost对象放入Set<MapHost> pendingHosts = new HashSet<MapHost>()中,以备后续处理。
  case SUCCEEDED:
    URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
    addKnownMapOutput(u.getHost() + ":" + u.getPort(),
        u.toString(),
        event.getTaskAttemptId());
    maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
    break;
  case FAILED:
  case KILLED:
  case OBSOLETE:
    obsoleteMapOutput(event.getTaskAttemptId());
    LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
        " map-task: '" + event.getTaskAttemptId() + "'");
    break;
  case TIPFAILED:
    tipFailed(event.getTaskAttemptId().getTaskID());
    LOG.info("Ignoring output of failed map TIP: '" +
        event.getTaskAttemptId() + "'");
    break;
  }
}

②启动一批Fetcher线程(可配置,默认5个),从ShuffleScheduler获取Map数据的URL,并发拉取Map结果数据,拉取的方式是HTTP请求。

final int numFetchers = isLocal ? 1 :
      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
  fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
      merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
      localMapFiles);
  fetchers[0].start();
} else {
  for (int i=0; i < numFetchers; ++i) {
    fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                   reporter, metrics, this, 
                                   reduceTask.getShuffleSecret());
    fetchers[i].start();
  }
}
public void run() {
  try {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      MapHost host = null;
      try {
        // If merge is on, block
        merger.waitForResource();

        // Get a host to shuffle from
        host = scheduler.getHost();
        metrics.threadBusy();

        // Shuffle
        copyFromHost(host);
      } finally {
        if (host != null) {
          scheduler.freeHost(host);
          metrics.threadFree();            
        }
      }
    }
  } catch (InterruptedException ie) {
    return;
  } catch (Throwable t) {
    exceptionReporter.reportException(t);
  }
}

将拉取过来的数据封装成MapOutput对象,MapOutput有两个实现,分别为InMemoryMapOutput & OnDiskMapOutput,从名字可以看出来,拉取到的数据可以放到内存里,也可以放到磁盘里。mapreduce.reduce.shuffle.memory.limit.percent这个参数控制了,有多少内存可以用来存放Map结果数据,默认0.25,即25%堆内存。超过25%以后,拉取的Map结果数据一律放在磁盘上。

public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
                                             long requestedSize,
                                             int fetcher
                                             ) throws IOException {
  if (requestedSize > maxSingleShuffleLimit) {
    LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
             " is greater than maxSingleShuffleLimit (" + 
             maxSingleShuffleLimit + ")");
    return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
       fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
       mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
  }
  
  if (usedMemory > memoryLimit) {
    LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
        + ") is greater than memoryLimit (" + memoryLimit + ")." + 
        " CommitMemory is (" + commitMemory + ")"); 
    return null;
  }
  
  // Allow the in-memory shuffle to progress
  LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
      + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
      + "CommitMemory is (" + commitMemory + ")"); 
  return unconditionalReserve(mapId, requestedSize, true);
}

③在拉取Map结果数据的同时,会有两个线程同时执行合并操作,InMemoryMerger线程负责合并内存中的Map结果数据,OnDiskMerger线程负责合并磁盘上的Map结果数据。生成的MapOutput会交给InMemoryMergerOnDiskMerger线程进行合并处理。

④当所有的Map结果数据拉取完毕、内存中的Map结果数据全部合并排序完毕、磁盘中的Map结果数据全部合并排序完毕时会进行最终合并,即将内存和磁盘中的Map结果数据合并。

// stop the scheduler
scheduler.close();

copyPhase.complete(); 
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);

// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
  kvIter = merger.close();
} catch (Throwable e) {
  throw new ShuffleError("Error while doing final merge " , e);
}
public RawKeyValueIterator close() throws Throwable {
    // Wait for on-going merges to complete
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    // 等待内存中的Map结果数据全部合并完毕
    inMemoryMerger.close();
    // 等待磁盘中的Map结果数据全部合并完毕
    onDiskMerger.close();
    
    List<InMemoryMapOutput<K, V>> memory = 
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
     // 内存和磁盘中的Map结果数据合并
    return finalMerge(jobConf, rfs, memory, disk);
}

如何进行排序的呢?主要依靠org.apache.hadoop.mapred.Merger.MergeQueue类MergeQueue继承了优先队列,实现了迭代器接口。MergeQueue中的元素是以Segment为单位,Segment是内部已经排序完成的数据段。Segment中的第一个元素的大小代表这个Segment的大小,以此来进行优先队列的调整。当通过迭代方法从MergeQueue获取元素是,必定是当前最小(最大)的元素。这样就实现了排序的效果。

 private static class MergeQueue<K extends Object, V extends Object> 
  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator

至此,Shuffle后半段结束。
5、开始执行Reduce逻辑。Reduce结果直接输出到磁盘

if (useNewApi) {
  runNewReducer(job, umbilical, reporter, rIter, comparator, 
                keyClass, valueClass);
} else {
  runOldReducer(job, umbilical, reporter, rIter, comparator, 
                keyClass, valueClass);
}

6、完成以上操作后,Reduce进程向MRAppMaster进程汇报任务结束,然后退出进程。

相关文章

  • MapReduce(六):Reduce阶段

    概述 基于Hadoop 2.x Reduce阶段什么时候开始? 默认情况下,当Map任务完成个数达到Map任务总数...

  • 01 - 关于 MapReduce

    1. map 和 reduce MapReduce 任务过程分为两个处理阶段:map阶段和reduce阶段。每阶段...

  • 简单搞定Shuffle机制运行原理

    概述 1)mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的...

  • MapReduce工作流程

    MapReduce如何工作: MapReduce程序的工作分两个阶段进行:Map阶段Reduce 阶段输入到每一个...

  • MapReduce执行过程及shuffle详解

    开始学习Hadoop时,经常会听到MapReduce。MapReduce由Map和Reduce两个阶段,每个阶段都...

  • shuffle阶段

    1.概述 mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的...

  • MapReduce 任务执行过程

    MapReduce的执行过程主要包含是三个阶段:Map阶段、Shuffle阶段、Reduce阶段 Map 阶段 s...

  • Hive 任务卡在 map = 0%, reduce = 0%

    Hive 卡在map = 0%, reduce = 0%阶段 解决:增加map个数,设置mapreduce.in...

  • Mapreduce运行步骤

    MapReduce的工作过程主要分为两个阶段:map阶段和reduce阶段。每个阶段都是键值对最为输入和输出...

  • 编写MapReduce程序

    MapReduce阶段将整个运行过程分为两个阶段,Map阶段和Reduce阶段。 Map阶段由一定数量的Map T...

网友评论

    本文标题:MapReduce(六):Reduce阶段

    本文链接:https://www.haomeiwen.com/subject/fuouyqtx.html