spark几个重要的概念区分
2018-04-03 21:56
387 查看
1、RDD和DStream的区别
RDD:弹性数据集,其中包含了多个partition,每个子集partition可以分布在不同节点上,在进行处理时分别在不同机器上进行处理;
DStream:
对数据流按时间切分出来的一小批次,每个DStream对应多个RDD,这些RDD是按照时间维度进行划分的,
关系:相当于一整条数据流DStream被切分成了多个RDD,每个DStream对应多个RDD;一个RDD对应多个partition
2、foreach与foreachPartition的区别
foreach:源码:def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
获得每个partition中数据iterator 然后在遍历这个迭代器时对其中的每条数据进行入参function操作
foreachPartition:源码:
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))}
和foreach一样都是先获取partition中的数据iterator,不同的是在此方法中没有对iterator进行遍历,而是把这个iterator交给了入参function处理
总结:都是对partition中的数据进行操作,foreach是对每条信息,foreachpartition操作的是整个partition的iterator
3、map、mapPartitions和mapPartitionsWithIndex区别
map源码:def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
这个方法的的入参函数的参数是rdd中的具体一条信息,这样操作会不停的创建对象,例如:
如果需要把结果写到Mysql中,哪来一条记录就得生成一个连接
mapPartitions源码:
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning)}
这个方法的的入参函数的参数是一个迭代器,对于一个RDD的每个分区进行操作,这样就可以对一个分区只建立一个连接
mapPartitionsWithIndex源码:
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), preservesPartitioning)}
这个方法与mapPartition类似;只不过入参的函数的参数有两个:一个是分区号,一个是迭代器
4、Spark的高级排序和Spark的高级分区
排序:spark中的排序是通过隐式转换实现的,源码:
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)]) : OrderedRDDFunctions[K, V, (K, V)] = { new OrderedRDDFunctions[K, V, (K, V)](rdd)}
其中OrderedRDDFunctions是SortByKey所在的类
再看sortBykey源码:
private val ordering = implicitly[Ordering[K]] /** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). **/ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope{ val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse)}
在转换调用sortByKey方法时,会从上下文中提取Ordering[K]
private val ordering = implicitly[Ordering[K]]
其中
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
这里的k:Ordering意思是指k必须是可以转换成ordering的子类
所以我们有两种方式
1、在定义K的排序时可以使用隐式值
implicit val po: Ordering[Monkey] = new Ordering[Monkey] { override def compare(x: Monkey, y: Monkey): Int = { x.name.compareTo(y.name) }} ssc.textFileStream("").foreachRDD(rdd => rdd.map(msg => (Monkey(msg), msg)).sortByKey())
2、key类实现Ordered[k]接口中的compare方法
case class Monkey(name:String) extends Ordered[Monkey]{ override def compare(that: Monkey): Int = { this.name.compare(that.name) } def run(): Unit ={ println("monkey run fast") } } private def test() { //imp 4000 licit val po: Ordering[Monkey] = new Ordering[Monkey] { ssc.textFileStream("").foreachRDD(rdd => rdd.map(msg => (Monkey(msg), msg)).sortByKey()) }
相关文章推荐
- Spark里几个重要的概念及术语
- Spark里几个重要的概念及术语
- Spark里几个重要的概念及术语
- Spark里几个重要的概念及术语
- Spark里几个重要的概念及术语
- IntelliJ IDEA 项目相关的几个重要概念介绍
- 关于MIPI DSI几个重要的概念
- Maven中的几个重要概念(二):lifecycle, phase and goal
- 机器学习几个重要概念
- HTTP协议的几个重要概念
- Maven中的几个重要概念(二):lifecycle, phase and goal
- LoadRunner几个重要的概念:事务、集合点、思考时间
- SIP协议入门:初学者必须明白的几个重要概念
- object-c 要理解协议的几个重要概念
- Dicom要点整理三:C-Find/C-Move/C-Store 几个重要概念
- J2ME平台中有几个重要的概念
- 区分几个概念
- WPF重要的几个概念:
- Spring中几个重要的概念
- 关于几个概念的区分:终端、shell、控制台!