Spark源码学习(1)——RDD分析
2016-06-09 23:58
1286 查看
Spark源码学习(1)——RDD分析
本文要解决的问题:从更深层次理解Spark的RDD
学习Spark的时候,我们可以从Spark的核心内容看起,即RDD。RDD全称Resilient Distributed DataSets,弹性的分布式数据集。RDD是只读的,不可变的数据集,也拥有很好的容错机制。他有5个主要特性
(1)A list of partitions :分片列表,数据能被切分才能做并行计算。
(2)A function for computing each split: 一个函数计算一个分片。
(3)A list of dependencies on other RDDs :对其他RDD的依赖列表。
(4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)RDD 可选的,key-value型的RDD,根据hash来分区。
(5)Optionally, a list of preferred locations to compute each split on (e.g. blocklocations for an HDFS file) 可选的,每一个分片的最佳计算位置。
RDD是Spark所有组件运行的底层系统,RDD是一个容错的,并行的数据结构,它提供了丰富的数据操作和API接口。
Spark中的RDD API
一个RDD可以包含多个分区。每个分区都是一个dataset片段。RDD之间可以相互依赖
窄依赖:一一对应的关系,一个RDD分区只能被一个子RDD的分区使用的关系
宽依赖:一多对应关系,若多个子RDD分区都依赖同一个父RDD分区
如下RDD图览:
在源码Spark.rdd.RDD中有一些比较中的方法:
(1)protected def getPartitions: Array[Partition]
/** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * */子类实现返回一组分区在这个RDD。这种方法将只被调用一次,因此它是安全的,它来实现一个耗时的计算。 protected def getPartitions: Array[Partition]
这个方法返回多个partition,存放在一个数字中。
(2)protected def getDependencies: Seq[Dependency[_]] = deps
/** * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * 子类实现返回这个RDD如何取决于父RDDS。这种方法将只被调用一次,因此它是安全的,它来实现一个耗时的计算。 * */ protected def getDependencies: Seq[Dependency[_]] = deps
它返回一个依赖关系的Seq集合。
(3)def compute(split: Partition, context: TaskContext): Iterator[T]
/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. * 子类实现的计算一个给定的分区。 */ @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T]
每个RDD都有一个对应的具体计算函数。
(4)protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil
获取partition的首选位置,这是分区策略。
RDD Transformations and action
RDD数据操作主要有两个动作:1.Transformations(转换)
Transformations中包含的主要方法 |
---|
(1)map(f : T ) U) : RDD[T] ) RDD[U] |
(2)filter(f : T ) Bool) : RDD[T] ) RDD[T] |
(3)flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U] |
(4)sample(fraction : Float) : RDD[T] ) RDD[T] |
(5)groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])] |
(6)reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)] |
(7)union() : (RDD[T]; RDD[T]) ) RDD[T] |
(8)join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))] |
(9)cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))] |
(10)crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)] |
(11)mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning) |
(12)sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)] |
(13)partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)] |
Action中包含的主要方法 |
---|
(1)count() : RDD[T] ) Long |
(2)collect() : RDD[T] ) Seq[T] |
(3)reduce(f : (T; T) ) T) : RDD[T] ) T |
(4)lookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs) |
(5)save(path : String) : Outputs RDD to a storage system, e.g., HDFS |
// Transformations (return a new RDD) /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f)) /** * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) ......
(1)Map
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
返回一个MappedRDD,它继承RDD并重写了两个方法getPartitions、compute。
第一个方法getPartitions,他获取第一个父RDD,并获取分片数组。
override def getPartitions: Array[Partition] = firstParent[T].partitions
第二个方法compute,将根据map参数内容来遍历RDD分区。
override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)
(2)Filter
/** * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
Filter是一个过滤操作,比如mapRDD.filter(_ >1)。
(3)Union
/** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
多个RDD组成成一个新RDD,它重写了RDD的5个方法getPartitions、getDependencies、compute、getPreferredLocations、clearDependencies
从getPartitions、getDependencies中可以看出它应该是一组宽依赖关系。
override def getDependencies: Seq[Dependency[_]] = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) { deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size) pos += rdd.partitions.size } deps }
(4)groupBy
/** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this))
根据参数分组,这又产生了一个新的RDD。
再看下Action部分
(1)Count
/** * Return the number of elements in the RDD. */ def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
跟踪代码,在runJob方法中调用了dagScheduler.runJob。而在DAGScheduler,将提交到作业调度器,并获得JobWaiter对象返回。该JobWaiter对象可以用来阻塞,直到任务完成执行或可以用来取消作业。
RDD中的任务调度
网上找的任务调度图片
从这个图中,可以看到:
RDD Object产生DAG,然后进入DAGScheduler阶段:
1、DAGScheduler是面向Stage的高层次调度器,DAGScheduler会将DAG拆分成很多个 tasks,而一组tasks就是图中的stage。
2、每一次shuffle的过程就会产生一个新的stage。DAGScheduler会有RDD记录磁盘的物· 理化操作,为了获得最有tasks,DAGSchulder会先查找本地tasks。
3、DAGScheduler还要监控shuffle产生的失败任务,如果还得重启。
DAGScheduler划分stage后,会以TaskSet为单位把任务提交给TaskScheduler:
1、一个TaskScheduler只为一个sparkConext服务。
2、当接收到TaskSet后,它会把任务提交给Worker节点的Executor中去运行。失败的任务
由TaskScheduler监控重启。
Executor是以多线程的方式运行,每个线程都负责一个任务。
接下来跟踪一个spark提供的例子源码:
源码Spark.examples.SparkPi
def main(args: Array[String]) { //设置一个应用名称(用于在Web UI中显示) val conf = new SparkConf().setAppName("Spark Pi") //实例化一个SparkContext val spark = new SparkContext(conf) //转成数据 val slices = if (args.length > 0) args(0).toInt else 2 val n = 100000 * slices val count = spark.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() }
代码中的parallelize是一个并行化的延迟加载,跟踪源码
/** Distribute a local Scala collection to form an RDD. * 从RDD中分配一个本地的scala集合 * @note Parallelize acts lazily. If `seq` is a mutable collection and is * altered after the call to parallelize and before the first action on the * RDD, the resultant RDD will reflect the modified collection. Pass a copy of * the argument to avoid this. */ def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }
它调用了RDD中的map,上面说过的map是一个转换过程,将生成一个新的RDD。最后reduce。
至此,对于Spark源码的RDD部分做了一个简单的学习,下次会对Spark Submit进行学习。
相关文章推荐
- 从零开始搭建Raspberry Pi机器视觉编程环境
- 动态代理设计模式
- Udev raw裸设备 asmLib区别
- Redis学习笔记(三)Redis源码解读
- leetcode 18. 4Sum
- OGG-01027(长事务)记录
- USACO 6.2 最长回文串_枚举_矩形切割_线段树
- 浅谈Web网站架构演变过程
- <Handler>练习
- java中有无参数和返回值的方法
- Codeforces Round #356 (Div. 2) C
- PRVF-4664 PRVF-4657: Found inconsistent name
- SELinux 对nginx访问目录的影响
- Hadoop on Yarn 各组件详细原理
- 虚幻引擎4系列教程3(霜之小刀)(附视频)--游戏是需要逻辑的
- regex pattern in python for parsing html
- constructor and destructor 概述
- Android 轻松实现仿淘宝地区选择
- ORA-27063: skgfospo: number of bytes rea
- 多线程开发(三)-AnsycTask用法与解惑