[Spark源码浅析]-关于sort-based shuffle
2017-07-17 23:56
453 查看
在sort-based shuffle,记录根据目标分区id做排序,然后写到一份单一的输出文件中。 Reducers拉取这个文件连续的region 来读取这个输出的位置。
如果这次map的输出数据 太大到不能放在内存中。这个排序输出 会被切分到磁盘。然后被合并到一个最终的文件中。
sort-based shuffle 有两个不同的输出路径 来存在输出文件。
一、序列化的排序(满足以下三个条件时)
1.shuffle依赖没有聚合或者排序输出。
2.shuffle序列化序列化值再布置。(这个已经被KryoSerializer和spark sql 传统序列化支持)
3.shuffle输出产生少于16777216个输出分区。
二、非序列化排序。处理所有类型。
相关类:
SortShuffleWriter
UnsafeShuffleWriter
看这个类:
IndexShuffleBlockResolver
创建和维持这个shuffle block的 逻辑block 和 物理文件地址的mapping。
来自于相同的map任务的shuffle 的块的数据 被存在在一个单一的合并的数据文件中。
这个数据块的偏移地址呗存储在一个分开的索引文件中[ShuffleIndexRecord]。
我们用一个shuffle数据的shuffleBlockId 和 reduce ID 设置为0 然后加上.data作为这个数据文件的文件名。索引文件用.index作为文件名的后缀。
ExternalShuffleBlockResolver
在common的network-shuffle包里。org.apache.spark.network.shuffle
ShuffleBlockFetcherIterator
一个拉取多个block的迭代器。拉取本地block时,它从本地block manager拉取。当拉取远程block时,用提供的BlockTransferService 拉取数据。
这个创建一个tuples(blockID,InputStream)的迭代器。使得呼叫着可以处理一个管道式的接受数据。
这个实现远程拉取的节流。所以他们不会超过最大的字节限制防止使用过多的内存。
如果这次map的输出数据 太大到不能放在内存中。这个排序输出 会被切分到磁盘。然后被合并到一个最终的文件中。
sort-based shuffle 有两个不同的输出路径 来存在输出文件。
一、序列化的排序(满足以下三个条件时)
1.shuffle依赖没有聚合或者排序输出。
2.shuffle序列化序列化值再布置。(这个已经被KryoSerializer和spark sql 传统序列化支持)
3.shuffle输出产生少于16777216个输出分区。
二、非序列化排序。处理所有类型。
相关类:
ShuffleWriter(抽象类)
写一序列化记录到这个任务的输出。 def write(records: Iterator[Product2[K, V]]): Unit 关闭这个输出。这个map输出完成会跳过。 def stop(success: Boolean): Option[MapStatus]
ShuffleWriter 有 三个子类
BypassMergeSortShuffleWriterSortShuffleWriter
UnsafeShuffleWriter
看这个类:
private[spark] class SortShuffleWriter[K, V, C]( shuffleBlockResolver: IndexShuffleBlockResolver, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) extends ShuffleWriter[K, V] with Logging
IndexShuffleBlockResolver
创建和维持这个shuffle block的 逻辑block 和 物理文件地址的mapping。
来自于相同的map任务的shuffle 的块的数据 被存在在一个单一的合并的数据文件中。
这个数据块的偏移地址呗存储在一个分开的索引文件中[ShuffleIndexRecord]。
我们用一个shuffle数据的shuffleBlockId 和 reduce ID 设置为0 然后加上.data作为这个数据文件的文件名。索引文件用.index作为文件名的后缀。
ExternalShuffleBlockResolver
在common的network-shuffle包里。org.apache.spark.network.shuffle
ShuffleBlockFetcherIterator
一个拉取多个block的迭代器。拉取本地block时,它从本地block manager拉取。当拉取远程block时,用提供的BlockTransferService 拉取数据。
这个创建一个tuples(blockID,InputStream)的迭代器。使得呼叫着可以处理一个管道式的接受数据。
这个实现远程拉取的节流。所以他们不会超过最大的字节限制防止使用过多的内存。
相关文章推荐
- Spark源码分析之Sort-Based Shuffle读写流程
- Spark-1.6.0中的Sort Based Shuffle源码解读
- Spark-1.6.0中的Sort Based Shuffle源码解读
- Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现
- Spark源码分析之Sort-Based Shuffle读写流程
- Spark-1.6.0中的Sort Based Shuffle源码解读
- day25:Spark Sort-Based Shuffle内幕工作机制、案例实战、源码剖析、优缺点及改进方式
- sort-based shuffle的核心:org.apache.spark.util.collection.ExternalSorter
- Android源码浅析(五)——关于定制系统,如何给你的Android应用系统签名
- Spark商业案例与性能调优实战100课》第26课:电光石火间从根本上理解Spark中Sort-Based Shuffle产生的内幕及其tungsten-sort 背景解密
- Spark Sort-Based Shuffle内幕彻底解密
- 第26课:电光石火间从根本上理解Spark中Sort-Based Shuffle产生的内幕及其tungsten-sort 背景解密
- [Spark性能调优] 第三章 : Spark 2.1.0 中 Sort-Based Shuffle 产生的内幕
- Spark Sort Based Shuffle源码详细解析----数据流篇----Shuffle Write流
- 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析
- 大数据IMF传奇行动绝密课程第25课:Spark Sort-Based Shuffle内幕彻底解密
- Mybatis源码浅析(一)关于Mybatis的背景了解
- Android源码浅析(五)——关于定制系统,如何给你的Android应用系统签名
- 【Spark Core】任务执行机制和Task源码浅析1
- 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析