Spark Operator Execution Flow Explained, Part 3
2017-03-02 10:21
10.aggregate
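Aggregates the elements of the RDD. seqOp first folds the T-typed elements of each partition into a value of type U. combOp then merges the per-partition U values into a single U. Note in particular that both seqOp and combOp start from zeroValue, whose type is U: it seeds the fold of every partition on the executors, and it is used once more as the starting value when the driver merges the partition results.

```scala
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  // zeroValue is the initial value; aggregatePartition runs on the executors
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  // jobResult also starts from zeroValue; merging the per-partition results happens on the driver
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
```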
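```scala
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
// partition 0 holds 1, 2, 3, 4, 5; partition 1 holds 6, 7, 8, 9, 10

scala> rdd1.aggregate(1)(
     |   {(x: Int, y: Int) => x + y},
     |   {(a: Int, b: Int) => a + b}
     | )
res17: Int = 58
```

The result is 58 rather than 55 because zeroValue is added three times: partition 0 yields 1 + 15 = 16, partition 1 yields 1 + 40 = 41, and the driver computes 1 + 16 + 41 = 58.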
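To make the two phases concrete, here is a minimal Spark-free sketch that mimics aggregate with plain Scala collections. It assumes the partitions can be modeled as a `Seq[Seq[T]]`; the object name `AggregateSim` is hypothetical, and the code illustrates the semantics rather than reproducing Spark's implementation.

```scala
// Hypothetical, Spark-free model of RDD.aggregate: partitions are a Seq[Seq[T]].
object AggregateSim {
  def aggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // "Executor" phase: fold each partition separately, starting from zeroValue
    val partitionResults: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // "Driver" phase: merge the partition results with combOp, again starting from zeroValue
    partitionResults.foldLeft(zeroValue)(combOp)
  }
}
```

For example, `AggregateSim.aggregate(Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9, 10)), 1)(_ + _, _ + _)` gives 58, and the same call with zeroValue 0 gives 55. Passing zeroValue `(0, 0)` with seqOp `(acc, x) => (acc._1 + x, acc._2 + 1)` and an element-wise-sum combOp returns `(55, 10)`, a (sum, count) pair, which shows U differing from T.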
11.fold
A simplified aggregate: the same function op serves as both seqOp and combOp, so zeroValue and the result have the element type T.

```scala
/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative and commutative function and a neutral "zero value". The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 *
 * This behaves somewhat differently from fold operations implemented for non-distributed
 * collections in functional languages like Scala. This fold operation may be applied to
 * partitions individually, and then fold those results into the final result, rather than
 * apply the fold to each element sequentially in some defined ordering. For functions
 * that are not commutative, the result may differ from that of a fold applied to a
 * non-distributed collection.
 */
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  // First fold each partition on the executors
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  // Then merge the per-partition results on the driver
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
```
```scala
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
// partition 0 holds 1, 2, 3, 4, 5; partition 1 holds 6, 7, 8, 9, 10

scala> rdd1.fold(1)(
     |   (x, y) => x + y
     | )
res19: Int = 58

// Same result as the first aggregate example above:
scala> rdd1.aggregate(1)(
     |   {(x, y) => x + y},
     |   {(a, b) => a + b}
     | )
res20: Int = 58
```
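The Scaladoc of fold asks for a neutral zero value and an associative, commutative op. The following sketch, again a hypothetical local model rather than Spark code, shows why: with the non-neutral zero value 1, the result depends on how many partitions the data is split into. The helper `split`, which cuts a sequence into contiguous slices of roughly equal size, is an assumption standing in for makeRDD's partitioning.

```scala
// Hypothetical local model of RDD.fold; not Spark code.
object FoldSim {
  // Fold each "partition" from zeroValue, then fold the partial results from zeroValue again.
  def fold[T](partitions: Seq[Seq[T]], zeroValue: T)(op: (T, T) => T): T =
    partitions.map(_.foldLeft(zeroValue)(op)).foldLeft(zeroValue)(op)

  // Assumed partitioning: k contiguous slices of (almost) equal size.
  def split[A](xs: Seq[A], k: Int): Seq[Seq[A]] = {
    require(k >= 1, "need at least one partition")
    (0 until k).map { i =>
      val start = (i.toLong * xs.length / k).toInt
      val end = ((i + 1).toLong * xs.length / k).toInt
      xs.slice(start, end)
    }
  }
}
```

With zeroValue 1, folding `1 to 10` split into one, two and five partitions gives 57, 58 and 61 (55 plus one per partition plus one for the driver-side merge). With zeroValue 0 the answer is 55 for any partition count.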
12.treeAggregate
Performs aggregate as a multi-level tree. In aggregate, the result of every partition is sent to the driver and merged there. When there are many partitions, or the per-partition results are large, the driver has to receive and merge a large volume of intermediate results, which increases its load. treeAggregate therefore keeps merging partition results on the executors, shrinking the data returned to the driver level by level, and leaves only the last merge to the driver.

```scala
/**
 * Aggregates the elements of this RDD in a multi-level tree pattern.
 *
 * @param depth suggested depth of the tree (default: 2)
 * @see [[org.apache.spark.rdd.RDD#aggregate]]
 */
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    // Aggregation function applied to each original partition
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    // First, partially aggregate every original partition
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    // Derive the fan-in of each level (scale) from the requested depth
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    while (numPartitions > scale + numPartitions / scale) { // decide whether to add another level
      numPartitions /= scale
      val curNumPartitions = numPartitions
      // Shrink the partition count by merging the results of groups of partitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    // Run the final reduce and return the result
    partiallyAggregated.reduce(cleanCombOp)
  }
}
```
```scala
scala> def seq(a: Int, b: Int): Int = {
     |   a + b }
seq: (a: Int, b: Int)Int

scala> def comb(a: Int, b: Int): Int = {
     |   a + b }
comb: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18), 9)

scala> z.treeAggregate(0)(seq, comb, 2)
res1: Int = 171
```
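The level structure produced by the while loop is easiest to see in a local model. The sketch below is a hypothetical, Spark-free re-enactment of the quoted treeAggregate logic, including its `numPartitions / scale` loop condition: partial results are keyed by `index % curNumPartitions` and merged with combOp until another level would no longer help, and the remaining results are then reduced. It also records the partition count at each level. With the example's 9 partitions and depth 2, scale is 3, so one intermediate level of 3 partitions is created before the final reduce.

```scala
// Hypothetical, Spark-free model of the treeAggregate loop quoted above.
object TreeAggregateSim {
  // Returns the aggregated value plus the number of "partitions" at every level.
  def treeAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U,
      depth: Int = 2): (U, List[Int]) = {
    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
    if (partitions.isEmpty) (zeroValue, Nil)
    else {
      // One partial result per original partition (each seeded with zeroValue)
      var partial: Vector[U] = partitions.map(_.foldLeft(zeroValue)(seqOp)).toVector
      var numPartitions = partial.length
      val levels = scala.collection.mutable.ListBuffer(numPartitions)
      val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
      while (numPartitions > scale + numPartitions / scale) {
        numPartitions /= scale
        val curNumPartitions = numPartitions
        // Result i is keyed by i % curNumPartitions and merged per key,
        // standing in for reduceByKey(new HashPartitioner(curNumPartitions), combOp)
        val previous = partial
        partial = Vector.tabulate(curNumPartitions) { key =>
          previous.indices.filter(_ % curNumPartitions == key).map(i => previous(i)).reduce(combOp)
        }
        levels += numPartitions
      }
      // Final reduce: zeroValue is not folded in again here
      (partial.reduce(combOp), levels.toList)
    }
  }
}
```

Calling it on `(1 to 18).grouped(2).toList` with zeroValue 0 and depth 2 returns `(171, List(9, 3))`. Unlike aggregate, the last step of the quoted source is a plain reduce, so zeroValue enters only once per original partition and is not added again on the driver.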
13.reduce
Reduces the elements with a binary function: the first two elements are passed to the function, the returned value is passed together with the next (third) element, and so on until a single value remains. In Spark this left-to-right pass happens inside each partition (reduceLeft on the executors); the driver then merges the per-partition results in the order the tasks finish, which is why f must be commutative and associative.

```scala
/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  // Function that walks one partition; runs on the executors
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      // reduceLeft traverses the partition from left to right
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  // Function that handles each partition's result; runs on the driver
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
```
```scala
val c = sc.parallelize(1 to 10, 2)
c.reduce((x, y) => x + y) // result: 55
```
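The Option-based bookkeeping in reduce can be modeled locally. This is a hypothetical sketch mirroring the quoted source: each partition is reduced with reduceLeft (an empty partition yields None), and the "driver" merges the defined results in arrival order. Making that arrival order an explicit parameter is an assumption of the model, used here to show that a non-commutative function such as subtraction gives order-dependent results.

```scala
// Hypothetical local model of RDD.reduce; not Spark code.
object ReduceSim {
  // arrivalOrder lists partition indices in the order their results reach the "driver".
  def reduce[T](partitions: Seq[Seq[T]], arrivalOrder: Seq[Int])(f: (T, T) => T): T = {
    // Executor side: reduceLeft within each partition; empty partitions give None
    val taskResults: Seq[Option[T]] =
      partitions.map(p => if (p.nonEmpty) Some(p.reduceLeft(f)) else None)
    // Driver side: merge the defined results in arrival order
    var jobResult: Option[T] = None
    for (index <- arrivalOrder; taskResult <- taskResults(index)) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult))
        case None        => Some(taskResult)
      }
    }
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
}
```

With `Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9, 10))`, addition returns 55 for either arrival order, whereas subtraction returns 15 for order `Seq(0, 1)` and -15 for `Seq(1, 0)`.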
14.max
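Returns the maximum element according to the implicit Ordering[T], which by default is the natural ordering of the element type. It is implemented directly as a reduce with ord.max.

```scala
/**
 * Returns the max of this RDD as defined by the implicit Ordering[T].
 * @return the maximum element of the RDD
 * */
def max()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.max)
}
```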
```scala
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
// partition 0 holds 1, 2, 3, 4, 5; partition 1 holds 6, 7, 8, 9, 10

scala> rdd1.max()
res19: Int = 10
```
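Because max is just reduce(ord.max), passing a different Ordering changes what "maximum" means. The plain-Scala sketch below (no Spark; the object name `MaxSim` and the word list are made up for illustration) applies the same reduce-with-ord.max pattern per partition and then across partitions, once with the default Ordering[String] and once with an ordering by string length. On a real RDD the ordering can be passed explicitly in the same way, e.g. `rdd.max()(Ordering.by[String, Int](_.length))`.

```scala
// Hypothetical local illustration of max = reduce(ord.max); not Spark code.
object MaxSim {
  // Reduce each non-empty partition with ord.max, then reduce the partial maxima.
  def max[T](partitions: Seq[Seq[T]])(implicit ord: Ordering[T]): T = {
    val pick: (T, T) => T = (x, y) => ord.max(x, y)
    partitions.filter(_.nonEmpty).map(_.reduceLeft(pick)).reduceLeft(pick)
  }
}
```

For `Seq(Seq("spark", "rdd"), Seq("aggregate", "fold"))`, the default lexicographic ordering picks "spark", while ordering by length picks "aggregate".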
15.min
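Returns the minimum element according to the implicit Ordering[T], which by default is the natural ordering of the element type. It is implemented directly as a reduce with ord.min.

```scala
/**
 * Returns the min of this RDD as defined by the implicit Ordering[T].
 * @return the minimum element of the RDD
 * */
def min()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.min)
}
```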
```scala
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
// partition 0 holds 1, 2, 3, 4, 5; partition 1 holds 6, 7, 8, 9, 10

scala> rdd1.min()
res19: Int = 1
```
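For pair data the default Ordering on tuples compares the first component first, so min() returns the pair with the smallest key, not the smallest value. The sketch below uses the same reduce-with-ord.min shape on plain Scala collections (the object `MinSim` and the sample counts are hypothetical) to show the difference, how `Ordering.by` selects the value instead, and how reversing that ordering turns min into max. On an RDD of pairs the equivalent call would be `rdd.min()(Ordering.by[(String, Int), Int](_._2))`.

```scala
// Hypothetical local illustration of min = reduce(ord.min); not Spark code.
object MinSim {
  // Reduce each non-empty partition with ord.min, then reduce the partial minima.
  def min[T](partitions: Seq[Seq[T]])(implicit ord: Ordering[T]): T = {
    val pick: (T, T) => T = (x, y) => ord.min(x, y)
    partitions.filter(_.nonEmpty).map(_.reduceLeft(pick)).reduceLeft(pick)
  }
}
```

With `Seq(Seq(("spark", 7), ("rdd", 3)), Seq(("fold", 5)))`, the default tuple ordering yields `("fold", 5)`, ordering by the count yields `("rdd", 3)`, and the reversed count ordering yields `("spark", 7)`.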
16.treeReduce
Analogous to treeAggregate: it merges partial results on the executors over several levels to reduce the work left for the driver. Each partition is first reduced to an Option[T] (None for an empty partition), and the merging then delegates to treeAggregate with an Option-aware op.

```scala
/**
 * Reduces the elements of this RDD in a multi-level tree pattern.
 *
 * @param depth suggested depth of the tree (default: 2)
 * @see [[org.apache.spark.rdd.RDD#reduce]]
 */
def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val cleanF = context.clean(f)
  // reduce function applied to each original partition
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  // First, partially reduce every original partition
  val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
    if (c.isDefined && x.isDefined) {
      Some(cleanF(c.get, x.get))
    } else if (c.isDefined) {
      c
    } else if (x.isDefined) {
      x
    } else {
      None
    }
  }
  // In the end this still delegates to treeAggregate
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
}
```
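treeReduce reuses treeAggregate by lifting f to Option[T], so empty partitions (None) are simply skipped during merging. The sketch below copies the four cases of the quoted `op` into a plain Scala function (the names `TreeReduceOp`, `liftToOption` and `treeReduceLocal` are hypothetical) and combines per-partition results with it. As a simplifying assumption, a linear fold stands in for the multi-level merge; for an associative and commutative f both give the same answer.

```scala
// Hypothetical standalone model of the Option lifting inside treeReduce; not Spark code.
object TreeReduceOp {
  // Same four cases as the `op` in the quoted treeReduce source
  def liftToOption[T](f: (T, T) => T): (Option[T], Option[T]) => Option[T] =
    (c, x) => {
      if (c.isDefined && x.isDefined) Some(f(c.get, x.get))
      else if (c.isDefined) c
      else if (x.isDefined) x
      else None
    }

  def treeReduceLocal[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T = {
    val op = liftToOption(f)
    // Per-partition reduce, as in reducePartition: empty partitions become None
    val partial = partitions.map(p => if (p.nonEmpty) Some(p.reduceLeft(f)) else None)
    // A linear fold from Option.empty stands in for the tree of merges
    partial.foldLeft(Option.empty[T])(op)
      .getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
}
```

For instance, `TreeReduceOp.treeReduceLocal(Seq(Seq(1, 2), Seq.empty[Int], Seq(3, 4)))(_ + _)` returns 10, and an input whose partitions are all empty raises UnsupportedOperationException, matching the quoted source.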