Spark Source Code Reading Notes (RDD) (Part 1)
2015-07-05 14:36
RDD (Resilient Distributed Dataset) is Spark's most basic abstraction: in essence, an immutable collection. The elements of the collection are divided into partitions, which are the basic unit of storage and access; partitions live in memory or on disk across the cluster (or on the local machine), and functions applied to an RDD can be computed in parallel on different partitions. As the scaladoc puts it: "A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel."
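As a minimal local-mode sketch (not from the Spark source, just an illustration of the definition above): parallelize splits a collection into partitions, and map runs on those partitions in parallel while the RDD itself is never mutated.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RddDemo extends App {
  val conf = new SparkConf().setAppName("rdd-demo").setMaster("local[4]")
  val sc = new SparkContext(conf)

  // The collection is split into 4 partitions; map is applied to each
  // partition in parallel and produces a new RDD rather than mutating this one.
  val rdd = sc.parallelize(1 to 100, 4)
  val squared = rdd.map(x => x * x)
  println(squared.reduce(_ + _))

  sc.stop()
}
```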
RDD Construction
```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  )

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
```
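To see the auxiliary constructor in action, here is a hypothetical subclass, modeled loosely on Spark's own MapPartitionsRDD (MyMappedRDD is an invented name, not a Spark class): passing the parent to RDD[U](prev) registers a OneToOneDependency on it.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// A sketch only: a real transformation RDD also handles checkpointing etc.
class MyMappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
    extends RDD[U](prev) {  // i.e. this(prev.context, List(new OneToOneDependency(prev)))

  // Same layout as the parent: one child partition per parent partition.
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Narrow dependency: a child partition is computed from the single
  // matching parent partition, with no shuffle.
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split, context).map(f)
}
```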
RDD Properties
- _sc (SparkContext): context information for the Spark application.
- deps (Seq[Dependency[_]]): the parent RDDs this RDD depends on.
- partitioner (Partitioner): for key-value pair RDDs, the algorithm that maps keys to partitions; implementations include HashPartitioner and RangePartitioner, and users can supply their own.
- id (Int): the RDD's ID, monotonically increasing, generated via AtomicInteger.getAndIncrement() (see the sketch after this list).
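The id is handed out by SparkContext. A sketch of that generator, written from memory of the Spark 1.x source and stripped of its surrounding class, so treat the exact names as approximate:

```scala
import java.util.concurrent.atomic.AtomicInteger

// SparkContext keeps a process-wide counter and increments it per RDD.
object IdGenSketch {
  private val nextRddId = new AtomicInteger(0)

  /** Register a new RDD, returning its RDD ID. */
  def newRddId(): Int = nextRddId.getAndIncrement()
}
```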
RDD Property: Dependency
RDDs have two kinds of dependencies: narrow dependencies (NarrowDependency) and wide dependencies (ShuffleDependency). In a narrow dependency, each partition of the child RDD depends on only a small number of partitions of the parent RDD, which lets computation be pipelined. In essence, a narrow dependency means the parent RDD can be transformed into the child RDD without a shuffle. As the scaladoc says: "Each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution."
A wide dependency (more precisely, a shuffle dependency, ShuffleDependency) means the parent RDD must be shuffled to produce the child RDD; that is, each partition of the child RDD depends on all partitions of the parent. As the scaladoc says: "Represents a dependency on the output of a shuffle stage."
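A quick way to see the difference (a local-mode sketch reusing the sc from the earlier example, not from the original post) is to inspect rdd.dependencies: a map-like transformation yields a OneToOneDependency, while reduceByKey yields a ShuffleDependency.

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

val mapped  = pairs.mapValues(_ + 1)    // narrow: no shuffle needed
val reduced = pairs.reduceByKey(_ + _)  // wide: requires a shuffle

// Prints a OneToOneDependency for mapped and a ShuffleDependency for reduced.
println(mapped.dependencies)
println(reduced.dependencies)
```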
The Dependency abstract class
```scala
/**
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
```
The NarrowDependency abstract class
```scala
/**
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
```
NarrowDependency has several implementations: OneToOneDependency, RangeDependency, PruneDependency, and so on.
OneToOneDependency: each partition of the child RDD depends on the identically numbered partition of the parent RDD.
```scala
/**
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}
```
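So for a one-to-one dependency, getParents is simply the identity. An illustrative snippet (someParentRdd is an invented name; any RDD works):

```scala
import org.apache.spark.OneToOneDependency

val someParentRdd = sc.parallelize(1 to 40, 4)
val dep = new OneToOneDependency(someParentRdd)

println(dep.getParents(3))  // List(3): child partition 3 reads parent partition 3
```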
RangeDependency: partitions of the child RDD correspond one-to-one to partitions of the parent RDD within a given range; child partitions [outStart, outStart + length) map to parent partitions [inStart, inStart + length).
```scala
/**
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int) = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
```
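RangeDependency is what RDD.union relies on: the child RDD concatenates the parents' partitions, so each dependency covers one contiguous range. A local-mode sketch (again reusing sc, not from the original post):

```scala
val a = sc.parallelize(1 to 30, 3)   // parent a: partitions 0..2
val b = sc.parallelize(31 to 50, 2)  // parent b: partitions 0..1
val u = a.union(b)                   // child: 5 partitions, b's mapped to 3..4

// Two RangeDependency instances, one per parent.
u.dependencies.foreach(println)

// For the dependency on b (inStart = 0, outStart = 3, length = 2):
// getParents(3) == List(0), getParents(4) == List(1), getParents(0) == Nil.
```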
Wide dependency (ShuffleDependency)
```scala
/**
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
```
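A ShuffleDependency is normally created for you by shuffle transformations rather than by hand. This sketch (local mode, names illustrative) pulls one out of a reduceByKey result to look at its fields:

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency}

val words  = sc.parallelize(Seq("a", "b", "a"), 2).map(w => (w, 1))
val counts = words.reduceByKey(new HashPartitioner(4), _ + _)

// The ShuffledRDD behind counts carries a single ShuffleDependency.
val dep = counts.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(dep.partitioner.numPartitions)  // 4
println(dep.mapSideCombine)             // true: reduceByKey combines map-side
```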
RDD Property: Partitioner
Partitioner encapsulates the algorithm that, for a key-value pair RDD, computes a partition ID from each key; it is consulted during the shuffle phase. As the scaladoc says: "An object that defines how the elements in a key-value pair RDD are partitioned by key. Maps each key to a partition ID, from 0 to numPartitions - 1."
The Partitioner abstract class
```scala
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
```
Partitioner has several implementations, such as HashPartitioner and RangePartitioner, and you can also plug in your own (see the sketch after the HashPartitioner listing).
The HashPartitioner implementation
```scala
/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```
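Following the same pattern, here is a hypothetical custom Partitioner (FirstLetterPartitioner is an invented name, not a Spark class) that routes string keys by their first character, plus how partitionBy applies it:

```scala
import org.apache.spark.Partitioner

// Hypothetical: keys starting with the same character land in the same partition.
class FirstLetterPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => s.head.toInt % numPartitions
    case _ => 0
  }

  // Like HashPartitioner, equality is what lets Spark skip redundant shuffles.
  override def equals(other: Any): Boolean = other match {
    case p: FirstLetterPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numPartitions
}

// partitionBy shuffles the pair RDD according to the custom partitioner.
val pairs = sc.parallelize(Seq(("apple", 1), ("avocado", 2), ("banana", 3)))
val partitioned = pairs.partitionBy(new FirstLetterPartitioner(8))
println(partitioned.partitioner)  // Some(FirstLetterPartitioner@...)
```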