Spark源码之persist方法,cache方法以及StorageLevel
2017-07-07 16:58
375 查看
今天看Spark源码的时候看到了persist方法与cache方法,这里就说明一下两者的区别,也解决了自己之前的疑惑。
第一种persist方法:
第二种persist方法:
第三种persist方法:
unpersist方法:
而它设定的12种级别其实就是这些参数取值的组合,看源码:
persist方法有三种,分为默认无参数仅内存级别的persist(),还有persist(newLevel):这个方法需要之前对RDD没有设置过缓存级别,persist(newLevel,allowOverride):这个方法适用于之前对RDD设置过缓存级别,但是想更改缓存级别的情况。
取消缓存统一使用unpersist()方法
StorageLevel设定有12种缓存策略,可以根据自己的情况选择合适的
一般情况下建议使用persist(StorageLevel.MEMORY_AND_DISK)方法替代cache()方法,以防止内存不足造成程序运行错误
cache方法
Spark2.11关于cache方法的源码是这样的:/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). * 缓存RDD,缓存级别为仅在内存中 * 这里的cache方法实际上调用了无参数的persist方法 */ def cache(): this.type = persist()
persist方法
Spark2.11关于persist方法的源码是这样的:第一种persist方法:
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). * 无参数的缓存级别为仅内存 */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
第二种persist方法:
/** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet. Local checkpointing is an exception. * 设置RDD的存储级别为我们传递的newLevel值,这个方法仅仅适用于之前我们没有为RDD的设置过缓存级别,如果之前设置过了,程序会报错,其中如果为RDD设置了checkpoint()除外。 * 这里其实有两个特殊情况: * 第一,如果我们之前设置过RDD的缓存级别,现在再次调用此方法进行缓存级别设置,但是缓存级别与之前一样,程序也是不会报错的,因为里面调用了persist(newLevel, allowOverride = false)方法,具体看一下下面这个方法的逻辑就会清楚了 * 第二种情况是我们之前对RDD调用了checkpoint方法,这个方法是把RDD存储到disk上,之后我们再调用persist(newLevel)方法也是不会报错的,他会做检查你是否执行过checkpoint方法(即isLocallyCheckpointed),如果是的话就会调用persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true),而这里LocalRDDCheckpointData.transformStorageLevel(newLevel)返回的缓存级别是disk级别,故不会报错 */ def persist(newLevel: StorageLevel): this.type = { if (isLocallyCheckpointed) { // This means the user previously called localCheckpoint(), which should have already // marked this RDD for persisting. Here we should override the old storage level with // one that is explicitly requested by the user (after adapting it to use disk). persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true) } else { persist(newLevel, allowOverride = false) } }
第三种persist方法:
/** * Mark this RDD for persisting using the specified level. * * @param newLevel the target storage level * @param allowOverride whether to override any existing level with the new one * 这个方法适用于之前我们设置过了RDD的缓存级别,现在想要修改RDD的缓存级别的情况,只需要把allowOverride设置为true */ private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { // TODO: Handle changes of StorageLevel //这一段程序也解释了上面第二种方法的第一个特殊情况为什么不会报错 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } // If this is the first time this RDD is marked for persisting, register it // with the SparkContext for cleanups and accounting. Do this only once. if (storageLevel == StorageLevel.NONE) { sc.cleaner.foreach(_.registerRDDForCleanup(this)) sc.persistRDD(this) } storageLevel = newLevel this }
unpersist方法:
/** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. *这个方法用于取消缓存级别 * @param blocking Whether to block until all blocks are deleted. * @return This RDD. */ def unpersist(blocking: Boolean = true): this.type = { logInfo("Removing RDD " + id + " from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this }
StorageLevel类
StorageLevel这个类里面设置了RDD的各种缓存级别,总共有12种,其实是它的多个构造参数的组合形成的,先看一下它的相关构造参数,源码如下:@DeveloperApi class StorageLevel private( private var _useDisk: Boolean, //是否使用磁盘 private var _useMemory: Boolean, //是否使用内存 private var _useOffHeap: Boolean, //是否使用堆外内存 private var _deserialized: Boolean, //是否反序列化 private var _replication: Int = 1) //备份因子,默认为1 extends Externalizable {
而它设定的12种级别其实就是这些参数取值的组合,看源码:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
总结
cache方法其实是persist方法的一个特例:调用的是无参数的persist(),代表缓存级别是仅内存的情况persist方法有三种,分为默认无参数仅内存级别的persist(),还有persist(newLevel):这个方法需要之前对RDD没有设置过缓存级别,persist(newLevel,allowOverride):这个方法适用于之前对RDD设置过缓存级别,但是想更改缓存级别的情况。
取消缓存统一使用unpersist()方法
StorageLevel设定有12种缓存策略,可以根据自己的情况选择合适的
一般情况下建议使用persist(StorageLevel.MEMORY_AND_DISK)方法替代cache()方法,以防止内存不足造成程序运行错误
相关文章推荐
- Spark中cache和persist的作用以及存储级别
- spark中cache和persist的区别,rdd缓存源码解析
- WinForms C#:html编辑器工程源码,含直接写WebBrowser的文件流、IPersistStreamInit接口的声明和一些相关的小方法
- WinForms C#:html编辑器工程源码,含直接写WebBrowser的文件流、IPersistStreamInit接口的声明和一些相关的小方法
- 用ltib获得源码以及编译嵌入式arm驱动的方法简述
- IOS源码封装成.bundle和.a文件,以及加入xib的具体方法,翻遍网络,仅此一家完美翻译!! IOS7!!(3) 完美结局
- V8 源码 win 系统下编译方法以及 V8 dll 化方法
- WinForms C#:html编辑器工程源码,含直接写WebBrowser的文件流、IPersistStreamInit接口的声明和一些相关的小方法
- Linux/Unix分配进程ID的方法以及源码实现
- Nginx (一)Windows下编译Nginx源码以及安装 nginx for windows方法步骤
- Javascript笔记:jQuery源码分析以及从jQuery对象创建的角度理解extend方法的原理
- PHP:使用Zend对源码加密、Zend Guard安装以及Zend Guard Run-time support missing的解决方法
- GridView导出Excel方法源码以及注意事项
- Asp.net 中图片存储数据库以及页面读取显示通用方法详解-附源码下载
- WinForms C#:html编辑器工程源码,含直接写WebBrowser的文件流、IPersistStreamInit接口的声明和一些相关的小方法
- Android4.0源码编译方法以及错误解决方案
- java源码动态生成编译,以及方法调用
- 关于修改frameworks的源码遇到的一点问题以及解决方法
- eclipse导入android源码的方法以及遇到的问题
- Android4.0源码编译方法以及错误解决方案