Spark Checkpoint写操作代码分析
2016-07-17 16:06
423 查看
《Spark RDD缓存代码分析》
《Spark Task序列化代码分析》
《Spark分区器HashPartitioner和RangePartitioner代码详解》
《Spark Checkpoint读操作代码分析》
《Spark Checkpoint写操作代码分析》
上次我对Spark RDD缓存的相关代码《Spark
RDD缓存代码分析》进行了简要的介绍,本文将对Spark RDD的checkpint相关的代码进行相关的介绍。先来看看怎么使用checkpont:
先是初始化好相关的RDD,因为checkpoint是将RDD中的数据写到磁盘,所以需要指定一个checkpint目录,也就是
RDD进行checkpoint,整个代码运行完,会在
从上面注释可以看出,如果是非local模式,directory要求是HDFS上的目录。事实上,如果你是非local模式,但是指定的checkpint路径是本地路径,程序运行的时候会出现类似以下的异常:
setCheckpointDir的过程主要是在指定的目录下创建一个文件夹,这个文件夹会在后面用到。然后我们对RDD进行checkpoint,主要做的事情如下:
程序第一步就是判断checkpointDir是否为空,如果为空直接抛出异常,而这个checkpointDir是由上面的[code]setCheckpointDir函数设置的。这里我们应该设置了checkpointDir,所以直接判断
RDDCheckpointData类是和RDD一一对应的,保存着一切和RDD checkpint相关的所有信息,而且具体的Checkpint操作都是它(子类)进行的。而对RDD调用checkpoint函数主要就是初始化
注意看最后一句代码
又看到checkpointData了吧?这个就是在执行checkpint()函数定义的,所以如果你的RDD调用了checkpint()函数,那么
为了防止多个线程对同一个RDD进行checkpint操作,首先是把checkpint的状态由Initialized变成CheckpointingInProgress,所以如果另一个线程发现checkpint的状态不是Initialized就直接return了。最后就是doCheckpoint实现了:
首先是创建写RDD的目录,然后启动一个Job去写Checkpint文件,主要由
写完Checkpint文件之后,会返回newRDD,并最后赋值给cpRDD,并将Checkpint的状态变成Checkpointed。最后将这个RDD的依赖全部清除(
整个写操作就完成了。
转载自过往记忆(http://www.iteblog.com/)
[/code]
《Spark Task序列化代码分析》
《Spark分区器HashPartitioner和RangePartitioner代码详解》
《Spark Checkpoint读操作代码分析》
《Spark Checkpoint写操作代码分析》
上次我对Spark RDD缓存的相关代码《Spark
RDD缓存代码分析》进行了简要的介绍,本文将对Spark RDD的checkpint相关的代码进行相关的介绍。先来看看怎么使用checkpont:
sc.setCheckpointDir("/www/iteblog/com"),这步执行完之后会在
/www/iteblog/com路径下创建相关的文件夹,比如:
/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc;然后对data
RDD进行checkpoint,整个代码运行完,会在
/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc生存相关的文件:
Found 4 items -rw-r--r-- 3 iteblog iteblog 0 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00000 -rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00001 -rw-r--r-- 3 iteblog iteblog 9 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00002 -rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00003 现在来对checkpint的相关代码进行简单介绍。首先就是设置checkpint的目录,这个代码如下:
checkpointData.isEmpty是否成立,
checkpointData是什么东西呢?它的类型如下:
ReliableRDDCheckpointData对象,供以后进行checkpint操作。从这段代码我们知道,对RDD调用checkpoint函数,其实就是初始化了
checkpointData,并不立即执行checkpint操作,你可以理解成这里只是对RDD进行checkpint标记操作。 那什么触发真正的checkpoint操作?仔细看上面例子,执行
data.count之后才会生成checkpoint文件。是的,只有在Action触发Job的时候才会进行checkpint。Spark在执行完Job之后会判断是否需要checkpint:
rdd.doCheckpoint(),这个就是触发RDD的checkpoint的,而doCheckpoint函数的实现如下:
checkpointData.isDefined肯定是true的。而如果你的父RDD调用了checkpint()函数,最后也会执行你父RDD的
checkpointData.get.checkpoint()代码。我们来看看checkpointData中的checkpoint()是如何实现的,代码如下:
ReliableCheckpointRDD.writeCheckpointFile来实现写操作。
markCheckpointed())
转载自过往记忆(http://www.iteblog.com/)
[/code]
相关文章推荐
- 数字在排序数组中出现的次数 java
- Java基础学习第一天
- VB程序学习代码记录20160717
- java-基础-变长参数
- strcpy和strncpy用法和区别
- 源码分析-java-AbstractList-Itr和ListItr的实现
- C语言字符串操作总结大全
- pom.xml简介
- C语言 程序 打印日历
- python学习——分布式进程
- C#已解决问题集锦
- java判断文件的真实类型
- C++中虚函数表存储位置浅析
- Spark Checkpoint读操作代码分析
- JAVA 面向对象 类和对象
- matlab绘制 三维剖面图
- java并发之CAS
- php生成唯一订单号的方法
- Java的基础概念
- Java?C++?虚函数?抽象?