spark core 2.0 BypassMergeSortShuffleWriter
2017-01-20 11:40
309 views
/**
 * Decides whether sort-based shuffle may fall back to the hash-style
 * bypass-merge-sort write path for the given dependency.
 *
 * The bypass path is only usable when no map-side aggregation is required and
 * the number of reduce partitions is small enough that opening one file/writer
 * per partition is acceptable.
 *
 * @param conf Spark configuration (read for the bypass threshold).
 * @param dep  the shuffle dependency being planned.
 * @return true if the bypass-merge-sort path should be used.
 */
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    // The bypass path opens one writer per reduce partition, so only use it
    // when the partition count is at or below the configured threshold.
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
// NOTE(review): the original text repeated the method body (and an extra closing
// brace) immediately after the method — an accidental copy/paste duplicate that
// would not compile. The duplicate has been removed; the method above is the
// single authoritative definition.
/**
 * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
 * writes incoming records to separate files, one file per reduce partition, then concatenates these
 * per-partition files to form a single output file, regions of which are served to reducers.
 * Records are not buffered in memory. This is essentially identical to
 * {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
 * that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
 * <p>
 * This write path is inefficient for shuffles with large numbers of reduce partitions because it
 * simultaneously opens separate serializers and file streams for all partitions. As a result,
 * {@link SortShuffleManager} only selects this write path when
 * <ul>
 * <li>no Ordering is specified,</li>
 * <li>no Aggregator is specific, and</li>
 * <li>the number of partitions is less than
 * <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
 * </ul>
 *
 * This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
 * refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
 * <p>
 * There have been proposals to completely remove this code path; see SPARK-6026 for details.
 */
// NOTE(review): the class body below is only partially reproduced in this excerpt —
// several field declarations and the closing brace are not visible here.
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BypassMergeSortShuffleWriter( BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandle<K, V> handle, int mapId, TaskContext taskContext, SparkConf conf) { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; final ShuffleDependency<K, V, V> dep = handle.dependency(); this.mapId = mapId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; }
/**
 * Array of file writers, one for each partition. Allocated lazily by
 * {@code write()} and reset to null once the per-partition files have been
 * concatenated (or reverted on failure).
 */
private DiskBlockObjectWriter[] partitionWriters;
/**
 * Writes all of this map task's records, one temp file per reduce partition,
 * then concatenates them into the single shuffle data file and commits the
 * matching index file.
 *
 * @param records the map task's output records.
 * @throws IOException if writing or committing any file fails.
 */
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  // write() must be called at most once per writer instance.
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    // No output: commit an all-zero-length index so reducers see empty partitions.
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  // Open one temp shuffle block + disk writer per reduce partition up front.
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
  // Route each record to its partition's writer.
  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }
  for (DiskBlockObjectWriter writer : partitionWriters) {
    writer.commitAndClose();
  }
  // Concatenate into a temp file, then atomically commit data + index together.
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  try {
    partitionLengths = writePartitionedFile(tmp);
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    // If commit renamed the temp file away, exists() is false and nothing is deleted.
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
/**
 * Concatenate all of the per-partition files into a single combined file.
 *
 * Deletes each per-partition temp file after copying it, and clears
 * {@code partitionWriters} when done so the files cannot be reverted twice.
 *
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
 */
private long[] writePartitionedFile(File outputFile) throws IOException {
  // Track location of the partition starts in the output file
  final long[] lengths = new long[numPartitions];
  if (partitionWriters == null) {
    // We were passed an empty iterator
    return lengths;
  }
  // Append mode: the index-commit protocol may have pre-created the file.
  final FileOutputStream out = new FileOutputStream(outputFile, true);
  final long writeStartTime = System.nanoTime();
  boolean threwException = true;
  try {
    for (int i = 0; i < numPartitions; i++) {
      final File file = partitionWriters[i].fileSegment().file();
      if (file.exists()) {
        final FileInputStream in = new FileInputStream(file);
        boolean copyThrewException = true;
        try {
          // Copy without closing `out`; transferTo fast path is config-gated.
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          // Closeables.close swallows close() failures only if a copy error is already in flight.
          Closeables.close(in, copyThrewException);
        }
        if (!file.delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
    }
    threwException = false;
  } finally {
    Closeables.close(out, threwException);
    writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
  }
  partitionWriters = null;
  return lengths;
}
/**
 * Stops this writer, returning the map status on success or cleaning up
 * partial output on failure.
 *
 * @param success true if the map task completed successfully.
 * @return the task's MapStatus on first successful stop; None otherwise.
 */
@Override
public Option<MapStatus> stop(boolean success) {
  if (stopping) {
    // Subsequent stop() calls are no-ops.
    return None$.empty();
  } else {
    stopping = true;
    if (success) {
      if (mapStatus == null) {
        throw new IllegalStateException("Cannot call stop(true) without having called write()");
      }
      return Option.apply(mapStatus);
    } else {
      // The map task failed, so delete our output data.
      if (partitionWriters != null) {
        try {
          for (DiskBlockObjectWriter writer : partitionWriters) {
            // This method explicitly does _not_ throw exceptions:
            File file = writer.revertPartialWritesAndClose();
            if (!file.delete()) {
              logger.error("Error while deleting file {}", file.getAbsolutePath());
            }
          }
        } finally {
          partitionWriters = null;
        }
      }
      shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
      return None$.empty();
    }
  }
}
相关文章推荐
- spark core 2.0 SortShuffleManager
- MapReduce:详解Shuffle(copy,sort,merge)过程(转)
- MapReduce:详解Shuffle(copy,sort,merge)过程
- spark core 2.0 SerializedShuffleHandle UnsafeShuffleWriter ShuffleExternalSorter
- MapReduce核心map reduce shuffle (spill sort partition merge)详解
- spark core 2.0 DiskBlockObjectWriter
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce:详解Shuffle(copy,sort,merge)过程
- mapreduce里的shuffle 里的 sort merge 和combine
- mapreduce shuffle merge sort
- MapReduce:详解Shuffle(copy,sort,merge)过程
- MapReduce核心map reduce shuffle (spill sort partition merge)详解
- 号外,号外 -几乎所有的binary search和mergesort都有错
- 排序算法详解【归并排序-Merge_Sort】
- Hadoop 之 Shuffle and Sort