您的位置:首页 > 移动开发

spark算子之map_mapPartitions_mapPartitionsWithIndex

2017-12-20 23:47 477 查看
 XML Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

package com.lyzx.day12

import org.apache.spark.{SparkContext, SparkConf}

class U{

  var name=""

 def setName(_name:String): Unit ={

   name = _name

 }

 override def toString(): String ={

    this.getClass.getName+"=="+name

  }

}

class T1 {

  /*

    scala中的yield用法

    yield的作用是返回一个集合 下面的代码意思是把1到10的集合遍历出来对于每个集合的每一项item做item*100的操作

    其中返回的类型是scala.collection.immutable.Vector

   */

  def f1(): Unit ={

   val list = for(i <- 1 to 10) yield i*100

    println(list.getClass.getName)

    list.foreach(println)

  }

  /*

    map 映射把类型为T的项转换为类型为U的项 当然T和U也可以一样,就比如下面

    A:map的参数是

      1>如果集合中放的是基本类型则该参数是值

      2>如果集合中放的是引用类型则该参数是对象的引用

    B:scala中使用val 修饰变量类似于java中使用final 修饰变量,也就是说val修饰的变量不能指向别的对象但是这个引用指向的对象时可以修改的,

    A、B 这两点和java保持一致

   */

  def f2(): Unit ={

    val u1 = new U

    u1.setName("one")

    val list = List[U](u1)

    list.map(item=>{item.setName("two");item}).foreach(println)

    println("===========================")

    list.foreach(println)

    println("+++++++++++++++++++++++++++++++++++++++++++++++=")

    val list2 = List(1,2,3,4,5)

    list2.map(item => item*10).foreach(println)

    println("============================")

    list2.foreach(println)

  }

  /*

   mapPartitions 算子的作用

   首先map是把一个RDD中的数据的每一项准换为其他的类型及 map(T=>U)

   而mapPartitions会把一个RDD中的每一个Partition中的数据以一个接待器的形式返回来,这样做的好处是调用该方法的次数会减少

   也是scala对map算子的一个优化

   下面的~~~~打印了3次说明该方法调用了3次因为分区就是3个

   */

  def f3(sc:SparkContext): Unit ={

    //3个分区

    val rdd = sc.makeRDD(1 to 10 ,3)

    rdd.mapPartitions(itr=>{

      println("~~~~~~~~~~~~~~~~~~~~~~~")

     for(item <- itr) yield item*1000

    }).foreach(println)

  }

  /*

    mapPartitionsWithIndex相比于mapPartitions多了一个index索引,每次调用时就会把分区的“编号”穿进去

   */

  def f4(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(1 to 10 ,3)

    rdd.mapPartitionsWithIndex((index,itr)=>{

      println(index)

      for(item <- itr) yield item*100

    }).foreach(println)

  }

}

object T1{

  def main(args: Array[String]) {

    val conf = new SparkConf()

    conf.setAppName("test").setMaster("local")

    val sc = new SparkContext(conf)

    val t = new T1
//    t.f1()
//    t.f2()
//    t.f3(sc)

    t.f4(sc)

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: