您的位置:首页 > 其它

spark学习2-join

2015-10-13 11:00 344 查看

继续上一篇学习spark

本次将介绍如何用spark进行join的操作,首先join是分map side join和reduce side join,下面将分别介绍下这2个操作在spark中如何进行

map side join

其实就是将较小的表放入到内存,利用spark的广播机制broadcast出去,这样就缓存在内存中,直接上代码
val conf = new SparkConf()
.setAppName("map side join")
.set("spark.executor.memory", args(3))
val sc = new SparkContext(conf)

val card = sc.textFile(args(0))
val info = sc.textFile(args(1)).map { line =>
val field = line.split("\t")
val card = field(0)
val mid = field(1)
val count = field(2)
(card, (mid, count))
}

// 将card的mapping 广播出去
val cardMapping = sc.broadcast(card.map(x => (x, x)).collectAsMap())

val result = info.mapPartitions { iter =>
val m = cardMapping.value
for ((key, value) <- iter if (m.contains(key))) yield (key, value._1, value._2)
}
result.saveAsTextFile(args(2))

解释下mapPartitions:


mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。

它的函数定义为:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]


f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:
scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next while (iter.hasNext) {
val cur = iter.next;
res .::= (pre, cur) pre = cur;
}
res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))


上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。

mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

reduce side join

这个就是一个很正常的join,直接用spark提供的算子操作join方法就可以,比较简单,不多做介绍了
可以多看看spark其实有很多个join,包含了left outer join、right outer join等,hive有的操作这个都有了~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: