Spark学习——缓存、闭包及共享变量
2017-04-17 16:25
363 查看
一、缓存
Spark中也有缓存机制,或者说持久化机制。因为RDD的转化都是惰性的,这就意味着在调用action操作之前Spark是不会计算
的,Spark会在内部记录所要求的执行步骤的全部流程,构建一个有向无环图(DAG)。同样在把数据读入到RDD的操作也是惰性
的。由于这个特性,有时候需要能够多次使用同一个RDD时,如果简单地对RDD调用action操作,Spark每次都会重算RDD和它的所有
依赖,这样子消耗很大。利用缓存可以让action计算速度加快(通常会加速10倍)
所以Spark有持久化的操作,当我们让Spark持久化存储一个RDD时,计算出的RDD节点会分别保存它们所求出的RDD分区数
据。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存数据时重算丢失的数据分区。我们可以把我们的数据备份到多个节点避免这种情况发生。
RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。cache()方法本质调用的是MEMORY_ONLY级别的persist()方法,而persist()方法可以认为选择存储级别:
MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
DISK_ONLY : 只在磁盘上缓存 RDD。
MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。
那么应该如何选择存储级别呢?核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :
如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
如果内存不能全部存储 RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个
RDD,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist() 方法。
二、闭包和共享变量
看下面一个情景:
上面的代码行为是不确定的,并且可能无法按预期正常工作。Spark 执行作业时,会分解 RDD 操作到每个执行者里。在执行之前,Spark 计算任务的 closure(闭包)。而闭包是在 RDD 上的执行者必须能够访问的变量和方法(在此情况下的 foreach() )。闭包被序列化并被发送到每个执行器。闭包的变量副本发给每个
executor ,当 counter 被 foreach 函数引用的时候,它已经不再是 driver node 的 counter 了。虽然在 driver node 仍然有一个 counter 在内存中,但是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。所以 counter 最终的值还是 0,因为对 counter 所有的操作所有操作均引用序列化的 closure 内的值。
因此在这种情况下,应该使用共享变量!
1、广播变量
Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以降低通信成本。
Spark 的 action(动作)操作是通过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是通过分布式的 "shuffle" 操作进行拆分的。Spark 会自动广播出每个 stage(阶段)内任务所需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。
2、累加器
Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
累加器的更新只发生在 action 操作中,Spark 保证每个任务只更新累加器一次,例如,重启任务不会更新值。在 transformations(转换)中, 用户需要注意的是,如果 task(任务)或 job stages(阶段)重新执行,每个任务的更新操作可能会执行多次。
累加器不会改变 Spark lazy evaluation(懒加载)的模式。如果累加器在 RDD 中的一个操作中进行更新,它们的值仅被更新一次,RDD 被作为 action 的一部分来计算。因此,在一个像 map() 这样的 transformation(转换)时,累加器的更新并没有执行。下面的代码片段证明了这个特性 :
Spark中也有缓存机制,或者说持久化机制。因为RDD的转化都是惰性的,这就意味着在调用action操作之前Spark是不会计算
的,Spark会在内部记录所要求的执行步骤的全部流程,构建一个有向无环图(DAG)。同样在把数据读入到RDD的操作也是惰性
的。由于这个特性,有时候需要能够多次使用同一个RDD时,如果简单地对RDD调用action操作,Spark每次都会重算RDD和它的所有
依赖,这样子消耗很大。利用缓存可以让action计算速度加快(通常会加速10倍)
所以Spark有持久化的操作,当我们让Spark持久化存储一个RDD时,计算出的RDD节点会分别保存它们所求出的RDD分区数
据。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存数据时重算丢失的数据分区。我们可以把我们的数据备份到多个节点避免这种情况发生。
RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。cache()方法本质调用的是MEMORY_ONLY级别的persist()方法,而persist()方法可以认为选择存储级别:
MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
DISK_ONLY : 只在磁盘上缓存 RDD。
MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。
那么应该如何选择存储级别呢?核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :
如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
如果内存不能全部存储 RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个
RDD,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist() 方法。
二、闭包和共享变量
看下面一个情景:
List<Integer> nums = Arrays.asList(1,3,2,4,3,5,4,6,5,4); final int[] count = {0}; JavaRDD<Integer> rawraw = sc.parallelize(nums); rawraw.foreach(a -> count[0] = count[0] +a);
上面的代码行为是不确定的,并且可能无法按预期正常工作。Spark 执行作业时,会分解 RDD 操作到每个执行者里。在执行之前,Spark 计算任务的 closure(闭包)。而闭包是在 RDD 上的执行者必须能够访问的变量和方法(在此情况下的 foreach() )。闭包被序列化并被发送到每个执行器。闭包的变量副本发给每个
executor ,当 counter 被 foreach 函数引用的时候,它已经不再是 driver node 的 counter 了。虽然在 driver node 仍然有一个 counter 在内存中,但是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。所以 counter 最终的值还是 0,因为对 counter 所有的操作所有操作均引用序列化的 closure 内的值。
因此在这种情况下,应该使用共享变量!
1、广播变量
Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以降低通信成本。
Spark 的 action(动作)操作是通过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是通过分布式的 "shuffle" 操作进行拆分的。Spark 会自动广播出每个 stage(阶段)内任务所需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]
2、累加器
Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10
累加器的更新只发生在 action 操作中,Spark 保证每个任务只更新累加器一次,例如,重启任务不会更新值。在 transformations(转换)中, 用户需要注意的是,如果 task(任务)或 job stages(阶段)重新执行,每个任务的更新操作可能会执行多次。
累加器不会改变 Spark lazy evaluation(懒加载)的模式。如果累加器在 RDD 中的一个操作中进行更新,它们的值仅被更新一次,RDD 被作为 action 的一部分来计算。因此,在一个像 map() 这样的 transformation(转换)时,累加器的更新并没有执行。下面的代码片段证明了这个特性 :
相关文章推荐
- spark学习-25-Spark广播变量与共享变量(1)
- 【Spark】Sparkstreaming-共享变量-缓存RDD-到底是什么情况?
- spark 学习(二) RDD及共享变量
- spark 学习(二) RDD及共享变量
- spark学习系列——9 spark共享变量
- spark源码学习(九):map端计算结果缓存处理(一)
- IOS学习 多线程NSThread 共享变量 卖票
- 从缓存行出发理解volatile变量、伪共享False sharing、disruptor
- 第43讲:Scala中类型变量Bounds代码实战及其在Spark中的应用源码解析学习笔记
- spark学习七 共享内存的实现(快速的共享数据)
- 【慕课网学习笔记】Java共享变量的可见性和原子性
- 【java学习笔记】java中的中间缓存变量机制
- 张孝祥ThreadLocal实现线程范围内共享变量(学习笔记)
- Thread学习(八) ThreadLocal实现线程范围内的共享变量
- spark共享变量
- 从缓存行出发理解volatile变量、伪共享False sharing、disruptor
- Spark-神奇的共享变量
- spark共享变量(广播变量Broadcast Variable,累加器Accumulators)
- 关于进程和线程对于全局变量共享的问题学习总结
- SQLite学习笔记(六)&&共享缓存