Spark的存储管理
2016-01-05 23:48
246 查看
功能上看Spark的存储管理模型可以分为两部分:RDD缓存和Shuffle数据的持久化.
RDD缓存,指的是RDD调用cache(),persist()或checkpoint,调用这个三个方法会将RDD对应的数据块结果存储到内存或者磁盘中,可以将宽依赖的结果存储下来.
Shuffle数据持久化,在一个Application中,可能会经过多次Shuffle过程,Shuffle的中间数据块是会被保存下来的直到Application结束.
Shuffle数据持久化代码说明:
def
testShuffleAndCache
():
Unit
={
val
conf =
new
SparkConf().setAppName(
"run-algorithm").setMaster("local"
)
val
sc =
new
SparkContext(conf)
val
num =
100
val
a1 =
new
Array[
Double](num)
val
a2 =
new
Array[
Double](num)
val
a3 =
new
Array[
Double](num)
val
r =
new
Random()
for(i <- a1.indices){
a1(i) = r.nextDouble()
a2(i) = r.nextDouble()
a3(i) = r.nextDouble()
}
val
srcRDD = sc.parallelize(a1 zip a2)
/**Shuffle
持久化不能存储
map的结果,因为
map是宽依赖,但是如果在map后面加
cache/persist就能将map结果存储下来
*/
val
sortedRDD = srcRDD.sortBy(x=>x._1).sortBy(x=>x._2).sortBy(x=>x._1).sortBy(x=>x._2).map(_._1)
/**以下sortedRDD第一次被执行
,查看前段的日志输出的时候
,会发现输出结果之前执行了
5个stages*/
println
(sortedRDD.count())
/**以下sortedRDD不是第一次执行了
,sortedRDD已经保存了最后一次
Shuffle的数据,所以只执行了一个
stage*/
println
(sortedRDD.count())
}
在这里有必要说一下Spark的cache,cache只适合使用在不太大的RDD中,如果RDD太大不要使用cache,使用persist或者checkpoint,因为cache是将整个RDD存在内存中,RDD太大的话显然是放不下的。这里举个例子:
假设有100G的数据要处理,每个块128m,在分布式中,是每个core每次只将一个块读入内存中的,每次处理完一个再读取下一个入内存,上一个的处理结果存入磁盘,所以才能处理无穷大的数据量,但是cache是将所有数据存入内存,所以10G的机子肯定不可能cache 100G的数据。
RDD缓存,指的是RDD调用cache(),persist()或checkpoint,调用这个三个方法会将RDD对应的数据块结果存储到内存或者磁盘中,可以将宽依赖的结果存储下来.
Shuffle数据持久化,在一个Application中,可能会经过多次Shuffle过程,Shuffle的中间数据块是会被保存下来的直到Application结束.
Shuffle数据持久化代码说明:
def
testShuffleAndCache
():
Unit
={
val
conf =
new
SparkConf().setAppName(
"run-algorithm").setMaster("local"
)
val
sc =
new
SparkContext(conf)
val
num =
100
val
a1 =
new
Array[
Double](num)
val
a2 =
new
Array[
Double](num)
val
a3 =
new
Array[
Double](num)
val
r =
new
Random()
for(i <- a1.indices){
a1(i) = r.nextDouble()
a2(i) = r.nextDouble()
a3(i) = r.nextDouble()
}
val
srcRDD = sc.parallelize(a1 zip a2)
/**Shuffle
持久化不能存储
map的结果,因为
map是宽依赖,但是如果在map后面加
cache/persist就能将map结果存储下来
*/
val
sortedRDD = srcRDD.sortBy(x=>x._1).sortBy(x=>x._2).sortBy(x=>x._1).sortBy(x=>x._2).map(_._1)
/**以下sortedRDD第一次被执行
,查看前段的日志输出的时候
,会发现输出结果之前执行了
5个stages*/
println
(sortedRDD.count())
/**以下sortedRDD不是第一次执行了
,sortedRDD已经保存了最后一次
Shuffle的数据,所以只执行了一个
stage*/
println
(sortedRDD.count())
}
在这里有必要说一下Spark的cache,cache只适合使用在不太大的RDD中,如果RDD太大不要使用cache,使用persist或者checkpoint,因为cache是将整个RDD存在内存中,RDD太大的话显然是放不下的。这里举个例子:
假设有100G的数据要处理,每个块128m,在分布式中,是每个core每次只将一个块读入内存中的,每次处理完一个再读取下一个入内存,上一个的处理结果存入磁盘,所以才能处理无穷大的数据量,但是cache是将所有数据存入内存,所以10G的机子肯定不可能cache 100G的数据。
相关文章推荐
- Maven私服初步搭建
- 实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO
- sql server之数据库语句优化
- 能使 Oracle 索引失效的七大限制条件
- spark搭建与编译
- Java的反射机制
- leetcode 118: Pascal's Triangle
- 打造先进的内存KV数据库-5 TCP侦听
- Top resources to learn Android
- 基于事件的 JavaScript 编程:异步与同
- Android中程序与Service交互(简单)
- c#中使用状态栏
- cdpsnarf基于Kali Linux环境的使用
- Spark编程模型
- Spark操作Hbase
- 谷歌浏览器提示Adobe Flash Player因过期而遭到阻止
- Python/C API 3.4 简介
- 【C语言提高36】结构体类型和变量定义及基本操作
- spark-submit工具参数说明
- [转] GitHub上README.md教程