您的位置:首页 > 其它

Spark的存储管理

2016-01-05 23:48 246 查看
功能上看Spark的存储管理模型可以分为两部分:RDD缓存和Shuffle数据的持久化.
RDD缓存,指的是RDD调用cache(),persist()或checkpoint,调用这个三个方法会将RDD对应的数据块结果存储到内存或者磁盘中,可以将宽依赖的结果存储下来.
Shuffle数据持久化,在一个Application中,可能会经过多次Shuffle过程,Shuffle的中间数据块是会被保存下来的直到Application结束.
Shuffle数据持久化代码说明:

def
testShuffleAndCache
():
Unit
={

val
conf =
new
SparkConf().setAppName(
"run-algorithm").setMaster("local"
)

val
sc =
new
SparkContext(conf)

val
num =
100

val
a1 =
new
Array[
Double](num)

val
a2 =
new
Array[
Double](num)

val
a3 =
new
Array[
Double](num)

val
r =
new
Random()

for(i <- a1.indices){

a1(i) = r.nextDouble()

a2(i) = r.nextDouble()

a3(i) = r.nextDouble()

}

val
srcRDD = sc.parallelize(a1 zip a2)

/**Shuffle
持久化不能存储
map的结果,因为
map是宽依赖,但是如果在map后面加
cache/persist就能将map结果存储下来
*/

val
sortedRDD = srcRDD.sortBy(x=>x._1).sortBy(x=>x._2).sortBy(x=>x._1).sortBy(x=>x._2).map(_._1)

/**以下sortedRDD第一次被执行
,查看前段的日志输出的时候
,会发现输出结果之前执行了
5个stages*/

println
(sortedRDD.count())

/**以下sortedRDD不是第一次执行了
,sortedRDD已经保存了最后一次
Shuffle的数据,所以只执行了一个
stage*/

println
(sortedRDD.count())

}

在这里有必要说一下Spark的cache,cache只适合使用在不太大的RDD中,如果RDD太大不要使用cache,使用persist或者checkpoint,因为cache是将整个RDD存在内存中,RDD太大的话显然是放不下的。这里举个例子:
假设有100G的数据要处理,每个块128m,在分布式中,是每个core每次只将一个块读入内存中的,每次处理完一个再读取下一个入内存,上一个的处理结果存入磁盘,所以才能处理无穷大的数据量,但是cache是将所有数据存入内存,所以10G的机子肯定不可能cache 100G的数据。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: