Spark RDD API 参考示例(二)
2018-02-05 13:17
465 查看
16、dependencies
原型 final def dependencies: Seq[Dependency[_]]
含义
dependencies 返回RDD的依赖,简单来说,就是这个RDD是怎么一步步生成的。通过这种方式可以很快的重新构建这个RDD
示例
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b.dependencieses res0: Seq[org.apache.spark.Dependency[_]] = List() //表示目前b还没有依赖关系 val c=b.map(a=>a) c.dependencieses res1: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@73041dc1) //表示目前c存在一个依赖关系 val d = c.cartesian(a) d.dependencies res2: List(org.apache.spark.rdd.CartesianRDD$$anon$1@2e981e6a, org.apache.spark.rdd.CartesianRDD$$anon$2@5911fad1) //这里表示d存在两个依赖,通过这种方式,可以很快的恢复其构建顺序
17、distinct
原型 def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
含义
distinct 返回一个只包含唯一键的新的RDD,简单来说,就是去重复
示例
//用于去重复 val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) //没有带分区数,那么分区数保持为2 c.distinct.collect res1: Array[String] = Array(Dog, Gnu, Cat, Rat) //去重复时,还可以指定所用的分区数量,提高并行效率 val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) //将取重复的结果分为了三个分区 a.distinct(3).collect res2: Array[Int] = Array(6, 3, 9, 4, 1, 7, 10, 8, 5, 2)
18、first
原型 def first(): T
含义
first 查找RDD中第一个元素,可以是单个元素,也可以是一个元组
示例
//查找单个元素 val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.first res1: String = Gnu //查找元组 val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) val d = sc.parallelize(List(1,2,3,4), 2) val e = c.zip(d) e.first res2: (String, Int) = (Gnu,1)
19、filter
原型 def filter(f: T => Boolean): RDD[T]
含义
filter 筛选使得filter中的函数为真的值,也就是筛选满足条件的值,并将这些值放入到RDD中
示例
//筛选满足模2为0的值 val a = sc.parallelize(1 to 10, 3) val b = a.filter(_ % 2 == 0) b.collect res1: Array[Int] = Array(2, 4, 6, 8, 10) //当使用自定义的筛选函数时,这个筛选函数必须要能够处理RDD中所有的元素 val b = sc.parallelize(1 to 8) b.filter(_ < 4).collect res2: Array[Int] = Array(1, 2, 3) //如果不能处理所有元素,就会报错 val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.filter(_ < 4).collect <console>:3: error: value < is not a member of Any //如果需要对不同类型的数据处理,可以自定义混合处理函数,例如使用case语句
20、filterByRange
原型 def filterByRange(lower: K, upper: K): RDD[P]
含义
filterByRange 只适合筛选key-value类型的键值对,filterByRange 参数中的范围只对key起作用,通过key来筛选。
示例
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3) //先排序,这样筛选的速度快一些 val sortedRDD = randRDD.sortByKey() //筛选key在1-3范围内的元组 sortedRDD.filterByRange(1, 3).collect res1: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))
21、flatMap
原型 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
含义
flatMap 和map 功能类似,但是他是将每个元素中的数据拆分来处理,而map 是将其作为一个整体来处理,也就是flatMap 的操作粒度更小。如果一个数据是一行文本,我们要统计单词,就可以使用它指定切分方式,来统计单词个数,这样的功能Map 不方便实现
示例
val z = sc.parallelize(List("cat","dog","gnu"),2) z.flatMap(i=>i).collect //flatMap每次将元素的值,会进行再次切分 res1: Array[Char] = Array(c, a, t, d, o, g, g, n, u) //map每次将一个元素作为处理单位 z.map(i=>i).collect res2: Array[String] = Array(cat, dog, gnu) sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect res3: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
22、flatMapValues
原型 def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
含义
flatMapValues 和mapValues 功能类似,每次以value 作为切分单位,切分方式和 flatMap 类似,默认以单个字符作为切分单位
示例
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) //以数字作为key val b = a.map(x => (x.length, x)) b.flatMapValues("x" + _ + "x").collect res1: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x))
23、fold
原型 def fold(zeroValue: T)(op: (T, T) => T): T
含义
fold 是 aggregate 的简化版,他只能传递一个函数,分别作用于每个分区内部以及分区之间
示例
val a = sc.parallelize(List(1,2,3), 3) a.fold(1)(_ + _) res1: Int = 6 val a = sc.parallelize(List("c","b","abc"), 3) a.fold("x")(_ + _) res2: String = xxcxbxabc
24、foldByKey [Pair]
原型 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
含义
foldByKey 和 fold 类似,他只能传递一个函数,分别作用于每个分区内部以及分区之间 。同时是aggregateByKey 的一个简化版,功能和他类似
示例
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) //每次按照key来聚合,分区内和分区之间的聚合函数相同 b.foldByKey("")(_ + _).collect res1: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
25、foreach
原型 def foreach(f: T => Unit)
含义
foreach 对RDD中的每个元素执行一个特定的功能,这个元素可以是元组,也可以是单个元素。也就是对RDD中每一项进行操作
示例
//这个目前没有实践,spark-shell中不支持 val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "spider"), 3) c.foreach(x => println(x + "s are yummy")) //通常用于下一步操作,可以将其赋值给变量
26、foreachPartition
原型 def foreachPartition(f: Iterator[T] => Unit)
含义
foreachPartition 对RDD中的每个分区作为一个整体,每次操作的是一个分区
示例
//这个目前没有实践,spark-shell中不支持 val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) //x 表示一个分区,每次对一个分区进行操作 b.foreachPartition(x => println(x.reduce(_ + _))) //通常用于下一步操作,可以将其赋值给变量
27、fullOuterJoin [Pair]
原型 def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
含义
fullOuterJoin 类似于数据库中的外连接,一共有 n×m 行,每个元素都要出现。按照key进行分组。
示例
val pairRDD1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("book", 4),("gnu", 12))) val pairRDD2 = sc.parallelize(List( ("cat",2), ("gnu", 5), ("mouse", 4),("cat", 12))) pairRDD1.fullOuterJoin(pairRDD2).collect //pairRDD1中的每个元素都会于 m 个 pairRDD2 个元素连接,形成一个 n×m 行的数据 res0: Array[(String, (Option[Int], Option[Int]))] = Array((gnu,(Some(12),Some(5))), (cat,(Some(2),Some(12))), (cat,(Some(2),Some(2))), (cat,(Some(5),Some(12))), (cat,(Some(5),Some(2))), (book,(Some(4),None)), (mouse,(None,Some(4))))
相关文章推荐
- Spark RDD API 参考示例(二)
- Spark RDD API 参考示例(四)
- Spark RDD API 参考示例(一)
- Spark RDD API 参考示例(三)
- Spark RDD API 参考示例(五)
- Spark RDD API 参考示例(六)
- Spark RDD API 参考示例(四)
- Spark RDD API 参考示例(五)
- Spark RDD API详解(一) Map和Reduce
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- Spark RDD API详解(一) Map和Reduce
- Spark RDD API详解(一) Map和Reduce
- Spark RDD API具体解释(一) Map和Reduce
- Spark算子-RDD Action(saveAsNewAPIHadoopFile)
- Spark笔记:复杂RDD的API的理解(下)
- Spark源码核心与开发实战---Spark RDD与Spark API编程实例
- Spark 2.0介绍:从RDD API迁移到DataSet API
- Spark RDD API详解之Map和Reduce
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)