Spark Operator Execution Flow Explained, Part 1
2017-03-02 09:55
1. take
Returns the first num records of the RDD.

```scala
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.size == 0) {
          // Nothing collected so far: widen the range to 4x the partitions already scanned
          numPartsToTry = partsScanned * 4
        } else {
          // Some rows collected but not enough: try
          // max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) partitions,
          // but never more than 4x the partitions already scanned
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    buf.toArray
  }
}
```
```scala
/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
}
```
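Now consider the first iteration of the loop. partsScanned is 0 and numPartsToTry is 1, so only the first partition is computed. If that yields num records, the loop ends. Otherwise the next iteration widens the range of partitions to compute, and it may well scan several partitions at once.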
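The following is a minimal local simulation of the loop above. It assumes plain Scala collections stand in for RDD partitions, and each loop iteration stands in for one `sc.runJob` call (one Spark job). `TakeSimulation` and its return shape are hypothetical. Only the `numPartsToTry` arithmetic mirrors the Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical local model of RDD.take: `partitions` are plain Seqs and every
// loop iteration corresponds to one sc.runJob call over the range `p`.
object TakeSimulation {
  /** Returns (the first `num` elements, the partition range scanned by each simulated job). */
  def take[T](partitions: IndexedSeq[Seq[T]], num: Int): (Seq[T], Seq[Range]) = {
    val buf = ArrayBuffer.empty[T]
    val jobs = ArrayBuffer.empty[Range]
    val totalParts = partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      var numPartsToTry = 1
      if (partsScanned > 0) {
        if (buf.isEmpty) {
          // Nothing found yet: quadruple the number of partitions
          numPartsToTry = partsScanned * 4
        } else {
          // Interpolate with 50% overestimate, capped at 4x what was scanned
          numPartsToTry = math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      jobs += p
      // One simulated job: each partition in p returns at most `left` elements
      val res = p.map(i => partitions(i).take(left))
      res.foreach(r => buf ++= r.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    (buf.toList, jobs.toList)
  }
}
```

Consider an input whose leading partitions are empty. The first simulated job scans only partition 0 and finds nothing. The second job quadruples the range to partitions 1 through 4. This is why take is cheap when the data sits near the front, but it can still launch several jobs.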
[Figure: execution flow of take]
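take avoids computing the whole RDD, so it usually finishes quickly. However, it may trigger several jobs, one `sc.runJob` call per loop iteration.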
2. first
Returns the first element of the RDD.

```scala
/**
 * Return the first element in this RDD.
 */
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
```
3. sortByKey

```scala
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
```
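Each partition of the parent RDD uses the partitioning function RangePartitioner to split its records across the output partitions. ShuffledRDD then fetches the records that belong to each of its own partitions. The key to understanding sortByKey is how RangePartitioner works: how it keeps the number of records in each ShuffledRDD partition roughly equal, that is, how it chooses the boundary of each partition. Here is the code: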
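Below is a minimal local model of that data flow. It assumes the range bounds are already known (RangePartitioner computes them by sampling, as shown below) and covers ascending order only. Each record goes to the first partition whose upper bound is greater than or equal to its key, and each output partition is then sorted on its own. `LocalSortByKey` is a hypothetical name, not a Spark API.

```scala
// Hypothetical local model of sortByKey: route by range bounds (the "shuffle"),
// then sort inside each output partition. Ascending order only.
object LocalSortByKey {
  def sortByKey[K, V](input: Seq[Seq[(K, V)]], bounds: IndexedSeq[K])
                     (implicit ord: Ordering[K]): IndexedSeq[Seq[(K, V)]] = {
    val numPartitions = bounds.length + 1
    // Map side: first partition whose upper bound is >= key, else the last partition
    def partitionOf(k: K): Int = {
      val i = bounds.indexWhere(b => ord.lteq(k, b))
      if (i < 0) bounds.length else i
    }
    val routed = input.flatten.groupBy { case (k, _) => partitionOf(k) }
    // Reduce side: each output partition fetches its records and sorts them by key
    (0 until numPartitions).map(p => routed.getOrElse(p, Seq.empty).sortBy(_._1))
  }
}
```

Because every key in partition i is no greater than every key in partition i + 1, concatenating the sorted partitions yields a globally sorted result. Balanced output therefore depends entirely on choosing good bounds.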
```scala
class RangePartitioner[K: Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // numItems: total number of elements in the RDD
      // sketched: Array[(Int, Int, Array[K])] holding, for each parent partition, its index,
      //   its element count, and the keys sampled from it
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  // ... (remaining members omitted)
}

private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K: ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // Reservoir sampling
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight > target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}
```
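The lookup in getPartition can be tried out without Spark. The sketch below is a hypothetical Int-only version (`RangeLookup` is not a Spark name). It uses `java.util.Arrays.binarySearch` in place of `CollectionsUtils.makeBinarySearch`, and it exposes the 128 threshold as a parameter so that both the linear path and the binary-search path can be exercised.

```scala
import java.util.Arrays

// Hypothetical Int-key version of RangePartitioner.getPartition.
object RangeLookup {
  def getPartition(rangeBounds: Array[Int], k: Int, ascending: Boolean = true,
                   linearThreshold: Int = 128): Int = {
    var partition = 0
    if (rangeBounds.length <= linearThreshold) {
      // Few bounds: walk forward while the key is greater than the current upper bound
      while (partition < rangeBounds.length && k > rangeBounds(partition)) partition += 1
    } else {
      // binarySearch returns the match index, or -(insertion point) - 1 if absent
      partition = Arrays.binarySearch(rangeBounds, k)
      if (partition < 0) partition = -partition - 1
      if (partition > rangeBounds.length) partition = rangeBounds.length
    }
    // Descending order simply mirrors the partition index
    if (ascending) partition else rangeBounds.length - partition
  }
}
```

Both paths agree: a key equal to a bound lands in that bound's partition, because each bound is an inclusive upper limit. Keys above the last bound land in the extra final partition.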
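First, a sampling algorithm: reservoir sampling. It selects k samples from a set S of n items, where n is very large or unknown. This makes it especially suitable when the n items cannot all be held in main memory.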
The derivation is not covered in detail here; the conclusion is as follows.
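When the n-th item (n > k) arrives, generate a random number p in [0, 1). If p < k/n, replace a uniformly chosen slot in the pool with the n-th item. Otherwise keep the pool as it is. Continue until the stream ends, then return the k items in the pool. To avoid working with fractions, implementations usually draw a random integer p uniformly from [0, n) instead and, if p < k, overwrite slot p.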
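```scala
// stream: the data stream; the returned reservoir is the pool of (at most) k samples
def reservoirSample[T](stream: Iterator[T], k: Int,
                       rand: scala.util.Random = new scala.util.Random()): Vector[T] = {
  val reservoir = scala.collection.mutable.ArrayBuffer.empty[T]
  var n = 0 // number of items consumed so far
  while (stream.hasNext) {
    val item = stream.next()
    n += 1
    if (n <= k) {
      reservoir += item // the first k items go straight into the reservoir
    } else {
      val p = rand.nextInt(n) // uniform in [0, n), so p < k with probability k / n
      if (p < k) reservoir(p) = item
    }
  }
  reservoir.toVector
}
```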
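The following is a short sketch of the standard argument that every item ends up in the pool with probability k/n. For each step j > k, the j-th item enters with probability k/j. When it does enter, it overwrites one particular slot with probability 1/k. So an item already in the pool survives step j with probability 1 - 1/j = (j-1)/j, and the product telescopes. Here m is the position of an item in the stream.

$$
P(\text{item } m \text{ in pool after } n \text{ items}) =
\begin{cases}
\displaystyle \prod_{j=k+1}^{n} \frac{j-1}{j} = \frac{k}{n}, & m \le k,\\[2ex]
\displaystyle \frac{k}{m} \prod_{j=m+1}^{n} \frac{j-1}{j} = \frac{k}{m} \cdot \frac{m}{n} = \frac{k}{n}, & m > k.
\end{cases}
$$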
```scala
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
    // numItems: total number of elements in the RDD
    // sketched: Array[(Int, Int, Array[K])] holding, for each parent partition, its index,
    //   its element count, and the keys sampled from it
    // sampleSizePerPartition is the number of keys to sample from each partition;
    //   sketch samples the keys that are to be sorted
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      // Under data skew, a large partition is under-represented by only sampleSizePerPartition
      //   sampled keys, so more keys must be sampled from it.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      // Sample the imbalanced partitions again
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Determine the bounds from the keys sampled in every partition. A key's weight is the
      //   number of records it stands for: the larger the weight, the more records it represents.
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}
```
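The sizing arithmetic at the top of rangeBounds decides which partitions count as imbalanced. The hypothetical helper below (`SampleSizing` is not a Spark name) reproduces only that arithmetic. It takes per-partition record counts as input, in place of the counts that sketch would return.

```scala
// Hypothetical helper reproducing only the sizing arithmetic of RangePartitioner.rangeBounds.
object SampleSizing {
  final case class Plan(sampleSize: Double, sampleSizePerPartition: Int,
                        fraction: Double, imbalanced: Set[Int])

  /** outPartitions: target partition count; partitionCounts(i): records in input partition i. */
  def plan(outPartitions: Int, partitionCounts: IndexedSeq[Long]): Plan = {
    // 20 sampled keys per output partition, capped at 1M
    val sampleSize = math.min(20.0 * outPartitions, 1e6)
    // Over-sample 3x, split evenly across the input partitions
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / partitionCounts.size).toInt
    val numItems = partitionCounts.sum
    // Sampling probability that would yield about sampleSize keys in total
    val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
    // Imbalanced: its fair share of the sample exceeds what its reservoir could hold
    val imbalanced = partitionCounts.indices
      .filter(i => fraction * partitionCounts(i) > sampleSizePerPartition).toSet
    Plan(sampleSize, sampleSizePerPartition, fraction, imbalanced)
  }
}
```

In this example there are 4 output partitions and 10 input partitions, one of which holds 1,000,000 records while the rest hold 100 each. Each reservoir keeps 24 keys. The large partition's fair share is roughly 80, which exceeds 24, so only that partition is re-sampled with `fraction`.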
```scala
def sketch[K: ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    // Reservoir sampling
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}

def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Int) = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) {
    // The partition has fewer than k elements, so return all of them
    // If input size < k, trim the array to return only an array of input size.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // Otherwise apply reservoir sampling to the remaining elements
    // If input size > k, continue the sampling process.
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      val replacementIndex = rand.nextInt(i)
      if (replacementIndex < k) {
        reservoir(replacementIndex) = item
      }
      i += 1
    }
    (reservoir, i)
  }
}
```
```scala
def determineBounds[K: Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight > target) { // split boundaries so each partition gets an equal share of weight
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
```
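The weighted split can be checked without Spark. The snippet below is a standalone copy of the determineBounds loop. The object name `BoundsDemo` and the use of `Seq`/`Vector` in place of `ArrayBuffer`/`Array` are assumptions made so that it runs on its own.

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone copy of the determineBounds loop (hypothetical wrapper, no Spark needed).
object BoundsDemo {
  def determineBounds[K](candidates: Seq[(K, Float)], partitions: Int)
                        (implicit ordering: Ordering[K]): Vector[K] = {
    val ordered = candidates.sortBy(_._1)
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions // weight each output partition should receive
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var previousBound = Option.empty[K]
    val it = ordered.iterator
    while (it.hasNext && bounds.size < partitions - 1) {
      val (key, weight) = it.next()
      cumWeight += weight
      // Emit a bound once the running weight passes the next target, skipping duplicate keys
      if (cumWeight > target && (previousBound.isEmpty || ordering.gt(key, previousBound.get))) {
        bounds += key
        target += step
        previousBound = Some(key)
      }
    }
    bounds.toVector
  }
}
```

With equal weights, the ten keys 1 to 10 split into three partitions at 4 and 7. A single key with weight 6 out of a total of 10 becomes a bound by itself, because it alone represents more than half of the records.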
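As a result, calling sortByKey on an RDD scans its keys at least once to sample them. That scan is the `collect()` in sketch, followed by a second sampling job if any partition is imbalanced. This takes time, and from the outside it shows up as an action being executed, even though sortByKey itself is a transformation.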