您的位置:首页 > 其它

Spark源码学习(1)——RDD分析

2016-06-09 23:58 1286 查看

Spark源码学习(1)——RDD分析

本文要解决的问题:

从更深层次理解Spark的RDD

学习Spark的时候,我们可以从Spark的核心内容看起,即RDD。RDD全称Resilient Distributed DataSets,弹性的分布式数据集。RDD是只读的,不可变的数据集,也拥有很好的容错机制。他有5个主要特性

(1)A list of partitions :分片列表,数据能被切分才能做并行计算。

(2)A function for computing each split: 一个函数计算一个分片。

(3)A list of dependencies on other RDDs :对其他RDD的依赖列表。

(4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)RDD 可选的,key-value型的RDD,根据hash来分区。

(5)Optionally, a list of preferred locations to compute each split on (e.g. blocklocations for an HDFS file) 可选的,每一个分片的最佳计算位置。

RDD是Spark所有组件运行的底层系统,RDD是一个容错的,并行的数据结构,它提供了丰富的数据操作和API接口。

Spark中的RDD API



一个RDD可以包含多个分区。每个分区都是一个dataset片段。RDD之间可以相互依赖

窄依赖:一一对应的关系,一个RDD分区只能被一个子RDD的分区使用的关系

宽依赖:一多对应关系,若多个子RDD分区都依赖同一个父RDD分区

如下RDD图览:



在源码Spark.rdd.RDD中有一些比较中的方法:

(1)protected def getPartitions: Array[Partition]

/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
*/子类实现返回一组分区在这个RDD。这种方法将只被调用一次,因此它是安全的,它来实现一个耗时的计算。

protected def getPartitions: Array[Partition]


这个方法返回多个partition,存放在一个数字中。

(2)protected def getDependencies: Seq[Dependency[_]] = deps

/**

* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
* 子类实现返回这个RDD如何取决于父RDDS。这种方法将只被调用一次,因此它是安全的,它来实现一个耗时的计算。
*
*/
protected def getDependencies: Seq[Dependency[_]] = deps


它返回一个依赖关系的Seq集合。

(3)def compute(split: Partition, context: TaskContext): Iterator[T]

/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
* 子类实现的计算一个给定的分区。
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]


每个RDD都有一个对应的具体计算函数。

(4)protected def getPreferredLocations(split: Partition): Seq[String] = Nil

/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil


获取partition的首选位置,这是分区策略。

RDD Transformations and action

RDD数据操作主要有两个动作:

1.Transformations(转换)

Transformations中包含的主要方法
(1)map(f : T ) U) : RDD[T] ) RDD[U]
(2)filter(f : T ) Bool) : RDD[T] ) RDD[T]
(3)flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U]
(4)sample(fraction : Float) : RDD[T] ) RDD[T]
(5)groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])]
(6)reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)]
(7)union() : (RDD[T]; RDD[T]) ) RDD[T]
(8)join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))]
(9)cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))]
(10)crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)]
(11)mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning)
(12)sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)]
(13)partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)]
2.Action(动作)

Action中包含的主要方法
(1)count() : RDD[T] ) Long
(2)collect() : RDD[T] ) Seq[T]
(3)reduce(f : (T; T) ) T) : RDD[T] ) T
(4)lookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs)
(5)save(path : String) : Outputs RDD to a storage system, e.g., HDFS
先看下Transformations部分

// Transformations (return a new RDD)
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

/**
*  Return a new RDD by first applying a function to all elements of this
*  RDD, and then flattening the results.
*/

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f))
/**

* Return a new RDD containing only the elements that satisfy a predicate.

*/

def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

......


(1)Map

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))


返回一个MappedRDD,它继承RDD并重写了两个方法getPartitions、compute。

第一个方法getPartitions,他获取第一个父RDD,并获取分片数组。

override def getPartitions: Array[Partition] = firstParent[T].partitions


第二个方法compute,将根据map参数内容来遍历RDD分区。

override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)


(2)Filter

/**

* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))


Filter是一个过滤操作,比如mapRDD.filter(_ >1)。

(3)Union

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))


多个RDD组成成一个新RDD,它重写了RDD的5个方法getPartitions、getDependencies、compute、getPreferredLocations、clearDependencies

从getPartitions、getDependencies中可以看出它应该是一组宽依赖关系

override def getDependencies: Seq[Dependency[_]] = {
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
pos += rdd.partitions.size
}
deps
}


(4)groupBy

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this))


根据参数分组,这又产生了一个新的RDD。

再看下Action部分

(1)Count

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum


跟踪代码,在runJob方法中调用了dagScheduler.runJob。而在DAGScheduler,将提交到作业调度器,并获得JobWaiter对象返回。该JobWaiter对象可以用来阻塞,直到任务完成执行或可以用来取消作业。

RDD中的任务调度



网上找的任务调度图片

从这个图中,可以看到:

RDD Object产生DAG,然后进入DAGScheduler阶段:

1、DAGScheduler是面向Stage的高层次调度器,DAGScheduler会将DAG拆分成很多个 tasks,而一组tasks就是图中的stage。

2、每一次shuffle的过程就会产生一个新的stage。DAGScheduler会有RDD记录磁盘的物· 理化操作,为了获得最有tasks,DAGSchulder会先查找本地tasks。

3、DAGScheduler还要监控shuffle产生的失败任务,如果还得重启。

DAGScheduler划分stage后,会以TaskSet为单位把任务提交给TaskScheduler:

1、一个TaskScheduler只为一个sparkConext服务。

2、当接收到TaskSet后,它会把任务提交给Worker节点的Executor中去运行。失败的任务

由TaskScheduler监控重启。

Executor是以多线程的方式运行,每个线程都负责一个任务。

接下来跟踪一个spark提供的例子源码:

源码Spark.examples.SparkPi

def main(args: Array[String]) {
//设置一个应用名称(用于在Web UI中显示)
val conf = new SparkConf().setAppName("Spark Pi")
//实例化一个SparkContext
val spark = new SparkContext(conf)
//转成数据
val slices = if (args.length > 0) args(0).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}


代码中的parallelize是一个并行化的延迟加载,跟踪源码

/** Distribute a local Scala collection to form an RDD.
*  从RDD中分配一个本地的scala集合
* @note Parallelize acts lazily. If `seq` is a mutable collection and is
* altered after the call to parallelize and before the first action on the
* RDD, the resultant RDD will reflect the modified collection. Pass a copy of
* the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}


它调用了RDD中的map,上面说过的map是一个转换过程,将生成一个新的RDD。最后reduce。

至此,对于Spark源码的RDD部分做了一个简单的学习,下次会对Spark Submit进行学习。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: