RDD 中foreach与foreachPartition区别
2017-12-14 13:44
176 查看
参考:
http://blog.csdn.net/u013939918/article/details/60881711
https://www.cnblogs.com/mfmdaoyou/p/7073861.html
RDD.foreachPartition/foreach的操作
在这个action的操作中:
这两个action主要用于对每一个partition中的iterator时行迭代的处理.通过用户传入的function对iterator进行内容的处理.
首先我们先看看foreach的操作:
在fureach中,传入一个function,这个函数的传入參数就是每一个partition中,每次的foreach得到的一个rdd的kv实例,也就是详细的内容,这样的处理你并不知道这个iterator的foreach什么时候结果,仅仅能是foreach的过程中,你得到一条数据,就处理一条数据.
由以下的红色部分能够看出,foreach操作是直接调用了partition中数据的foreach操作.
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
演示样例说明:
val list = new ArrayBuffer()
Rdd.foreach(record => {
list += record
If (list.size >= 10000) {
list.flush....
}
})
上面这段演示样例代码中,假设这么使用就会存在一个问题,
迭代的最后,list的结果可能还没有达到10000条,这个时候,你在内部的处理的flush部分就不会运行,也就是迭代的最后假设没有达到10000的数据就会丢失.
所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})
然后接下来看看foreachPartition:
这个函数也是依据传入的function进行处理,但不同处在于,这里function的传入參数是一个partition相应数据的iterator.而不是直接使用iterator的foreach,
这样的情况下,假设是上面foreach的演示样例代码中list这个片段在这个action中就行正常的去处理.
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
演示样例代码:
Val list = new ArrayBuffer
rdd.foreachPartition(it => {
It.foreach(r => {
List += r
If (list.size > 10000) flush
})
If (list.size > 0) flush
})
最后说下这两个action的差别:
Foreach与foreachPartition都是在每一个partition中对iterator进行操作,
不同的是,foreach是直接在每一个partition中直接对iterator运行foreach操作,而传入的function仅仅是在foreach内部使用,
而foreachPartition是在每一个partition中把iterator给传入的function,让function自己对iterator进行处理.
个人理解就是foreach和foreachPartition的参数:函数(T)的作用对象不一样,
foreach是作用在iterator循环取出来的每个RDD上,foreachPartition是让
该iterator
执行这个函数(T)
http://blog.csdn.net/u013939918/article/details/60881711
https://www.cnblogs.com/mfmdaoyou/p/7073861.html
RDD.foreachPartition/foreach的操作
在这个action的操作中:
这两个action主要用于对每一个partition中的iterator时行迭代的处理.通过用户传入的function对iterator进行内容的处理.
首先我们先看看foreach的操作:
在fureach中,传入一个function,这个函数的传入參数就是每一个partition中,每次的foreach得到的一个rdd的kv实例,也就是详细的内容,这样的处理你并不知道这个iterator的foreach什么时候结果,仅仅能是foreach的过程中,你得到一条数据,就处理一条数据.
由以下的红色部分能够看出,foreach操作是直接调用了partition中数据的foreach操作.
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
演示样例说明:
val list = new ArrayBuffer()
Rdd.foreach(record => {
list += record
If (list.size >= 10000) {
list.flush....
}
})
上面这段演示样例代码中,假设这么使用就会存在一个问题,
迭代的最后,list的结果可能还没有达到10000条,这个时候,你在内部的处理的flush部分就不会运行,也就是迭代的最后假设没有达到10000的数据就会丢失.
所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})
然后接下来看看foreachPartition:
这个函数也是依据传入的function进行处理,但不同处在于,这里function的传入參数是一个partition相应数据的iterator.而不是直接使用iterator的foreach,
这样的情况下,假设是上面foreach的演示样例代码中list这个片段在这个action中就行正常的去处理.
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
演示样例代码:
Val list = new ArrayBuffer
rdd.foreachPartition(it => {
It.foreach(r => {
List += r
If (list.size > 10000) flush
})
If (list.size > 0) flush
})
最后说下这两个action的差别:
Foreach与foreachPartition都是在每一个partition中对iterator进行操作,
不同的是,foreach是直接在每一个partition中直接对iterator运行foreach操作,而传入的function仅仅是在foreach内部使用,
而foreachPartition是在每一个partition中把iterator给传入的function,让function自己对iterator进行处理.
个人理解就是foreach和foreachPartition的参数:函数(T)的作用对象不一样,
foreach是作用在iterator循环取出来的每个RDD上,foreachPartition是让
该iterator
执行这个函数(T)
相关文章推荐
- RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- 【Spark】SparkStreaming-foreachrdd foreachpartition
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Rdd的 foreach 和 foreachPartition
- Spark编程之基本的RDD算子sparkContext,foreach,foreachPartition, collectAsMap
- Spark Streaming之妙用foreachRDD和foreachPartition
- Spark算子:RDD行动Action操 4000 作(4)–countByKey、foreach、foreachPartition、sortBy
- 3.4 Spark RDD Action操作4-countByKey、foreach、foreachPartition、sortBy
- Scala的foreachRDD
- for和foreach的区别
- javascript中map、foreach、reduce、filter间区别?
- MySQL数据库与其他数据库的3个常用语法区别(外链接、分组排序row_number() over(partition by ) 、group by和distinct)
- for 与 foreach区别
- 数据库分片(Sharding)与分区(Partition)的区别
- ThinkPHP中foreach和volist的区别
- spark源代码action系列-foreach与foreachPartition
- Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系
- foreach和for的区别