
Spark transformation operators

2017-10-15 17:21
package os.Streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.junit.{Before, Test}

import scala.collection.mutable

class StreamingDemo {

var sc: SparkContext = _

@Before
def init(): Unit ={
val conf = new SparkConf().setMaster("local").setAppName("test")

sc = new SparkContext(conf)
}

/**
* randomSplit: splits one RDD into multiple RDDs according to the given weights
* @Out 4
*/
@Test
def test2(): Unit ={

val rdd = sc.makeRDD(1 to 10,10)

val splitRDD = rdd.randomSplit(Array(1.0,2.0,3.0,4.0))

val size = splitRDD.size
println(size)
}
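
// A minimal follow-up sketch: counting the elements of each split makes the effect of the weights visible;
// the exact counts vary between runs unless a seed is fixed via randomSplit's second parameter.
// test2b and the seed value are illustrative additions.
@Test
def test2b(): Unit ={
val rdd = sc.makeRDD(1 to 10,10)
val splitRDD = rdd.randomSplit(Array(1.0,2.0,3.0,4.0), seed = 42L)
splitRDD.zipWithIndex.foreach { case (split, i) => println("split " + i + " -> " + split.count() + " elements") }
}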

/**
* union: concatenates the two RDDs without removing duplicates
* intersection: returns only the elements present in both RDDs
* @Out 1223
* @Out 2
*/
@Test
def test3(): Unit ={

val rdd1: RDD[Int] = sc.makeRDD(1 to 2,1)
val rdd2: RDD[Int] = sc.makeRDD(2 to 3,1)

val ints: Array[Int] = rdd1.union(rdd2).collect()
println(ints.mkString)

val rdd3: RDD[Int] = rdd1.intersection(rdd2)

println(rdd3.collect.mkString)
}
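
// A minimal sketch of the point above: union keeps duplicates (hence the 2 appearing twice in "1223"),
// and chaining distinct() removes them; test3b is an illustrative addition.
@Test
def test3b(): Unit ={
val rdd1: RDD[Int] = sc.makeRDD(1 to 2,1)
val rdd2: RDD[Int] = sc.makeRDD(2 to 3,1)
println(rdd1.union(rdd2).distinct().collect().sorted.mkString) //123
}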

/**
* mapPartitions: like map, except the function receives an iterator over each partition instead of each individual element
*/
@Test
def test4(): Unit ={

val rdd1: RDD[Int] = sc.makeRDD(1 to 5,2)

println(rdd1.collect.toList) //List(1, 2, 3, 4, 5)

val rdd2: RDD[Int] = rdd1.mapPartitions(x => {
// the partition iterator can only be traversed once, so materialize it before printing and summing
val elems = x.toList
var i = 0

println(elems) //List(1, 2) and List(3, 4, 5)

for (e <- elems) {
i += e
}
Iterator(i)
})
println(rdd2.collect.toList) //List(3, 12)

}
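
// A hedged sketch of why mapPartitions matters in practice: per-partition setup cost. The DecimalFormat
// only stands in for any expensive object (e.g. a connection) that should be built once per partition
// rather than once per element; test4b is an illustrative addition.
@Test
def test4b(): Unit ={
val rdd1: RDD[Int] = sc.makeRDD(1 to 5,2)
val formatted: RDD[String] = rdd1.mapPartitions { iter =>
val fmt = new java.text.DecimalFormat("000") // built once per partition
iter.map(n => fmt.format(n.toLong))
}
println(formatted.collect.toList) //List(001, 002, 003, 004, 005)
}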

/**
* mapPartitionsWithIndex: like mapPartitions, but the function additionally receives the partition index
* @Out : List(0 | 3, 1 | 12)
*/
@Test
def test5(): Unit ={
val rdd1: RDD[Int] = sc.makeRDD(1 to 5,2)

val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex {
(x, iter) => {

var i = 0
while (iter.hasNext) {
i += iter.next()
}
Iterator(x + " | " + i)
}
}
println(rdd2.collect.toList)  //List(0 | 3, 1 | 12)
}

/**
* zip: combines two RDDs into an RDD of key/value pairs; both RDDs must have the same number of partitions and the same number of elements per partition, otherwise an exception is thrown
* @Out: List((1,A), (2,B), (3,C), (4,D), (5,E))
*/
@Test
def test6(): Unit ={
val rdd1: RDD[Int] = sc.makeRDD(1 to 5,2)
val rdd2: RDD[String] = sc.makeRDD(Seq("A","B","C","D","E"),2)

println(rdd1.zip(rdd2).collect().toList)
}
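
// A sketch of the constraint stated above, assuming the usual failure mode: zipping RDDs whose partitions
// contain different numbers of elements fails at runtime with a SparkException once an action runs;
// test6b is an illustrative addition.
@Test
def test6b(): Unit ={
val rdd1: RDD[Int] = sc.makeRDD(1 to 5,2)
val rdd3: RDD[String] = sc.makeRDD(Seq("A","B","C","D"),2) // one element short
try {
println(rdd1.zip(rdd3).collect().toList)
} catch {
case e: org.apache.spark.SparkException => println("zip failed: " + e.getMessage)
}
}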

// Key/value transformations

/**
* Inspect the (key, value) elements of each partition
* @Out: List((part_0,List((2,B), (1,A))), (part_1,List((5,E), (4,D), (3,C))))
*/
@Test
def test7(): Unit ={

val rdd1: RDD[(Int, String)] = sc.makeRDD(Array((1,"A"), (2,"B"), (3,"C"), (4,"D"), (5,"E")),2)
// inspect the elements of each partition of rdd1
val list: List[(String, List[(Int, String)])] = rdd1.mapPartitionsWithIndex {
(partId, iter) => {
val part_map: mutable.Map[String, List[(Int, String)]] = scala.collection.mutable.Map[String, List[(Int, String)]]()

while (iter.hasNext) {
val part_name = "part_" + partId
val elem = iter.next()
if (part_map.contains(part_name)) {
part_map(part_name) = elem :: part_map(part_name)
} else {
part_map(part_name) = List(elem)
}
}
part_map.iterator
}

}.collect().toList

println(list)
}

/**
* mapValues: like map, but for RDDs of [K, V] pairs; the function is applied only to the values, leaving the keys unchanged
* @Out List((1,A_), (2,B_), (3,C_), (4,D_), (5,E_))
*/
@Test
def test8(): Unit ={
val rdd1: RDD[(Int, String)] = sc.makeRDD(Array((1,"A"), (2,"B"), (3,"C"), (4,"D"), (5,"E")),2)

val list: List[(Int, String)] = rdd1.mapValues(x => x + "_").collect().toList
println(list)
}

/**
* partitionBy: repartitions the RDD according to the given Partitioner, producing a new ShuffledRDD
*/
@Test
def test9(): Unit ={
val rdd1: RDD[(Int, String)] = sc.makeRDD(Array((1,"A"), (2,"B"), (3,"C"), (4,"D"), (5,"E")),2)
val rdd2: RDD[(Int, String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
}
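
// Follow-up sketch: test9 builds the repartitioned RDD but never inspects it. glom() gathers each
// partition into an array, which makes the HashPartitioner assignment visible, and partitioner shows that
// the new RDD carries the partitioner. test9b is an illustrative addition; the ordering inside each
// partition may differ between runs.
@Test
def test9b(): Unit ={
val rdd1: RDD[(Int, String)] = sc.makeRDD(Array((1,"A"), (2,"B"), (3,"C"), (4,"D"), (5,"E")),2)
val rdd2: RDD[(Int, String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
println(rdd2.partitioner.isDefined) //true
println(rdd2.glom().collect().map(_.toList).toList) //e.g. List(List((2,B), (4,D)), List((1,A), (3,C), (5,E)))
}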

/**
* combineByKey: transforms an RDD[K, V] into an RDD[K, C]
* combineByKey[C](
*  createCombiner: V => C,        creates the combiner: turns the first V seen for a key into a C
*  mergeValue: (C, V) => C,       merges another V for the same key into the existing C within a partition
*  mergeCombiners: (C, C) => C)   merges two C values built for the same key in different partitions
* @Out List((B,1_@2), (A,1_@2), (C,1_))
*/
@Test
def test10(): Unit ={

val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

val rdd2: RDD[(String, String)] = rdd1.combineByKey(
(v: Int) => {

val str: String = v + "_"

println(str)
str
},
(c: String, v: Int) => {
val str: String = c + "@" + v
println(str)
str
},
(c1: String, c2: String) => {

val str: String = c1 + "$" + c2
println(str)
str

}
)
val list: List[(String, String)] = rdd2.collect().toList
println(list)
}
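
// A common worked example for the three functions above, given here as a hedged sketch: a per-key
// average computed by carrying (sum, count) as the combiner type C; test10b is an illustrative addition.
@Test
def test10b(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
val avg: RDD[(String, Double)] = rdd1.combineByKey(
(v: Int) => (v, 1),                                                // createCombiner: first value -> (sum, count)
(c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                   // mergeValue: fold another value into (sum, count)
(c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2) // mergeCombiners: add partial (sum, count) pairs
).mapValues(c => c._1.toDouble / c._2)
println(avg.collect().toList) //List((B,1.5), (A,1.5), (C,1.0)), order may vary
}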

/**
* foldByKey: like reduceByKey, but the supplied zero value is used as the initial value for each key in each partition,
* so with a zero value of 100 every per-key sum is increased by 100 for each partition that contains that key, while a zero value of 0 gives a plain per-key sum
*/
@Test
def test11(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",2),("B",2),("C",1)))

val list: List[(String, Int)] = rdd1.foldByKey(100)(_+_).collect().toList
println(list)

val rdd2: RDD[(String, Int)] = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
println(rdd2.foldByKey(0)(_+_).collect().toList)

}

/**
* groupByKey: groups the values that share the same key
* @Out List((B,CompactBuffer(2, 2)), (A,CompactBuffer(1, 2)), (C,CompactBuffer(1)))
*/
@Test
def test12(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",2),("B",2),("C",1)))

val list: List[(String, Iterable[Int])] = rdd1.groupByKey().collect().toList
println(list)
}

/**
* reduceByKey: aggregates the values that share the same key with the given function
* @Out List((B,4), (A,3), (C,1))
*/
@Test
def test13(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",2),("B",2),("C",1)))
val list: List[(String, Int)] = rdd1.reduceByKey((x, y)=>x+y).collect().toList
println(list)
}

/**
* cogroup: full outer join by key; a key missing on one side gets an empty collection on that side
* @Out List((B,(CompactBuffer(2),CompactBuffer(2))), (A,(CompactBuffer(1, 2),CompactBuffer(a, 2))), (C,(CompactBuffer(),CompactBuffer(2))))
*/
@Test
def test14(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",2)))
val rdd2: RDD[(String, String)] = sc.makeRDD(Array(("A","a"),("A","2"),("B","2"),("C","2")))

val grouped: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)
println(grouped.collect().toList)
}
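
// A sketch relating cogroup to join, under the semantics described in the comment above: keeping only
// keys with values on both sides and emitting the cross product of the two value collections yields an
// inner join (which is essentially how RDD.join is built on cogroup); test14b is an illustrative addition.
@Test
def test14b(): Unit ={
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A",1),("A",2),("B",2)))
val rdd2: RDD[(String, String)] = sc.makeRDD(Array(("A","a"),("A","2"),("B","2"),("C","2")))
val joined: RDD[(String, (Int, String))] = rdd1.cogroup(rdd2).flatMapValues {
case (ints, strs) => for (i <- ints; s <- strs) yield (i, s)
}
println(joined.collect().toList) // key C disappears because its left side is empty
}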
}