您的位置:首页 > 编程语言

Spark Checkpoint写操作代码分析

2016-07-17 16:06 423 查看
  《Spark RDD缓存代码分析》

  《Spark Task序列化代码分析》

  《Spark分区器HashPartitioner和RangePartitioner代码详解》

  《Spark Checkpoint读操作代码分析》

  《Spark Checkpoint写操作代码分析》

  上次我对Spark RDD缓存的相关代码《Spark
RDD缓存代码分析》进行了简要的介绍,本文将对Spark RDD的checkpint相关的代码进行相关的介绍。先来看看怎么使用checkpont:

  先是初始化好相关的RDD,因为checkpoint是将RDD中的数据写到磁盘,所以需要指定一个checkpint目录,也就是
sc.setCheckpointDir("/www/iteblog/com")
,这步执行完之后会在
/www/iteblog/com
路径下创建相关的文件夹,比如:
/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc
;然后对data
RDD进行checkpoint,整个代码运行完,会在
/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc
生存相关的文件:
Found 4 items
-rw-r--r-- 3 iteblog iteblog 0 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00000
-rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00001
-rw-r--r-- 3 iteblog iteblog 9 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00002
-rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00003
现在来对checkpint的相关代码进行简单介绍。首先就是设置checkpint的目录,这个代码如下:
从上面注释可以看出,如果是非local模式,directory要求是HDFS上的目录。事实上,如果你是非local模式,但是指定的checkpint路径是本地路径,程序运行的时候会出现类似以下的异常:
setCheckpointDir的过程主要是在指定的目录下创建一个文件夹,这个文件夹会在后面用到。然后我们对RDD进行checkpoint,主要做的事情如下:
  程序第一步就是判断checkpointDir是否为空,如果为空直接抛出异常,而这个checkpointDir是由上面的[code]setCheckpointDir
函数设置的。这里我们应该设置了checkpointDir,所以直接判断
checkpointData.isEmpty
是否成立,
checkpointData
是什么东西呢?它的类型如下:
  RDDCheckpointData类是和RDD一一对应的,保存着一切和RDD checkpint相关的所有信息,而且具体的Checkpint操作都是它(子类)进行的。而对RDD调用checkpoint函数主要就是初始化
ReliableRDDCheckpointData
对象,供以后进行checkpint操作。从这段代码我们知道,对RDD调用checkpoint函数,其实就是初始化了
checkpointData
,并不立即执行checkpint操作,你可以理解成这里只是对RDD进行checkpint标记操作。  那什么触发真正的checkpoint操作?仔细看上面例子,执行
data.count
之后才会生成checkpoint文件。是的,只有在Action触发Job的时候才会进行checkpint。Spark在执行完Job之后会判断是否需要checkpint:
注意看最后一句代码
rdd.doCheckpoint()
,这个就是触发RDD的checkpoint的,而doCheckpoint函数的实现如下:
又看到checkpointData了吧?这个就是在执行checkpint()函数定义的,所以如果你的RDD调用了checkpint()函数,那么
checkpointData.isDefined
肯定是true的。而如果你的父RDD调用了checkpint()函数,最后也会执行你父RDD的
checkpointData.get.checkpoint()
代码。我们来看看checkpointData中的checkpoint()是如何实现的,代码如下:
为了防止多个线程对同一个RDD进行checkpint操作,首先是把checkpint的状态由Initialized变成CheckpointingInProgress,所以如果另一个线程发现checkpint的状态不是Initialized就直接return了。最后就是doCheckpoint实现了:
首先是创建写RDD的目录,然后启动一个Job去写Checkpint文件,主要由
ReliableCheckpointRDD.writeCheckpointFile
来实现写操作。
写完Checkpint文件之后,会返回newRDD,并最后赋值给cpRDD,并将Checkpint的状态变成Checkpointed。最后将这个RDD的依赖全部清除(
markCheckpointed()
整个写操作就完成了。
转载自过往记忆(http://www.iteblog.com/)

[/code]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: