您的位置:首页 > 其它

Spark算子执行流程详解之一

2017-03-02 09:55 369 查看

1.take

获取前num条记录。

def take(num: Int): Array[T] = withScope {

  if (num ==
0) {

    new Array[T](0)

  } else {

    val buf = newArrayBuffer[T]

    val totalParts = this.partitions.length

    var partsScanned =
0

    while (buf.size < num && partsScanned < totalParts) {

      // The number of partitions to try in this iteration. It is ok for this number to be

      // greater than totalParts because we actually cap it at totalParts in runJob.

      var numPartsToTry =1

      if (partsScanned >
0) {

        // If we didn't find any rows after the previous iteration, quadruple and retry.

        // Otherwise, interpolate the number of partitions we need to try, but overestimate

        // it by 50%. We also cap the estimation in the end.

        if (buf.size ==0) {//截止目前为止buf为空的话,则扩大4倍范围

          numPartsToTry = partsScanned * 4

        } else {//截止目前为止还有部分值没取到的话,则扩大至Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned,
1),但是不超过当前已扫描过分区的4倍


          // the left side of max is >=1 whenever partsScanned >= 2

          numPartsToTry = Math.max((1.5* num * partsScanned / buf.size).toInt - partsScanned,
1)

          numPartsToTry = Math.min(numPartsToTry, partsScanned *
4)

        }

      }

      val left = num - buf.size

      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

      val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal
=true)

      res.foreach(buf ++= _.take(num - buf.size))

      partsScanned += numPartsToTry

    }

    buf.toArray

  }

}

首先关注下sc.runJob函数的传参:

/**

 * Run a job on a given set of partitions of an RDD, but take a function of type

 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.

 */
def runJob[T,U: ClassTag](

    rdd: RDD[T],

    func: Iterator[T] =>U,

    partitions: Seq[Int],

    allowLocal: Boolean

    ): Array[U] = {

  val cleanedFunc = clean(func)

  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)

}

 

其中partitions: Seq[Int]代表需要计算的分区,可以计算某个分区,也可以计算多个分区,是待计算的分区集合。

其次先看第一次循环,其partsScanned为0,numPartsToTry为1,因此先计算第一个分区的结果,如果第一次计算可以取得满足条件的num个值,则循环结束,如果取不到满足条件的num个值,则扩大第二次计算的分区范围,很可能一下子扫多个分区。

其执行过程见下图:



Take可以避免全量计算,执行时间比较短。但可能会多次触发action。

2.first

取RDD的第一个元素

/**

 * Return the first element in this RDD.

 */
def first():
T = withScope {

  take(1) match
{

    case Array(t) => t

    case _ => throw newUnsupportedOperationException("empty collection")

  }

}

其实就是调用take来完成的,take的流程可以查阅take函数详解

3.sortByKey

 

def sortByKey(ascending: Boolean =true, numPartitions: Int = self.partitions.length)

    : RDD[(K, V)] = self.withScope

{

  val part = newRangePartitioner(numPartitions, self, ascending)

  new ShuffledRDD[K,V,
V](self, part)

    .setKeyOrdering(if (ascending)ordering
elseordering.reverse)

}

sortByKey其实就是根据父RDD生成ShuffledRDD的过程,其分区函数为范围分区RangePartitioner,执行过程如下:



父RDD的每个分区按照分区函数RangePartitioner将每个分区的数据划分为多个分区的数据,然后ShuffledRDD拉取自己对应分区的数据。但是sortByKey主要应该掌握其RangePartitioner分区函数的执行原理,它如何保证ShuffledRDD的每个分区的数量是大致相同的,也就是如何来划分每个分区的边界的,且看:

class RangePartitioner[K:
Ordering : ClassTag,
V](

    @transient partitions: Int,

    @transient rdd: RDD[_ <: Product2[K,V]],

    private var ascending: Boolean =true)

  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.

  require(partitions >= 0, s"Number of partitions cannot be negative but found$partitions.")

  private var ordering=
implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions前(partitions - 1)的分区边界

  private var rangeBounds: Array[K] = {

    if (partitions <=
1) {

      Array.empty

    } else {

      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.

      val sampleSize = math.min(20.0* partitions,
1e6)

      // Assume the input partitions are roughly balanced and over-sample a little bit.

      val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt

      // numItems相当于记录rdd元素的总数

      // sketched的类型是Array[(Int, Int, Array[K])],记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据


      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

      if (numItems ==
0L) {

        Array.empty

      } else {

        // If a partition contains much more than the average number of items, we re-sample from it

        // to ensure that enough items are collected from that partition.

        val fraction = math.min(sampleSize / math.max(numItems,1L),
1.0)

        val candidates = ArrayBuffer.empty[(K, Float)]

        val imbalancedPartitions = mutable.Set.empty[Int]

        sketched.foreach { case (idx,n,
sample) =>

          if (fraction * n > sampleSizePerPartition) {

            imbalancedPartitions += idx

          } else {

            // The weight is 1 over the sampling probability.

            val weight = (n.toDouble / sample.size).toFloat

            for (key<- sample) {

              candidates += ((key, weight))

            }

          }

        }

        if (imbalancedPartitions.nonEmpty) {

          // Re-sample imbalanced partitions with the desired sampling probability.

          val imbalanced =new
PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)

          val seed = byteswap32(-rdd.id-
1)

          val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect()

          val weight = (1.0/ fraction).toFloat

          candidates ++= reSampled.map(x => (x, weight))

        }

        RangePartitioner.determineBounds(candidates, partitions)

      }

    }

  }

  def numPartitions: Int = rangeBounds.length +1

  private var binarySearch: ((Array[K],K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {

    val k = key.asInstanceOf[K]

    var partition =
0

    if (rangeBounds.length <=128) {

      // If we have less than 128 partitions naive search

      while (partition <rangeBounds.length &&
ordering.gt(k,rangeBounds(partition))) {

        partition += 1

      }

    } else {

      // Determine which binary search method to use only once.

      partition = binarySearch(rangeBounds, k)

      // binarySearch either returns the match location or -[insertion point]-1

      if (partition <0) {

        partition = -partition-1

      }

      if (partition > rangeBounds.length) {

        partition = rangeBounds.length

      }

    }

    if (ascending) {

      partition

    } else {

      rangeBounds.length - partition

    }

  }
private[spark] objectRangePartitioner {

  /**

   * Sketches the input RDD via reservoir sampling on each partition.

   *

   * @param rdd the input RDD to sketch

   * @param sampleSizePerPartition max sample size per partition

   * @return (total number of items, an array of (partitionId, number of items, sample))

   */

  def sketch[K: ClassTag](

      rdd: RDD[K],

      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

    val shift = rdd.id

    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object

    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>

      val seed = byteswap32(idx ^ (shift <<16))
      //Reservoir:水塘抽样

      val (sample, n) = SamplingUtils.reservoirSampleAndCount(

        iter, sampleSizePerPartition, seed)

      Iterator((idx, n, sample))

    }.collect()

    val numItems = sketched.map(_._2.toLong).sum

    (numItems, sketched)

  }

  /**

   * Determines the bounds for range partitioning from candidates with weights indicating how many

   * items each represents. Usually this is 1 over the probability used to sample this candidate.

   *

   * @param candidates unordered candidates with weights

   * @param partitions number of partitions

   * @return selected bounds

   */

  def determineBounds[K:
Ordering : ClassTag](

      candidates: ArrayBuffer[(K, Float)],

      partitions: Int): Array[K] = {

    val ordering = implicitly[Ordering[K]]

    val ordered = candidates.sortBy(_._1)

    val numCandidates = ordered.size

    val sumWeights = ordered.map(_._2.toDouble).sum

    val step = sumWeights / partitions

    var cumWeight =
0.0

    var target = step

    val bounds = ArrayBuffer.empty[K]

    var i =
0

    var j =
0

    var previousBound = Option.empty[K]

    while ((i < numCandidates) && (j < partitions -1)) {

      val (key, weight) = ordered(i)

      cumWeight += weight

      if (cumWeight > target) {

        // Skip duplicate values.

        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {

          bounds += key

          target += step

          j += 1

          previousBound = Some(key)

        }

      }

      i += 1

    }

    bounds.toArray

  }

}

可见其分区边界由rangeBounds保存,然后提供getPartition函数,根据传入的key获取其对于的分区编号。那么现在的问题就是当不知道父RDD的每个分区总数的情况下,如何保证数据被随机抽样出来,只有数据随机被抽样出来,才能保证之后切分分区的时候每个分区的数目是大致相同的。(这样就可以只扫描一次获取随机值,否则需要先扫描出总数,然后根据总数来抽样,这样就扫描了2次)

       首先抛出个数学算法,即 Reservoir Sampling(水塘抽样),目的在于从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。

       具体推导过程这里不详细描述,直接写结论:

在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于k/n,替换池中任意一个为第n个数。大于k/n,继续保留前面的数。直到数据流结束,返回此k个数。但是为了保证计算机计算分数额准确性,一般是生成一个0到n的随机数。

//stream代表数据流 

//reservoir代表返回长度为k的池塘 

//从stream中取前k个放入reservoir; 

for ( int i = 1; i < k; i++) 

    reservoir[i] = stream[i]; 

for (i = k; stream != null; i++) { 

    p = random(0, i); 

    if (p < k) reservoir[p] = stream[i]; 

return reservoir; 

接下来看其具体的执行过程:

private var rangeBounds: Array[K] = {

    if (partitions <=
1) {

      Array.empty

    } else {

      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.

      val sampleSize = math.min(20.0* partitions,
1e6)

      // Assume the input partitions are roughly balanced and over-sample a little bit.

      val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt

      // numItems相当于记录rdd元素的总数

      // sketched的类型是Array[(Int, Int, Array[K])],记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据


//
sampleSizePerPartition代表的是每个分区抽样的值,然后针对待排序的key值进行抽样,即sketch函数


      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

      if (numItems ==
0L) {

        Array.empty

      } else {

        // If a partition contains much more than the average number of items, we re-sample from it

        // to ensure that enough items are collected from that partition.

//如果存在数据倾斜的情况,则某些分区包含数据量多的情况下,抽样的值偏少,需要增加抽样的数目

        val fraction = math.min(sampleSize / math.max(numItems,1L),
1.0)

        val candidates = ArrayBuffer.empty[(K, Float)]

        val imbalancedPartitions = mutable.Set.empty[Int]

        sketched.foreach { case (idx,n,
sample) =>

          if (fraction * n > sampleSizePerPartition) {

            imbalancedPartitions += idx

          } else {

            // The weight is 1 over the sampling probability.

            val weight = (n.toDouble / sample.size).toFloat

            for (key<- sample) {

              candidates += ((key, weight))

            }

          }

        }

//针对不平衡的分区继续抽样

        if
(imbalancedPartitions.nonEmpty) {

          // Re-sample imbalanced partitions with the desired sampling probability.

          val imbalanced =new
PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)

          val seed = byteswap32(-rdd.id-
1)

          val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect()

          val weight = (1.0/ fraction).toFloat

          candidates ++= reSampled.map(x => (x, weight))

        }

//根据各个分区抽样的值来划分边界,其中weight值反应某个key的权重,权重越大,说明该key值越多

        RangePartitioner.determineBounds(candidates, partitions)

      }

    }

  }

其中sketch函数:

def sketch[K: ClassTag](

    rdd: RDD[K],

    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

  val shift = rdd.id

  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object

  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>

    val seed = byteswap32(idx ^ (shift <<16))

    //Reservoir:水塘抽样

    val (sample, n) = SamplingUtils.reservoirSampleAndCount(

      iter, sampleSizePerPartition, seed)

    Iterator((idx, n, sample))

  }.collect()

  val numItems = sketched.map(_._2.toLong).sum

  (numItems, sketched)

}

 

def reservoirSampleAndCount[T: ClassTag](

    input: Iterator[T],

    k: Int,

    seed: Long = Random.nextLong())

  : (Array[T], Int) = {

  val reservoir = newArray[T](k)

  // Put the first k elements in the reservoir.

  //先取前K个值

  vari =
0

  while (i < k && input.hasNext) {

    val item = input.next()

    reservoir(i) = item

    i += 1

  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.

  if (i < k) {//如果没有取到,说明该分区少于K个值,则直接返回

    // If input size < k, trim the array to return only an array of input size.

    val trimReservoir =new
Array[T](i)

    System.arraycopy(reservoir, 0, trimReservoir,0, i)

    (trimReservoir, i)

  } else {//否则按照水塘抽样遍历剩余的值

    // If input size > k, continue the sampling process.

    val rand =
new
XORShiftRandom(seed)

    while (input.hasNext) {

      val item = input.next()

      val replacementIndex = rand.nextInt(i)

      if (replacementIndex < k) {

        reservoir(replacementIndex) = item

      }

      i += 1

    }

    (reservoir, i)

  }

}

最后调用determineBounds来确定分界值:

def determineBounds[K:
Ordering : ClassTag](

    candidates: ArrayBuffer[(K, Float)],

    partitions: Int): Array[K] = {

  val ordering = implicitly[Ordering[K]]

  val ordered = candidates.sortBy(_._1)

  val numCandidates = ordered.size

  val sumWeights = ordered.map(_._2.toDouble).sum

  val step = sumWeights / partitions

  var cumWeight =
0.0

  var target = step

  val bounds = ArrayBuffer.empty[K]

  var i =
0

  var j =
0

  var previousBound = Option.empty[K]

  while ((i < numCandidates) && (j < partitions -1)) {

    val (key, weight) = ordered(i)

    cumWeight += weight

    if (cumWeight > target) {//根据weight值来均衡划分分界
      // Skip duplicate values.

      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {

        bounds += key

        target += step

        j += 1

        previousBound = Some(key)

      }

    }

    i += 1

  }

  bounds.toArray

}

其划分边界的具体流程如下:



因此执行RDD的sortbykey操作,会导致对其key的至少一次扫描,比较耗时间,对外表现就是会执行一次action操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark RDD算子