您的位置:首页 > 其它

“戏”说Spark-Spark核心-RDD 持久化机制详解

2017-11-29 17:06 591 查看
“戏”说Spark-Spark核心-RDD 持久化机制详解

简介

我们知道RDD之间的血统关系可以使得RDD拥有很好的容错,RDD还有一个叫持久化的机制,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。

RDD持久化机制

Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存
4000
的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD

巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

RDD持久化机制的使用场景

情景1:

一种情况是从一个RDD到几个不同的RDD,算子和计算逻辑其实是完全一样的,结果因为人为的疏忽计算了多次,获取到了多个RDD。所以尽量去复用RDD,差不多的RDD可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。



情景2:

当第一次对RDD2执行算子,获取RDD3的时候,就会从RDD1开始计算,就是读取HDFS文件,然后对RDD1执行算子,获取到RDD2,然后再计算得到RDD3

默认情况下,多次对一个RDD执行算子,去获取不同的RDD都会对这个RDD以及之前的父RDD全部重新计算一次,计算过程为读取HDFS->RDD1->RDD2->RDD4。这种情况,是绝对绝对一定要避免的,一旦出现一个RDD重复计算的情况,就会导致性能急剧降低。比如,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟。

所以对于要多次计算和使用的公共RDD,一定要进行持久化。持久化是将RDD的数据通过BlockManager缓存到内存中/磁盘中,以后无论对这个RDD做多少次计算,都是直接取这个RDD的持久化的数据

持久化前:



持久化后:



补充:频繁的动态更新RDD Cache数据,不适合使用Spark Cache、Spark lineage

在本地测试一下cache缓存性能:

代码:

package spark.myspark.functions

import org.apache.spark.{SparkContext, SparkConf}

/**

* RDD的持久化测试

* RDD的持久化方式:

* 1:cache

* 2:persist

*注意:

*1. cache和persist算子都是懒执行的,必须要有一个Action类的算子来触发执行

*2.cache和persist算子的返回值必须必须赋值给一个变量,在下一个的job中直接使用这个变量就是直接使用持久化的数据了

*3.cache和persist不能紧跟Action类的算子

* 错误:val long=RDD.cache().count()

* 正确:val cacheRDD=RDD.cache()

* val long=cacheRDD.count()

*4:cache=persist()=persit(MEMORY_ONLY)

*/

object CacheAndPresist_test {

def main(args: Array[String]) {

val conf=new SparkConf().setAppName("cacheTest").setMaster("local")

val sc=new SparkContext(conf)

var textRDD=sc.textFile("cacheTest.txt")

val startTime=System.currentTimeMillis()

val count=textRDD.count()

println("not cache count="+count)

val endTime=System.currentTimeMillis()

println("not cache during:"+(endTime-startTime))

val cacheStartTime=System.currentTimeMillis()

val cache= textRDD.cache()

val cachecount=cache.count()

println("cachecount="+cachecount)

val cacheEndTime=System.currentTimeMillis()

println("cache during:"+(cacheEndTime-cacheStartTime))

sc.stop();

}

}

补充:

Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用RDD.unpersist()方法,但是一般不常使用,因为Spark 具有一个叫做ttl的机制会自动的清除最近不常用的数据

RDD持久化源码级别讲解

好,我们现在从源码上看看,RDD的持久化的级别

打开源码,在Core下面找到rdd,然后找到RDD的类:



从源码中我们可以知道:persist()=cache()=persist(StorageLevel.MEMORY_ONLY)

进到StorageLevel类中:

构造方法:



解释:

 StorageLevel有五个属性分别是:

    private var _useDisk: Boolean, //useDisk_是否使用磁盘    private var _useMemory: Boolean, //useMemory_是否使用内存    private var _useOffHeap: Boolean, //useOffHeap_是否使用堆外内存如:整合Tachyon时才会考虑,    private var _deserialized: Boolean,//deserialized_是否进行反序列化    private var _replication: Int = 1) //replication_备份数目。

再看StorageLevel的半生对象(相当于静态工具类):



补充:上面"_2"代表的是份数,就是把持久化的数据存为2份

解释:

1>MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。

2>MEMORY_AND_DISK:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。

3>MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。

4>MEMORY_AND_DSK_SER:同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。

5>DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。

6>MEMORY_ONLY_2或者MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

图形解释总结:



注意:持久化的单位为Partition

注意:当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当重新的使用RDD时候,其他的partition会根据依赖关系重新计算

补充:cache(),persist,checkPoint()一般也称为控制类算子

注意:持久化的级别StorageLevel可以自定义,但是
d644
一般不自定义

补充:更多的源码分析请参考:

Spark RDD缓存代码分析https://www.iteblog.com/archives/1532.html

如何选择缓存机制级别呢?

选择持久化级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

       1:如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。

       2:如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。

       3:除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。

       4:如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。

5:在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:

它运行多个执行者共享Tachyon中相同的内存池

它显著地减少垃圾回收的花费

如果单个的执行者崩溃,缓存的数据不会丢失



补充:

Spark也会自动持久化一些shuffle操作(如reduceByKey)中的中间数据,即使用户没有调用persist方法。这样的好处是避免了在shuffle出错情况下,需要重复计算整个输入。如果用户计划重用计算过程中产生的RDD,我们仍然推荐用户调用persist方法。

补充: 

注意只能设置一种:不然会抛异常: Cannot change storage level of an RDD after it was already assigned a level

 看源代码:



补充:我们能够在Spark web UI上看到缓存机制的设置级别



checkPoint详解

场景:

当业务场景非常的复杂的时候,RDD的lineage(血统)依赖会非常的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖重新的计算丢失的RDD,这样会造成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。

使用:

为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。



checkPoint的优点:

1:持久化在HDFS上,HDFS默认的3副本备份使得持久化的备份数据更加的安全

2:切断RDD的依赖关系:当业务场景复杂的时候,RDD的依赖关系非常的长的时候,当靠后的RDD数据丢失的时候,会经历较长的重新计算的过程,采用checkPoint会转为依赖checkPointRDD,可以避免长的lineage重新计算。

checkPoint的原理:

1:当finalRDD执行Action类算子计算job任务的时候,Spark会从finalRDD从后往前回溯查看哪些RDD使用了checkPoint算子

2:将使用了checkPoint的算子标记

3:Spark会自动的启动一个job来重新计算标记了的RDD,并将计算的结果存入HDFS,然后切断RDD的依赖关系

优化:

源码建议:



源码中RDD里的checkpoint()方法的注释,里面建议在执行checkpoint()方法之前先对rdd进行persisted操作。

在checkPoint的RDD之前先cache RDD,那么Spark就不用启动一个job来计算checkPoint的RDD,而是将持久化到内存的数据直接拷贝到HDFS,进而提高Spark的计算速度,提高应用程序的性能

测试代码:

package spark.myspark.functions

import org.apache.spark.{SparkContext, SparkConf}

/**

* checkPoint的简单测试

*/

object checkPoint_test {

def main(args: Array[String]) {

val conf= new SparkConf().setAppName("checkPoint").setMaster("local")

val sc =new SparkContext(conf)

//设置checkPoint的数据的存储目录,存在HDFS上,这里只是本地测试

sc.setCheckpointDir("d:/checkPoint")

val list=List("java","scala","python")

val listRDD=sc.parallelize(list)

//复杂的业务计算,长的RDD的依赖关系,使用checkPoint,使用优化设置cache()

listRDD.checkpoint()

listRDD.collect();

sc.stop()

}

}

代码地址:链接:http://pan.baidu.com/s/1nuTS8WT
密码:9bf7

注意:checkPoint是懒执行。

详细的关于Spark checkPoint的源码级别的解析,请参考博客:

Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解:

http://www.cnblogs.com/jcchoiling/p/6513569.html

Spark Checkpoint读操作代码分析:https://www.iteblog.com/archives/1555.html

Spark Checkpoint写操作代码分析:https://www.iteblog.com/archives/1535.html

spark源码分析之Checkpoint的过程:

https://yq.aliyun.com/articles/74946?utm_campaign=wenzhang&utm_medium=article&utm_source=QQ-qun&201752&utm_content=m_19140

思维导图构建你的知识体系:



参考:

http://www.cnblogs.com/ylcoder/p/5730975.html

http://blog.csdn.net/hutao_hadoop/article/details/52694025

http://www.ccblog.cn/102.htm

http://www.bijishequ.com/detail/376815?p=
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: