8.6 The Shuffle Process: ShuffleWriter

Author: GongMeng | Published 2018-12-25 20:02

1. Overview

As introduced earlier, the ShuffleManager initializes a ShuffleBlockResolver to manage the blocks produced by the map side, and ultimately writes them to disk/memory through a ShuffleWriter.
Depending on the strategy, there are three main implementations: BypassMergeSortShuffleWriter, SortShuffleWriter, and UnsafeShuffleWriter.
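
In the sort-based ShuffleManager, the choice among the three is made when the shuffle is registered. As a rough orientation, here is a minimal, self-contained sketch of that selection rule; the types below are simplified stand-ins rather than Spark's real API, and the real check for the UnsafeShuffleWriter additionally requires that no aggregator is defined and that the partition count stays below an upper bound:

  object WriterSelectionSketch {
    // Simplified view of the dependency properties the ShuffleManager looks at.
    final case class ShuffleDepInfo(
        mapSideCombine: Boolean,
        serializerSupportsRelocation: Boolean,
        numPartitions: Int)

    val bypassMergeThreshold = 200 // default of spark.shuffle.sort.bypassMergeThreshold

    def chooseWriter(dep: ShuffleDepInfo): String =
      if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold)
        "BypassMergeSortShuffleWriter" // one temp file per reduce partition, concatenated at the end
      else if (!dep.mapSideCombine && dep.serializerSupportsRelocation)
        "UnsafeShuffleWriter"          // tungsten-sort path: sorts serialized records
      else
        "SortShuffleWriter"            // general path backed by ExternalSorter

    def main(args: Array[String]): Unit =
      println(chooseWriter(ShuffleDepInfo(
        mapSideCombine = true, serializerSupportsRelocation = false, numPartitions = 1000)))
  }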

2. BypassMergeSortShuffleWriter

2.1 Fields it maintains

This write strategy is used when the number of partitions in the map output does not exceed spark.shuffle.sort.bypassMergeThreshold (and no map-side aggregation is required).
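
For reference, the threshold is an ordinary Spark configuration entry (default 200) and can be overridden when building the SparkConf; a minimal example, with an arbitrary value:

  import org.apache.spark.SparkConf

  // Raising spark.shuffle.sort.bypassMergeThreshold lets more shuffles take the bypass path,
  // at the cost of each map task keeping more partition files open at once.
  val conf = new SparkConf()
    .setAppName("shuffle-writer-demo")
    .set("spark.shuffle.sort.bypassMergeThreshold", "400")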

This class is implemented in Java, so readers following along should look for the source under the Java source tree.

  private final int fileBufferSize;
  private final boolean transferToEnabled;
  private final int numPartitions;
  private final BlockManager blockManager;
  private final Partitioner partitioner;
  private final ShuffleWriteMetrics writeMetrics;
  // the shuffleId required by the ShuffleManager
  private final int shuffleId;
  // the id of the corresponding map task
  private final int mapId;
  private final Serializer serializer;

  private final IndexShuffleBlockResolver shuffleBlockResolver;

  /** Each partition needs its own writer to flush its data to disk */
  private DiskBlockObjectWriter[] partitionWriters;
  @Nullable private MapStatus mapStatus;
  // the size of each partition, in bytes
  private long[] partitionLengths;

  /**
   * Are we in the process of stopping? Because map tasks can call stop() with success = true
   * and then call stop() with success = false if they get an exception, we want to make sure
   * we don't try deleting files, etc twice.
   */
  private boolean stopping = false;

2.2 The write method, which performs the write logic

// Flush the data produced on the map side into files on disk
public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    if (!records.hasNext()) {
      partitionLengths = new long[numPartitions];
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (DiskBlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
    // Resolve the final output data file
    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    File tmp = Utils.tempFileWith(output);
    try {
      // Concatenate the per-partition data into the output file
      partitionLengths = writePartitionedFile(tmp);
      // The index-write-and-commit step described earlier
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }
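
The writePartitionedFile(tmp) call above concatenates the per-partition temp files into the single data file and returns the length of each partition's segment, which writeIndexFileAndCommit then turns into the index file. A rough, self-contained sketch of that idea (illustrative names only; the real implementation copies with Utils.copyStream/transferTo and records write metrics):

  import java.io.{File, FileInputStream, FileOutputStream}

  object ConcatSketch {
    // Append each partition's temp file to one output file, in partition order,
    // and remember the length of every segment so an index can be built from it.
    def concatPartitionFiles(partitionFiles: Seq[File], output: File): Array[Long] = {
      val lengths = new Array[Long](partitionFiles.length)
      val out = new FileOutputStream(output, true) // append mode
      try {
        partitionFiles.zipWithIndex.foreach { case (f, i) =>
          val in = new FileInputStream(f)
          try {
            val buf = new Array[Byte](8192)
            var n = in.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              lengths(i) += n
              n = in.read(buf)
            }
          } finally in.close()
          f.delete() // the per-partition temp file is no longer needed
        }
      } finally out.close()
      lengths
    }
  }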

3. SortShuffleWriter

A couple of points are worth noting:

  • It may perform a map-side combine: on the output side, values with the same key are grouped together and pre-aggregated, which improves the performance of downstream operations.
    Whether this happens depends on whether the operator sets dep.mapSideCombine to true (see the example after the code below).
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }
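
As a quick illustration of which operators turn map-side combine on, here is a small runnable example against the public RDD API (the local master and the sample data are arbitrary): reduceByKey builds its ShuffleDependency with mapSideCombine = true, while groupByKey explicitly disables it because grouping cannot shrink the data before the shuffle.

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("combine-demo"))
  val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), numSlices = 2)

  val reduced = pairs.reduceByKey(_ + _) // map-side combine: each map task writes partial sums
  val grouped = pairs.groupByKey()       // no map-side combine: raw records go through the shuffle

  println(reduced.collect().toSeq)
  println(grouped.collect().toSeq)
  sc.stop()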
