Spark RDD API 参考示例(四)
2018-02-05 13:25
423 查看
本文参考Zhen He
原型
def map[U: ClassTag](f: T => U): RDD[U]
含义
map 对RDD中的每一个item 应用一个函数,并且返回一个新的RDD
示例
原型
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
含义
mapPartitions 对RDD中每个Partition 调用一次,每个Partition 中的内容使用一个Iterator, 使用迭代器流作为输入,这种迭代流自动转换为RDD类型数据。这种流只会作用于每个分区内部。
示例
原型
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
含义
mapPartitionsWithIndex 和mapPartitions 非常类似,但是他处理的数据带有两个部分,第一个是分区号,第二部分是该分区号内部的数据组成的Iterator 。 对RDD中每个Partition 调用一次,每个Partition 中的内容使用一个Iterator,
使用迭代器流作为输入,这种迭代流自动转换为RDD类型数据。这种流只会作用于每个分区内部。
示例
原型
def mapValues[U](f: V => U): RDD[(K, U)]
含义
mapValues 用于处理key-value类型的RDD,每次处理一个key-value对,mapValues 处理的是key-value中的value,处理完value之后,就会返回一个key-value类型的数据,返回到新RDD中去
示例
原型
def max()(implicit ord: Ordering[T]): T
含义
max 返回RDD中最大的元素,如果是元组,那么返回key最大的元素
示例
原型
def mean(): Double
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
含义
mean 返回RDD中元素的均值,RDD中元素必须是数字类型的,可以是整数,也可以是浮点数
示例
原型
def min()(implicit ord: Ordering[T]): T
含义
min 返回RDD中元素的最小值,如果是元组,那么返回key最小的元素
示例
原型
@transient var name: String
def setName(_name: String)
含义
name 返回用户给RDD标记的名称
示例
原型
final def partitions: Array[Partition]
含义
partitions 获得RDD的分区信息,并将其放入一个数组返回
示例
原型
def cache(): RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
含义
persist, cache 只是 persist(StorageLevel.MEMORY_ONLY) 的缩写,用于调整RDD的存储级别,一旦存储级别修改,就不能再次修改。
示例
原型
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
含义
pipe 对RDD中的每一个分区执行shell命令操作,
示例
原型
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
含义
randomSplit 把RDD中数据,根据用户指定的权重,随机切分成多个小的RDD,小RDD中元素的个数由权重来确定。在一些特殊情况下,我们需要让随机产生的数据相同,就可以初始化相同的种子。
示例
原型
def reduce(f: (T, T) => T): T
含义
reduce 把RDD中任意两个元素分层合并,根据用户指定的函数进行聚合
示例
原型
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
含义
reduceByKey 先根据相同的 key 进行分组,然后在相同的 key 中进行聚合,聚合形式和 reduce 相同
示例
原型
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
含义
rightOuterJoin 类似于数据中的右外连接,以右边的作为参考,要是左边没有,那么就将其补空。右边没有的,左边有,那么就舍弃。
示例
42、map
原型 def map[U: ClassTag](f: T => U): RDD[U]
含义
map 对RDD中的每一个item 应用一个函数,并且返回一个新的RDD
示例
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) //对每一个 item 应用一个函数 val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
43、mapPartitions
原型 def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
含义
mapPartitions 对RDD中每个Partition 调用一次,每个Partition 中的内容使用一个Iterator, 使用迭代器流作为输入,这种迭代流自动转换为RDD类型数据。这种流只会作用于每个分区内部。
示例
val a = sc.parallelize(1 to 9, 3) //RDD分成三个分区(1,2,3) (4,5,6) (7,8,9) //当iter取1和2时,可以得到(1,2) (2,3) //当iter取3时 //此处采用的是泛型,所以函数名后面要myfunc[T],如果是具体类型,就不需要 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() //iter取3 var pre = iter.next //这个分区内容已经没有来,所以的流已经结束来。 //所以(3,4)就不存在 while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } //作用于每一个分区,每个分区的内容以iterator流的格式输入,分区之间互不影响 a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) //对每个分区进行partition,这是于map的最大的区别 val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) //此处采用的是具体类型,所以不需要在函数名后面加泛型 def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(5))(cur) } res.iterator } x.mapPartitions(myfunc).collect res2: Array(1, 2, 2, 2, 2, 3, 3, 4, 6, 7, 7, 8, 8, 9, 9, 9, 9, 10, 10, 10)
44、mapPartitionsWithIndex
原型 def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
含义
mapPartitionsWithIndex 和mapPartitions 非常类似,但是他处理的数据带有两个部分,第一个是分区号,第二部分是该分区号内部的数据组成的Iterator 。 对RDD中每个Partition 调用一次,每个Partition 中的内容使用一个Iterator,
使用迭代器流作为输入,这种迭代流自动转换为RDD类型数据。这种流只会作用于每个分区内部。
示例
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) //mapPartitionsWithIndex处理函数,需要两个参数,第一个参数是分区号,第二个参数是该分区中数据组成的iterator //处理函数的返回结果必须也是一个iterator def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator } x.mapPartitionsWithIndex(myfunc).collect() res1: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
45、mapValues [Pair]
原型 def mapValues[U](f: V => U): RDD[(K, U)]
含义
mapValues 用于处理key-value类型的RDD,每次处理一个key-value对,mapValues 处理的是key-value中的value,处理完value之后,就会返回一个key-value类型的数据,返回到新RDD中去
示例
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) //每次处理一个key-value中的value,然后附上key就行了 b.mapValues("x" + _ + "x").collect res1: Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
46、max
原型 def max()(implicit ord: Ordering[T]): T
含义
max 返回RDD中最大的元素,如果是元组,那么返回key最大的元素
示例
val y = sc.parallelize(10 to 30) y.max res1: Int = 30 val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat"))) a.max res2: (Int, String) = (18,cat)
47、mean [Double], meanApprox [Double]
原型 def mean(): Double
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
含义
mean 返回RDD中元素的均值,RDD中元素必须是数字类型的,可以是整数,也可以是浮点数
示例
//浮点数使用mean函数 val a = sc.parallelize(List(5.0, 2.0, 2.1, 7.4, 7.5, 7.1, 8.8, 10.0, 8.9, 5.5), 3) a.mean res0: Double = 6.4300000000000015 //整数使用mean函数 val b = sc.parallelize(1 to 10) b.mean res1: Double = 5.5
48、min
原型 def min()(implicit ord: Ordering[T]): T
含义
min 返回RDD中元素的最小值,如果是元组,那么返回key最小的元素
示例
val y = sc.parallelize(10 to 30) y.min res1: Int = 10 //元组返回最小的key的元素 val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (8, "cat"))) a.min res2: (Int, String) = (3,tiger)
49、name, setName
原型 @transient var name: String
def setName(_name: String)
含义
name 返回用户给RDD标记的名称
示例
val y = sc.parallelize(1 to 10, 10) //查询RDD的名称 y.name res1: String = null //用户设置RDD的名称 y.setName("Fancy RDD Name") y.name res2: String = Fancy RDD Name
50、partitions
原型 final def partitions: Array[Partition]
含义
partitions 获得RDD的分区信息,并将其放入一个数组返回
示例
val a = sc.parallelize(List(5.0, 2.0, 2.1, 7.4, 7.5, 7.1, 8.8, 10.0, 8.9, 5.5), 3) a.partitions //返回三个分区信息,主要包括分区编号,分区类型 res1: Array( org.apache.spark.rdd.ParallelCollectionPartition@29f6, org.apache.spark.rdd.ParallelCollectionPartition@29f7, org.apache.spark.rdd.ParallelCollectionPartition@29f8)
51、persist, cache
原型 def cache(): RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
含义
persist, cache 只是 persist(StorageLevel.MEMORY_ONLY) 的缩写,用于调整RDD的存储级别,一旦存储级别修改,就不能再次修改。
示例
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.getStorageLevel //表示该RDD还没有设置存储级别,只是存储一份 res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas) //cache 和persist功能一样,默认调整RDD的存储级别为 MEMORY c.cache c.getStorageLevel res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
52、pipe
原型 def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
含义
pipe 对RDD中的每一个分区执行shell命令操作,
示例
//对每一个分区执行shell命令,并将结果返回到新的RDD中 val a = sc.parallelize(1 to 9, 3) a.pipe("head -n 1").collect res1: Array[String] = Array(1, 4, 7)
53、randomSplit
原型 def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
含义
randomSplit 把RDD中数据,根据用户指定的权重,随机切分成多个小的RDD,小RDD中元素的个数由权重来确定。在一些特殊情况下,我们需要让随机产生的数据相同,就可以初始化相同的种子。
示例
val y = sc.parallelize(1 to 10) //按照6:4的权重来切分数据,指定的初始化种子为 seed = 11L val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L) //返回的是一个小型的RDD集合 val training = splits(0) val test = splits(1) training.collect res:1 Array[Int] = Array(1, 4, 5, 6, 8, 10) test.collect res2: Array[Int] = Array(2, 3, 7, 9) //也可以不指定初始化种子,每次按照权重切分的结果不同,可能一个RDD中的数据个数也不同 val y = sc.parallelize(1 to 10) val splits = y.randomSplit(Array(0.1, 0.3, 0.6)) splits(2).collect //此处结果是5个,就说明不是严格按照百分比,可能会出现上下波动 res2: Array[Int] = Array(2, 3, 4, 6, 8)
54、reduce
原型 def reduce(f: (T, T) => T): T
含义
reduce 把RDD中任意两个元素分层合并,根据用户指定的函数进行聚合
示例
val a = sc.parallelize(1 to 100, 3) //rdd中任意两个数据,将其不断缩小 a.reduce(_ + _) res1: Int = 5050
55、reduceByKey [Pair]
原型 def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
含义
reduceByKey 先根据相同的 key 进行分组,然后在相同的 key 中进行聚合,聚合形式和 reduce 相同
示例
//reduceByKey处理的是key-value类型的数据 val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) //先按照key分组,每个组内部进行reduce,然后生成新的RDD b.reduceByKey(_ + _).collect res1: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
56、rightOuterJoin [Pair]
原型 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
含义
rightOuterJoin 类似于数据中的右外连接,以右边的作为参考,要是左边没有,那么就将其补空。右边没有的,左边有,那么就舍弃。
示例
//设置两个key-value集合 val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c =sc.parallelize(List("dog","cat","gnu","salmon","turkey","wolf","bear"), 3) val d = c.keyBy(_.length) b.rightOuterJoin(d).collect //如果右边有相同的key,他们会按照多个key来计算 res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),turkey)), (3,(Some(dog),gnu)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (4,(None,wolf)), (4,(None,bear)))
相关文章推荐
- Spark RDD API 参考示例(一)
- Spark RDD API 参考示例(二)
- Spark RDD API 参考示例(三)
- Spark RDD API 参考示例(五)
- Spark RDD API 参考示例(五)
- Spark RDD API 参考示例(六)
- Spark RDD API 参考示例(二)
- Spark RDD API 参考示例(四)
- 示例 Demo 工程和 API 参考链接
- Spark:将RDD[List[String,List[Person]]]中的List[Person]通过spark api保存为hdfs文件时一直出现not serializable task,没办法找到"spark自定义Kryo序列化输入输出API"
- Spark RDD API详解
- spark-rdd-api
- Spark API 详解/大白话解释 之 RDD、partition、count、collect
- Spark RDD API详解(一) Map和Reduce
- spark2.1:rdd.combineByKeyWithClassTag的用法示例
- spark2.x由浅入深深到底系列六之RDD api reduceByKey与foldByKey对比
- Spark RDD API扩展开发(1)
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- SparkRDDAPI常用算子说明
- spark1.x-rdd api (大全)