您的位置:首页 > 其它

Spark算子之cogroup、cartesian 、intersection 、sortBy

2018-02-06 15:31 453 查看
package com.lyzx.day36

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

class T1 {
/**
* join vs cogroup
* join是做类似于SQL的inner join 匹配到的按照排列组合的方式列出来
* cogroup 则是现在本RDD上做一次聚合把键相同的值们都放在一个CompactBuffer中,
*         然后再在参数RDD上做一次聚合,在把两者做一次就和
*         例如r1.cogroup(r2)
*         (key,(CompactBuffer(r1上的值),CompactBuffer(r2上的值)))
*         注意:r1中没有的值也会出现在结果集中
*         (key,(CompactBuffer(),CompactBuffer(r2上的值)))
* @param sc
*/
def f1(sc:SparkContext): Unit ={
val r1 = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,3),(4,4))).cache()
val r2 = sc.parallelize(List((1,98),(1,99),(2,100),(3,300),(7,7))).cache()

r1.join(r2)
.foreach(x=>println("join:"+x))

r1.cogroup(r2)
.foreach(x=>println("cogroup:"+x))
}

/**
* cartesian 笛卡尔积就是排列组合
* 返回值是二元组
* @param sc
*/
def f2(sc:SparkContext): Unit ={
val r1 = sc.parallelize(Array("大大的","长长的","混圆的","丰满的"))
val r2 = sc.parallelize(Array("菠萝","脑袋","黄瓜","屁股"))

r1.cartesian(r2)
.foreach(x=>println(x._1+"-"+x._2))
}

/**
* intersection 求交集
* @param sc
*/
def f3(sc:SparkContext): Unit ={
val r1 = sc.parallelize(1 to 20)
val r2 = sc.parallelize(10 to 25)

r1.intersection(r2)
.foreach(x=>println("::"+x))
}

/**
* sortBy按照某某方式排序
* @param sc
*/
def f4(sc:SparkContext): Unit ={
val r1 = sc.parallelize(Array(10,20,99,4,2,1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
r1.foreach(x=>println(":"+x))
r1.sortBy(x=>x)
.foreach(x=>println("::"+x))
}

/**
* SparkContext.textFile(path:String,[minPartitions:Int])
* path:指文件的路径
* 对于partition的
*/

}

object T1{

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("day36").setMaster("local")
val sc = new SparkContext(conf)
val t = new T1

//    t.f1(sc)
//    t.f2(sc)
//  t.f3(sc)
t.f4(sc)

sc.stop()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: