Spark Sort-Based Shuffle内幕彻底解密
2016-02-01 07:53
363 查看
Spark Sort-Based Shuffle内幕彻底解密
本期内容:
1 为什么使用Sort-Based Shuffle
2 Sort-Based Shuffle 实战
3 Sort-Based Shuffle 内幕
4 Sort-Based Shuffle的不足
一、为什么需要Sort Based Shuffle?
1、Shuffle一般包含两阶段任务;第一部分,产生Shuffle数据的阶段(map阶段)getWriter,需要实现ShuffleManager中getWriter来写数据(数据 可以BlockManager写到Memory、DIsk、Tachyon等,例如像非常快的Shuffle,此时可以考虑把数据写到内存中,但是内存不稳定,建议使用 MEMORY_AND_DISK方式),第二阶段,使用Shuffle数据的阶段(Reduce阶段) ,额外补充,需要实现ShuffleManager的getReader;
2、如果只有一个Stage,则这个Job就是相当于只有一个Mapper阶段, 当然不会产生Shuffle,适合于简单的ETL;在Spark的链条上有很多的Stage,最后一个Stage产生阶段,最后一个Stage是倒数第二个Stage的Reducer(除了单Stage的job),最后一个Stage是Reducer,第一个 Stage一定是Mapper,中间的Stage既是Mapper也是Reducer,即第n个Stage是第n-1个Stage的Reducer,也是第n+1个Stage的Mapper。
3、Spark Shuffle在最开始的时候只支持 Hash-Based Shuffle;默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task 中要使用的数据,但是在一些情况下(例如数据量非常大的情况)会造成大量的(M*R,之中M代表Mapper中所有的并行任务数量,R代表Reducer中所有的并行任务数据)随机磁盘I/O操作且会造成大量的MEM消耗(极易造成OOM),这是致命的,因为:
第一:不能够处理大规模的数据;
第二:Spark不能够运行在大规模的分布式集群上!后来的改善方式是加入Shuffle Consalidate机制来讲Shuffle时候产生的文件实例减少到 C*R个(C代表在Mapper端同时能够使用的Cores的数量,R代表Reducer中所有的
并行任务数量 ),但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,此时依旧没有逃脱文件打开过多的厄运!
说明:Spark在引入Sort-Based Shuffle以前,比较适用于中小规模的大数据处理(Spark1.1X)
4、为了让Spark在更大规模的集群上更高性能处理更大规模的数据,于是就引入了Sort-Based Shuffle!从此以后(Spark1.1.X版本开始), Spark可以胜任任意规模(包含PB以及以上级别的)的更大数据的处理,尤其是随着钨丝计划的引入和优化,把Spark更快的扎起更大的集群处理更加海量的数据的能力推向了一个新的巅峰!
5、Spark1.6版本支持至少三种类型的Shuffle(可以自定义,Shuffle可插拔等),实现ShuffleManager接口可以根据自己的需要更优化的自定义的Shuffle的实现。
上述的源码说明,你可以在Spark的配置文件中配置Spark框架运行时要使用的具体的ShuffleManager的实现。
修改conf/spark-default.conf加入如下内容:spark.shuffle.namager SORT
Sort-based Shuffle不会为每个Reducer中的Task生成一个单独的文件,相反,Sort-based Shuffle会把Mapper中每个ShuffleMapTask所以的输出数据Data只写到一个文件中。因为每个ShuffleMapTask中的数据会被分类,所以Sort-based
Shuffle使用了Index文件存储具体ShuffleMapTask输出数据在同一个data是如何分类的信息!因此基于Sort-based 的Shuffle会在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index,其中Data文件是存储当前Task的Shuffle输出的,而Index文件中存储了data文件中的数据通过partitioner的分类索引,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的;
Sort-based Shuffle会产生2*M个文件(M代表了Mapper阶段中并行的Partition的总数量,其实就是Mapper端ShuffleMapTask的总数量)个Shuffle临时文件。
回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化以此为:
Basic Hash Shuffle:M*R;
Consalidate 方式的Hash Shuffle:C*R;
Sort-based Shuffle:2*M;
二、在集群中动手实战Sort-based Shuffle
1、启动Hadoop HDFS
上传三个文件:
Shuffle运行生成的中间的文件(共8个文件,data4个,index4个):
命名规则:第一个1代表Stage,第二个1代表ShuffleMapTask ID,第三个0代表Reducer的ID,data代表数据本身
由此证明了在默认的Sort-based Shuffle的产生2M个文件,其中M为Task的数量。
在Sort-based Shuffle中Reducer使如何获取自己需要的数据的呢?
具体而言,Reducer首先找Driver去获取父Stage(等父Stage执行完后子Stage边抓数据边计算)中每个ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,从解析的index文件中获取Data文件中属于自己的那部分内容;
hash的shuffle文件数跟下个stage的并行数相关,等于M*R,所以hash的shuffle、下个stage并行度不有shuffle决定,反观sort,shuffle决定下个satge的并行数,两者对下个stage的并行数影响不同!
一般merge 排序的时候为了减低OOM,会同时打开上百文件
三、Sort-based Shuffle确定
1、如果Mapper中Task的数据量过大,依旧会产生很多小文件;此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时打开大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至奔溃!
2、如果在分片内也需要排序的话,此时也需要进行Mapper端和Reducer端的两次排序!
本期内容:
1 为什么使用Sort-Based Shuffle
2 Sort-Based Shuffle 实战
3 Sort-Based Shuffle 内幕
4 Sort-Based Shuffle的不足
一、为什么需要Sort Based Shuffle?
1、Shuffle一般包含两阶段任务;第一部分,产生Shuffle数据的阶段(map阶段)getWriter,需要实现ShuffleManager中getWriter来写数据(数据 可以BlockManager写到Memory、DIsk、Tachyon等,例如像非常快的Shuffle,此时可以考虑把数据写到内存中,但是内存不稳定,建议使用 MEMORY_AND_DISK方式),第二阶段,使用Shuffle数据的阶段(Reduce阶段) ,额外补充,需要实现ShuffleManager的getReader;
/** * Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle /** Get a writer for a given partition. Called on executors by map tasks. */ def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). * Called on executors by reduce tasks. */ def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C]
2、如果只有一个Stage,则这个Job就是相当于只有一个Mapper阶段, 当然不会产生Shuffle,适合于简单的ETL;在Spark的链条上有很多的Stage,最后一个Stage产生阶段,最后一个Stage是倒数第二个Stage的Reducer(除了单Stage的job),最后一个Stage是Reducer,第一个 Stage一定是Mapper,中间的Stage既是Mapper也是Reducer,即第n个Stage是第n-1个Stage的Reducer,也是第n+1个Stage的Mapper。
3、Spark Shuffle在最开始的时候只支持 Hash-Based Shuffle;默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task 中要使用的数据,但是在一些情况下(例如数据量非常大的情况)会造成大量的(M*R,之中M代表Mapper中所有的并行任务数量,R代表Reducer中所有的并行任务数据)随机磁盘I/O操作且会造成大量的MEM消耗(极易造成OOM),这是致命的,因为:
第一:不能够处理大规模的数据;
第二:Spark不能够运行在大规模的分布式集群上!后来的改善方式是加入Shuffle Consalidate机制来讲Shuffle时候产生的文件实例减少到 C*R个(C代表在Mapper端同时能够使用的Cores的数量,R代表Reducer中所有的
并行任务数量 ),但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,此时依旧没有逃脱文件打开过多的厄运!
说明:Spark在引入Sort-Based Shuffle以前,比较适用于中小规模的大数据处理(Spark1.1X)
4、为了让Spark在更大规模的集群上更高性能处理更大规模的数据,于是就引入了Sort-Based Shuffle!从此以后(Spark1.1.X版本开始), Spark可以胜任任意规模(包含PB以及以上级别的)的更大数据的处理,尤其是随着钨丝计划的引入和优化,把Spark更快的扎起更大的集群处理更加海量的数据的能力推向了一个新的巅峰!
5、Spark1.6版本支持至少三种类型的Shuffle(可以自定义,Shuffle可插拔等),实现ShuffleManager接口可以根据自己的需要更优化的自定义的Shuffle的实现。
// Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)6、Spark1.6默认采用的就是Sort-based Shuffle的方式:
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
上述的源码说明,你可以在Spark的配置文件中配置Spark框架运行时要使用的具体的ShuffleManager的实现。
修改conf/spark-default.conf加入如下内容:spark.shuffle.namager SORT
Sort-based Shuffle不会为每个Reducer中的Task生成一个单独的文件,相反,Sort-based Shuffle会把Mapper中每个ShuffleMapTask所以的输出数据Data只写到一个文件中。因为每个ShuffleMapTask中的数据会被分类,所以Sort-based
Shuffle使用了Index文件存储具体ShuffleMapTask输出数据在同一个data是如何分类的信息!因此基于Sort-based 的Shuffle会在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index,其中Data文件是存储当前Task的Shuffle输出的,而Index文件中存储了data文件中的数据通过partitioner的分类索引,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的;
Sort-based Shuffle会产生2*M个文件(M代表了Mapper阶段中并行的Partition的总数量,其实就是Mapper端ShuffleMapTask的总数量)个Shuffle临时文件。
回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化以此为:
Basic Hash Shuffle:M*R;
Consalidate 方式的Hash Shuffle:C*R;
Sort-based Shuffle:2*M;
二、在集群中动手实战Sort-based Shuffle
1、启动Hadoop HDFS
root@Master:/usr/local/hadoop/hadoop-2.6.0/sbin# ./start-dfs.sh2、创建目录并上传文件
root@Master:/usr/local/hadoop/hadoop-2.6.0/sbin# hadoop dfs -mkdir /library/dataForSort
上传三个文件:
root@Master:/usr/local/hadoop/hadoop-2.6.0# hadoop dfs -put README.txt /library/dataForSort DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. root@Master:/usr/local/hadoop/hadoop-2.6.0# hadoop dfs -put LICENSE.txt /library/dataForSort DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. root@Master:/usr/local/hadoop/hadoop-2.6.0# hadoop dfs -put NOTICE.txt /library/dataForSort DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. root@Master:/usr/local/hadoop/hadoop-2.6.0#3、启动Spark:
start-all.sh stop-mesos-dispatcher.sh start-history-server.sh stop-mesos-shuffle-service.sh start-master.sh stop-shuffle-service.sh start-mesos-dispatcher.sh stop-slave.sh start-mesos-shuffle-service.sh stop-slaves.sh start-shuffle-service.sh stop-thriftserver.sh root@Master:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/sbin# ./start-all.sh4、运行spark-shell
root@Master:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin# ./spark-shell注:不指定Master,说明在local模式中运行(因为local默认在我们测试的角度讲,比较容易观察数据)
scala>sc.textFile("/library/dataForSort").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_).count
Shuffle运行生成的中间的文件(共8个文件,data4个,index4个):
shuffle_0_0_0.data Shuffle_0_1_0.data Shuffle_0_2_0.data Shuffle_0_3_0.data Shuffle_0_0_0.index Shuffle_0_1_0.index Shuffle_0_2_0.index Shuffle_0_3_0.index
命名规则:第一个1代表Stage,第二个1代表ShuffleMapTask ID,第三个0代表Reducer的ID,data代表数据本身
由此证明了在默认的Sort-based Shuffle的产生2M个文件,其中M为Task的数量。
在Sort-based Shuffle中Reducer使如何获取自己需要的数据的呢?
具体而言,Reducer首先找Driver去获取父Stage(等父Stage执行完后子Stage边抓数据边计算)中每个ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,从解析的index文件中获取Data文件中属于自己的那部分内容;
hash的shuffle文件数跟下个stage的并行数相关,等于M*R,所以hash的shuffle、下个stage并行度不有shuffle决定,反观sort,shuffle决定下个satge的并行数,两者对下个stage的并行数影响不同!
一般merge 排序的时候为了减低OOM,会同时打开上百文件
三、Sort-based Shuffle确定
1、如果Mapper中Task的数据量过大,依旧会产生很多小文件;此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时打开大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至奔溃!
2、如果在分片内也需要排序的话,此时也需要进行Mapper端和Reducer端的两次排序!
相关文章推荐
- day02知识点回顾
- Lintcode: Segment Tree Query
- Linux/Unix 桌面趣事:终端上的圣诞树
- Hash-based Shuffle内幕彻底解密
- Heka–>Elasticsearch 索引数据过程的优化
- 继续node爬虫 — 百行代码自制自动AC机器人日解千题攻占HDOJ
- Leetcode: Verify Preorder Serialization of a Binary Tree
- Leetcode 53 - Maximum Subarray
- 什么样的程序员适合去创业公司
- 【blog算法原理】RANSAC和Flitline
- 【blog算法原理】Opencv中直线的表示方法
- [项目实战派]花40分钟写一个-CBIR引擎-代码公开
- 开始使用Google Analytics 示例
- Rails 使用 Google Analytics 示例
- 《乱码传奇人生-程序媛学习之路》(2)
- 有用的 Google Analytics Chrome 插件推荐
- Lintcode: Segment Tree Build
- 基于Eclipse的Android开发环境搭建及初步使用教程
- 使用DreamHost当GoDaddy域名的服务器 步骤
- LeetCode "Patching Array" !!!