spark sortByKey subtractByKey take takeOrdered等函数使用例子
2015-01-20 22:48
531 查看
package com.latrobe.spark import org.apache.spark.{SparkContext, SparkConf} /** * Created by spark on 15-1-19. * 根据key对K-V类型的RDD进行排序获得新的RDD */ object SortByKey { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(List("dog","cat","owl","gnu","ant")) val b = sc.parallelize(1 to a.count().toInt) val c = a.zip(b) //asc c.sortByKey(true).collect().foreach(print) //desc c.sortByKey(false).collect().foreach(print) } } /** * Created by spark on 15-1-19. * RDD1.subtract(RDD2):返回一个新的RDD,内容是:RDD1中存在的,RDD2中不存在的 */ object Subtract { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 10) val b = sc.parallelize(1 to 3) //45678910 //a.subtract(b).collect().foreach(print) val c = sc.parallelize(1 to 10) val d = sc.parallelize(List(1,2,3,11)) //45678910 c.subtract(d).collect().foreach(print) } } /** * Created by spark on 15-1-19. * RDD1.subtractByKey(RDD2):返回一个新的RDD,内容是:RDD1 key中存在的,RDD2 key中不存在的 */ object SubtractByKey { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(List("dog","he","word","hello")) val b = a.keyBy(_.length) val c = sc.parallelize(List("cat","first","everyone")) val d = c.keyBy(_.length) //(2,he)(4,word) b.subtractByKey(d).collect().foreach(print) } } /** * Created by spark on 15-1-19. * sumApprox没有出现我希望的结果 */ object SumAndSumApprox { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000000) val b = a.sum() val c = a.sumApprox(0L,0.9).getFinalValue() println(b + " *** " + c) } } /** * Created by spark on 15-1-19. * 取出RDD的前n个元素,以数组的形式返回 */ object Take { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000000) //12345678910 a.take(10).foreach(print) } } /** * Created by spark on 15-1-19. * 对RDD元素进行升序排序 * 取出前n个元素并以数组的形式放回 */ object TakeOrdered { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(List("ff","aa","dd","cc")) //aacc a.takeOrdered(2).foreach(print) } } /** * Created by spark on 15-1-19. * 数据取样 */ object TakeSample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 10000) /** * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869 */ a.takeSample(true , 10 , 1).foreach(println) } } /** * Created by spark on 15-1-19. * debug 详情信息显示 */ object ToDebugString { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 9) val b = sc.parallelize(1 to 3) val c = a.subtract(b) c.toDebugString } } /** * Created by spark on 15-1-19. * 获得前几个最大值 */ object Top { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000) val c = a.top(10) /** *1000 999 998 997 996 995 994 993 992 991 */ c.foreach(println) } } /** * Union == ++ 把两个RDD合并为一个新的RDD */ object Union { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 3) val b = sc.parallelize(3 to 5) val c = a.union(b) val d = a ++ b /** *123345 */ c.collect().foreach(print) /** *123345 */ d.collect().foreach(print) } }
相关文章推荐
- AcceptEx函数与完成端口的结合使用例子
- 函数对象 与 count_if()结合使用 例子
- LINUX中使用PTHREAD_KILL函数测试线程是否存活的例子
- linux中使用pthread_kill函数测试线程是否存活的例子
- linux中使用pthread_kill函数测试线程是否存活的例子
- CodeProject - 在C#使用SHGetFileInfo获取(管理)文件或者文件夹图标(C#封装Win32函数的一个例子)
- windows中目录字符串处理函数, MSDN里有使用的例子
- jQuery扩展插件和拓展函数的写法(匿名函数使用的典型例子)
- Flex中如何使用addChild()和removeChild()函数动态添加或删除Accordion容器中项目的例子
- VC++ 的类中使用函数指针的小例子,通过函数指针调用不同的函数
- pl/sql中使用table()函数的例子
- 使用泛型算法的例子, 使用了函数对象
- 一些函数使用的例子
- jQuery下扩展插件和拓展函数的写法(匿名函数使用的典型例子)
- what's in string? c语言string类函数实现汇总 都是学习使用指针的好例子啊(算是读书摘抄和笔记吧)
- Event事件函数的使用例子
- CodeProject - 在C#使用SHGetFileInfo获取(管理)文件或者文件夹图标(C#封装Win32函数的一个例子)
- ExtractStrings函数使用例子
- 转发屠夫大牛使用未公开API ZwQueryVirtualMemory 枚举进程模块的函数例子
- 函数指针的使用例子