Spark中的cache和persist
2018-02-08 11:51
495 查看
Spark中cache和persist的作用以及存储级别
前言
Spark开发高性能的大数据计算作业并不是那么简单。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。有一些代码开发基本的原则,避免创建重复的RDD,尽可能复用同一个RDD,如下,我们可以直接用一个RDD进行多种操作。
val rdd1 = sc.textFile("xxx") rdd1.xxxxx.xxxx.collect rdd1.xxx.xxcollect
但是Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。对于上面的代码,
sc.textFile("xxx")会执行两次,这种方式的性能是很差的。
因此对于这种情况,我的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。
持久化
如果要对一个RDD进行持久化,只要对这个RDD调用cache()和
persist()即可。
val rdd1 = sc.textFile("xxx").cache rdd1.xxxxx.xxxx.collect rdd1.xxx.xxcollect
cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
此时再对rdd1执行两次算子操作时,只有在第一次算子时,才会将这个rdd1从源头处计算一次。第二次执行算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
def cache(): this.type = persist()
persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
默认缓存级别是
StorageLevel.MEMORY_ONLY,也就是cache就是这个默认级别的
RDD的缓存级别
持久化级别 | 含义解释 |
---|---|
MEMORY_ONLY | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。 |
MEMORY_AND_DISK | 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
MEMORY_ONLY_SER | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
MEMORY_AND_DISK_SER | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等 | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
scala> var data = sc.parallelize(List(1,2,3,4)) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at console:12 scala> data.getStorageLevel res65: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1) scala> data.cache res66: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at console:12 scala> data.getStorageLevel res67: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)
查看 StorageLevel 类的源码:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }
每个缓存级别后面都跟了一个StorageLevel的构造器,里面包含了4个或5个参数。我们查看一下StorageLevel的构造器源码。
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)。
持久化策略的选择
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
Spark Cache 性能测试
作者介绍:涂小刚,攻城狮一枚,有代码洁癖,热爱学习,热爱生活,痛恨酱油帝、跪舔帝。目前主要从事Spark大数据平台与机器学习平台相关方向的工作,关注Spark与TensorFlow测试准备
训练数据是通过 Facebook SNS 公开数据集生成器得到,在HDFS上大小为9.3G,100个文件,添加如下两个参数以保证所有资源全部到位后才启动task,训练时间为加载数据到训练完毕这期间的耗时。--conf spark.scheduler.minRegisteredResourcesRatio=1
--conf
spark.scheduler.maxRegisteredResourcesWaitingTime=10000000000
测试集群为3个节点的TS5机器搭建而成,其中一台作为RM,两台NM。除以上配置外,其他配置全部保持Spark默认状态。公共资源配置、分区设置以及算法参数如下表所示,
executor_memory视不同的测试用例不同:
测试用例
在不使用Cache的情况和使用Cache的情况下,分别测试Spark-Kmeans算法的训练时间以及GC时间占比,相关测试指标数据如下表所示:
从以上测试数据看来,让人有点出乎意料,一开始有点不太相信,但是多次测试后数据并没有多大的抖动,所以说Spark的性能受多方面因素的影响,单单Cache这块不同的Cache方式以及不同的资源情况下,其性能差别就相差较大,下面分析其内在原因。
不使用cache时,GC不是瓶颈,在每次迭代时均要读一遍HDFS,访问HDFS有较大的开销。从HDFS加载训练数据后直接采用Spark原生的Cache:
当
executor_memory为2g时,不足以Cache住原始训练数据,从UI上看到Cache的比例只有33%左右,导致频繁的
rdd-block剔除重建,同时由于内存吃紧,可能引发较重的GC,从UI上看到GC时间占到总的task运行时间的12%左右,已经成为瓶颈,其整体性能还不如不使用Cache;
当
executor_memory为4g时,也不足以Cache住原始训练数据,但是其Cache的比例有90%左右,同样存在
rdd-block剔除重建,并引发较重的GC,GC时间占总的task运行时间的7%左右,虽然比
executor_memory为2g的情况有所好转,但是仍然不理想,只比不做Cache好7%左右,但是内存却多用了20g,并不是特别划算;
当
executor_memory为6g时,可以全部Cache住原始训练数据,性能较优,GC占比较小,但是比不用Cache要多用40g内存,有些代价。
一般来说,当我们内存不够时,可以选择
MEMORY_AND_DISK的缓存方式,但是测试发现
MEMORY_AND_DISK的缓存效果并不是特别好,从测试数据来看,还不如直接使用
DISK_ONLY的缓存方式,
MEMORY_AND_DISK的缓存方式带来的GC开销非常大,可能是因为每次都尽可能地Cache数据到内存,不够再Spill到磁盘,同时引发频繁GC。
交叉验证测试
为了排除偶然性,拿 BigDataBenchmark 中的 PageRank 算法进行测试,分别测试各种Cache方式下整体性能,在保证每种Cache方式下都能100%Cache住数据的情况下,得到如下测试结果。总结
Spark的Cache并不是总是会加速任务运行,Cache的方式不同,对任务产生的影响不同。并不是能用内存Cache就用内存,而是要考虑是否有充足的内存Cache住你的数据,否则可能适得其反。在内存充足时,优先考虑使用MEMORY_ONLY,但是当内存不足以Cache住你的中间数据时,建议直接用
MEMORY_ONLY_SER(spark.rdd.compress=true)或
DISK_ONLY而不要用
MEMORY_AND_DISK,MEMORY_AND_DISK可能会频繁地触发Spark的内存管理,增加Spill以及GC的开销。
相关文章推荐
- Spark中persist和cache的区别
- Spark源码之persist方法,cache方法以及StorageLevel
- 每次进步一点点——spark中cache和persist的区别
- Spark中cache和persist的作用以及存储级别
- spark实战之RDD的cache或persist操作不会触发transformation计算
- 2017.06.15--spark中cache和persist的区别
- 【总结】论spark中的cache/persist/checkpoint
- spark中的map和flattop,persist和cache分别有什么区别?
- Spark RDD中cache和persist的区别
- spark中cache和persist的区别,rdd缓存源码解析
- spark中的cache() persist() checkpoint()之间的区别
- spark中的cache() persist() checkpoint()之间的区别
- Spark中cache和persist的区别
- Spark的cache和persist
- Spark RDD中cache和persist的区别
- 【Spark系列5】cache和persist的区别
- spark中的cache() persist() checkpoint()之间的区别
- Spark RDD的缓存 rdd.cache() 和 rdd.persist()
- spark源码之RDD(1)partition、dependence、persist
- Spark storage系列------1.Spark RDD.persist对数据的存储