spark算子之map_mapPartitions_mapPartitionsWithIndex
2017-12-20 23:47
477 查看
XML Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 | package com.lyzx.day12 import org.apache.spark.{SparkContext, SparkConf} class U{ var name="" def setName(_name:String): Unit ={ name = _name } override def toString(): String ={ this.getClass.getName+"=="+name } } class T1 { /* scala中的yield用法 yield的作用是返回一个集合 下面的代码意思是把1到10的集合遍历出来对于每个集合的每一项item做item*100的操作 其中返回的类型是scala.collection.immutable.Vector */ def f1(): Unit ={ val list = for(i <- 1 to 10) yield i*100 println(list.getClass.getName) list.foreach(println) } /* map 映射把类型为T的项转换为类型为U的项 当然T和U也可以一样,就比如下面 A:map的参数是 1>如果集合中放的是基本类型则该参数是值 2>如果集合中放的是引用类型则该参数是对象的引用 B:scala中使用val 修饰变量类似于java中使用final 修饰变量,也就是说val修饰的变量不能指向别的对象但是这个引用指向的对象时可以修改的, A、B 这两点和java保持一致 */ def f2(): Unit ={ val u1 = new U u1.setName("one") val list = List[U](u1) list.map(item=>{item.setName("two");item}).foreach(println) println("===========================") list.foreach(println) println("+++++++++++++++++++++++++++++++++++++++++++++++=") val list2 = List(1,2,3,4,5) list2.map(item => item*10).foreach(println) println("============================") list2.foreach(println) } /* mapPartitions 算子的作用 首先map是把一个RDD中的数据的每一项准换为其他的类型及 map(T=>U) 而mapPartitions会把一个RDD中的每一个Partition中的数据以一个接待器的形式返回来,这样做的好处是调用该方法的次数会减少 也是scala对map算子的一个优化 下面的~~~~打印了3次说明该方法调用了3次因为分区就是3个 */ def f3(sc:SparkContext): Unit ={ //3个分区 val rdd = sc.makeRDD(1 to 10 ,3) rdd.mapPartitions(itr=>{ println("~~~~~~~~~~~~~~~~~~~~~~~") for(item <- itr) yield item*1000 }).foreach(println) } /* mapPartitionsWithIndex相比于mapPartitions多了一个index索引,每次调用时就会把分区的“编号”穿进去 */ def f4(sc:SparkContext): Unit ={ val rdd = sc.makeRDD(1 to 10 ,3) rdd.mapPartitionsWithIndex((index,itr)=>{ println(index) for(item <- itr) yield item*100 }).foreach(println) } } object T1{ def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("test").setMaster("local") val sc = new SparkContext(conf) val t = new T1 // t.f1() // t.f2() // t.f3(sc) t.f4(sc) } } |
相关文章推荐
- Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.
- Spark算子--map和flatMap
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark算子--mapPartitions和mapPartitionsWithIndex
- Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark Rdd map和mapPartitions效率问题
- Spark算子之mapPartitions
- Spark---算子调优之MapPartitions提升Map类操作性能
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark算子--flatMapValues
- spark--transform算子--map
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark算子:RDD基本转换操作map、flatMap
- spark map flatMap flatMapToPair mapPartitions 的区别和用途
- spark:map mapPartitions flatmap
- spark--transform算子--mapPartitions
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark算子[03]:mapPartitions,mapPartitionsWithIndex 源码实战案例分析
- Spark算子[04]:map,flatMap,mapToPair,flatMapToPair