
Distinguishing a few important Spark concepts

2018-04-03 21:56

1. The difference between RDD and DStream

RDD:

A Resilient Distributed Dataset. It is made up of multiple partitions; each partition can live on a different node, so during processing the partitions are handled in parallel on different machines.

DStream:

A data stream sliced into small batches along the time axis. Each DStream corresponds to a sequence of RDDs, and those RDDs are divided by time (one per batch interval).

Relationship: the whole data stream represented by a DStream is cut into multiple RDDs, so one DStream corresponds to many RDDs (one per batch), and one RDD in turn corresponds to many partitions.
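A minimal sketch of that relationship (the local master, 5-second batch interval and socket source are assumptions for illustration): every batch of the stream arrives as one RDD, and each of those RDDs is split into partitions.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-rdd-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))      // every 5-second batch becomes one RDD

val lines = ssc.socketTextStream("localhost", 9999)   // assumed source: one DStream of lines
lines.foreachRDD { rdd =>
  // one RDD per batch interval; each RDD is further split into partitions
  println(s"this batch's RDD has ${rdd.getNumPartitions} partitions")
}

ssc.start()
ssc.awaitTermination()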

2. The difference between foreach and foreachPartition

foreach source:

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}


It obtains the iterator over each partition's data and then, while traversing that iterator, applies the passed-in function to every single record.

foreachPartition source:

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}


Like foreach, it first obtains the iterator over the partition's data; the difference is that this method does not traverse the iterator itself but hands the entire iterator to the passed-in function.

Summary: both operate on the data inside partitions; foreach applies the function to each individual record, while foreachPartition applies it to the whole partition's iterator.
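A small usage sketch of that difference (assuming an existing SparkContext sc): the foreach function is called once per record, while the foreachPartition function is called once per partition and receives that partition's whole iterator.

val rdd = sc.parallelize(1 to 10, numSlices = 2)

// foreach: the function sees one record at a time
rdd.foreach(record => println(s"record: $record"))

// foreachPartition: the function sees the whole partition's iterator
rdd.foreachPartition { iter =>
  val records = iter.toList            // consume the iterator once
  println(s"this partition holds ${records.size} records")
}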

3. The differences between map, mapPartitions and mapPartitionsWithIndex

map source:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}


The function passed to this method takes a single record of the RDD as its parameter, so per-record objects get created over and over. For example:

if the results need to be written to MySQL, every incoming record ends up opening its own connection.
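A hedged sketch of that problem (assuming an existing RDD named rdd; the JDBC URL, credentials and table are made up for illustration): with map, the connection code runs once per record.

import java.sql.DriverManager

val written = rdd.map { record =>
  // one connection per record -- exactly the overhead described above
  val conn = DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO t(v) VALUES (?)")
  stmt.setString(1, record.toString)
  stmt.executeUpdate()
  stmt.close()
  conn.close()
  record
}
written.count()   // map is lazy; an action is needed to actually trigger the writes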

mapPartitions source:

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}


The function passed to this method takes an iterator as its parameter and operates on a whole partition of the RDD at a time, so only one connection needs to be established per partition.
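A hedged sketch of the same write rewritten with mapPartitions (same assumed rdd and made-up JDBC URL and table as above): the connection is opened once per partition and reused for every record in it.

import java.sql.DriverManager

val written = rdd.mapPartitions { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass") // once per partition
  val stmt = conn.prepareStatement("INSERT INTO t(v) VALUES (?)")
  val results = iter.map { record =>
    stmt.setString(1, record.toString)
    stmt.executeUpdate()
    record
  }.toList          // materialize before closing, since the iterator is lazy
  stmt.close()
  conn.close()
  results.iterator
}
written.count()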

mapPartitionsWithIndex source:

def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}


This method is similar to mapPartitions, except that the passed-in function takes two parameters: the partition index and the iterator.
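A small sketch of the extra index parameter (assuming an existing SparkContext sc): tag every record with the number of the partition it came from.

val nums = sc.parallelize(1 to 10, numSlices = 3)
val tagged = nums.mapPartitionsWithIndex { (index, iter) =>
  iter.map(record => s"partition $index -> record $record")
}
tagged.collect().foreach(println)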

4. Advanced sorting and advanced partitioning in Spark

Sorting:

Sorting in Spark is provided through an implicit conversion; the source:

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
  : OrderedRDDFunctions[K, V, (K, V)] = {
  new OrderedRDDFunctions[K, V, (K, V)](rdd)
}


OrderedRDDFunctions is the class where sortByKey is defined.

Now look at the sortByKey source:

private val ordering = implicitly[Ordering[K]]

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}


When sortByKey is called through this conversion, the Ordering[K] is pulled from the surrounding implicit scope:

private val ordering = implicitly[Ordering[K]]


And in the signature of the conversion,

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])


the context bound K : Ordering means that an implicit Ordering[K] must be available for the key type K; it does not require K itself to be a subclass of Ordering.
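As a plain-Scala illustration of what a context bound means (nothing Spark-specific; the names are made up), K : Ordering is just sugar for an extra implicit parameter list:

// with a context bound
def maxKey[K: Ordering](pairs: Seq[(K, String)]): K =
  pairs.map(_._1).max                        // max looks up the implicit Ordering[K]

// the equivalent desugared form
def maxKeyDesugared[K](pairs: Seq[(K, String)])(implicit ord: Ordering[K]): K =
  pairs.map(_._1).max(ord)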

So there are two ways to provide it:

1. Define an implicit Ordering value for the key type:

implicit val po: Ordering[Monkey] = new Ordering[Monkey] {
  override def compare(x: Monkey, y: Monkey): Int = {
    x.name.compareTo(y.name)
  }
}

ssc.textFileStream("").foreachRDD(rdd => rdd.map(msg => (Monkey(msg), msg)).sortByKey())


2. Have the key class extend Ordered[K] and implement its compare method:

case class Monkey(name: String) extends Ordered[Monkey] {
  override def compare(that: Monkey): Int = {
    this.name.compare(that.name)
  }
  def run(): Unit = {
    println("monkey run fast")
  }
}

private def test() {
  // the implicit val po: Ordering[Monkey] from the first approach is no longer needed:
  // since Monkey extends Ordered[Monkey], Scala derives the Ordering automatically
  ssc.textFileStream("").foreachRDD(rdd => rdd.map(msg => (Monkey(msg), msg)).sortByKey())
}