Spark RDD transformation operators
2017-10-15 17:21
```scala
package os.Streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.{Before, Test}

import scala.collection.mutable

class StreamingDemo {

  var sc: SparkContext = _

  @Before
  def init(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    sc = new SparkContext(conf)
  }

  /**
   * randomSplit: splits one RDD into several RDDs according to the given weights.
   * @Return 4
   */
  @Test
  def test2(): Unit = {
    val rdd = sc.makeRDD(1 to 10, 10)
    val splitRDD = rdd.randomSplit(Array(1.0, 2.0, 3.0, 4.0))
    val size = splitRDD.size
    println(size)
  }

  /**
   * union: concatenates two RDDs (duplicates are kept).
   * intersection: returns the elements common to both RDDs.
   * @Out 1223
   * @Out 2
   */
  @Test
  def test3(): Unit = {
    val rdd1: RDD[Int] = sc.makeRDD(1 to 2, 1)
    val rdd2: RDD[Int] = sc.makeRDD(2 to 3, 1)
    val ints: Array[Int] = rdd1.union(rdd2).collect()
    println(ints.mkString)
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    println(rdd3.collect.mkString)
  }

  /**
   * mapPartitions: like map, except the function is applied to each partition
   * (an iterator over its elements) rather than to each individual element.
   */
  @Test
  def test4(): Unit = {
    val rdd1: RDD[Int] = sc.makeRDD(1 to 5, 2)
    println(rdd1.collect.toList) // List(1, 2, 3, 4, 5)
    val rdd2: RDD[Int] = rdd1.mapPartitions(iter => {
      // Materialize the partition once: reading the iterator for the println
      // and then again for the sum would leave nothing left to add up.
      val elems = iter.toList
      println(elems) // List(1, 2) and List(3, 4, 5)
      var i = 0
      for (e <- elems) {
        i += e
      }
      List(i).iterator
    })
    println(rdd2.collect.toList) // List(3, 12)
  }

  /**
   * mapPartitionsWithIndex: like mapPartitions, but the function also receives
   * the partition index.
   * @Out: List(0 | 3, 1 | 12)
   */
  @Test
  def test5(): Unit = {
    val rdd1: RDD[Int] = sc.makeRDD(1 to 5, 2)
    val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex { (x, iter) =>
      var result = List[String]()
      var i = 0
      while (iter.hasNext) {
        i += iter.next()
      }
      result.::(x + " | " + i).iterator
    }
    println(rdd2.collect.toList) // List(0 | 3, 1 | 12)
  }

  /**
   * zip: combines two RDDs into an RDD of key/value pairs. The two RDDs must have
   * the same number of partitions and the same number of elements in each partition,
   * otherwise an exception is thrown.
   * @Out: List((1,A), (2,B), (3,C), (4,D), (5,E))
   */
  @Test
  def test6(): Unit = {
    val rdd1: RDD[Int] = sc.makeRDD(1 to 5, 2)
    val rdd2: RDD[String] = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
    println(rdd1.zip(rdd2).collect().toList)
  }

  // Key/value transformations

  /**
   * Inspect the (key, value) elements held by each partition.
   * @Out: List((part_0,List((2,B), (1,A))), (part_1,List((5,E), (4,D), (3,C))))
   */
  @Test
  def test7(): Unit = {
    val rdd1: RDD[(Int, String)] =
      sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")), 2)
    // Collect the elements of each partition of rdd1, keyed by partition name
    val list: List[(String, List[(Int, String)])] = rdd1.mapPartitionsWithIndex { (partId, iter) =>
      val part_map = mutable.Map[String, List[(Int, String)]]()
      while (iter.hasNext) {
        val part_name = "part_" + partId
        val elem = iter.next()
        if (part_map.contains(part_name)) {
          var elems = part_map(part_name)
          elems ::= elem
          part_map(part_name) = elems
        } else {
          part_map(part_name) = List[(Int, String)](elem)
        }
      }
      part_map.iterator
    }.collect().toList
    println(list)
  }

  /**
   * mapValues: like map, but for RDDs of [K, V] pairs; the function is applied
   * to the values only, the keys are left untouched.
   * @OUT: List((1,A_), (2,B_), (3,C_), (4,D_), (5,E_))
   */
  @Test
  def test8(): Unit = {
    val rdd1: RDD[(Int, String)] =
      sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")), 2)
    val list: List[(Int, String)] = rdd1.mapValues(x => x + "_").collect().toList
    println(list)
  }

  /**
   * partitionBy: repartitions the RDD with the given Partitioner, producing a new ShuffledRDD.
   */
  @Test
  def test9(): Unit = {
    val rdd1: RDD[(Int, String)] =
      sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")), 2)
    val rdd2: RDD[(Int, String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
  }

  /**
   * combineByKey: transforms an RDD[K, V] into an RDD[K, C].
   * combineByKey[C](
   *   createCombiner: V => C,      // turns a value V into a combiner C; called for the first value of a key in a partition
   *   mergeValue: (C, V) => C,     // merges a further value V of the same key into an existing C within a partition
   *   mergeCombiners: (C, C) => C) // merges two C values produced for the same key in different partitions
   * @OUT: List((B,1_@2), (A,1_@2), (C,1_))
   */
  @Test
  def test10(): Unit = {
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))
    val rdd2: RDD[(String, String)] = rdd1.combineByKey(
      (v: Int) => {
        val str: String = v + "_"
        println(str)
        str
      },
      (c: String, v: Int) => {
        val str: String = c + "@" + v
        println(str)
        str
      },
      (c1: String, c2: String) => {
        val str: String = c1 + "$" + c2
        println(str)
        str
      }
    )
    val list: List[(String, String)] = rdd2.collect().toList
    println(list)
  }

  /**
   * foldByKey: folds the values of each key, starting from the given zero value.
   * Note that the zero value is applied once per key in every partition, so a
   * non-neutral zero such as 100 is added for each partition a key appears in.
   */
  @Test
  def test11(): Unit = {
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 2), ("B", 2), ("C", 1)))
    val list: List[(String, Int)] = rdd1.foldByKey(100)(_ + _).collect().toList
    println(list)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))
    println(rdd2.foldByKey(0)(_ + _).collect().toList)
  }

  /**
   * groupByKey: groups the values that share the same key.
   * @OUT: List((B,CompactBuffer(2, 2)), (A,CompactBuffer(1, 2)), (C,CompactBuffer(1)))
   */
  @Test
  def test12(): Unit = {
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 2), ("B", 2), ("C", 1)))
    val list: List[(String, Iterable[Int])] = rdd1.groupByKey().collect().toList
    println(list)
  }

  /**
   * reduceByKey: aggregates the values of each key with the given function.
   * @OUT: List((B,4), (A,3), (C,1))
   */
  @Test
  def test13(): Unit = {
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 2), ("B", 2), ("C", 1)))
    val list: List[(String, Int)] = rdd1.reduceByKey((x, y) => x + y).collect().toList
    println(list)
  }

  /**
   * cogroup: full outer grouping of two RDDs by key; a key missing from one side
   * gets an empty collection for that side.
   * @OUT: List((B,(CompactBuffer(2),CompactBuffer(2))), (A,(CompactBuffer(1, 2),CompactBuffer(a, 2))), (C,(CompactBuffer(),CompactBuffer(2))))
   */
  @Test
  def test14(): Unit = {
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 2)))
    val rdd2: RDD[(String, String)] = sc.makeRDD(Array(("A", "a"), ("A", "2"), ("B", "2"), ("C", "2")))
    val grouped: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)
    println(grouped.collect().toList)
  }
}
```
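The string-concatenation combiner in test10 mainly shows when each of the three functions fires. As a further illustration, here is a minimal, self-contained sketch (the object name `AverageDemo` is invented for this example) that uses the same `combineByKey` signature to compute a per-key average, with the combiner type C being a (sum, count) pair:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AverageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("avg"))
    val scores: RDD[(String, Int)] =
      sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1)), 2)

    // C is a (sum, count) pair per key.
    val sumCount: RDD[(String, (Int, Int))] = scores.combineByKey(
      (v: Int) => (v, 1),                                               // createCombiner: first value of a key in a partition
      (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                  // mergeValue: fold another value into the running pair
      (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2) // mergeCombiners: merge pairs from different partitions
    )

    val avg: RDD[(String, Double)] = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
    println(avg.collect().toList) // e.g. List((B,1.5), (A,1.5), (C,1.0)); key ordering may vary
    sc.stop()
  }
}
```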
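test12 and test13 arrive at the same per-key sums by different routes. The sketch below (the object name `SumDemo` is invented here) puts the two side by side on the same data: reduceByKey pre-aggregates values within each partition before the shuffle, whereas groupByKey ships every individual value across the shuffle and leaves the summation to a later step.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SumDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sum"))
    val pairs: RDD[(String, Int)] = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 2), ("B", 2), ("C", 1)))

    // reduceByKey: values of a key are combined within each partition before the shuffle.
    val viaReduce: List[(String, Int)] = pairs.reduceByKey(_ + _).collect().toList

    // groupByKey: every value is shuffled to its key's partition first; the sum is taken afterwards with mapValues.
    val viaGroup: List[(String, Int)] = pairs.groupByKey().mapValues(_.sum).collect().toList

    println(viaReduce) // e.g. List((B,4), (A,3), (C,1))
    println(viaGroup)  // same sums, but obtained after shuffling every individual value
    sc.stop()
  }
}
```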