您的位置:首页 > 其它

Spark算子详解

2018-01-07 22:40 204 查看
 XML Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

package com.lyzx.day17

import org.apache.spark.{SparkContext,SparkConf}

class T1 {

  /*

    cartesian  做笛卡尔积

    RDD[K]+RDD[V]=>RDD[(K,V)]

    把每项依次组合后放入一个元组中返回

   */

  def f1(sc:SparkContext): Unit ={

    val arr1 = List("A","B","C")

    val arr2 = List(1,2,3)

    val rdd1 = sc.parallelize(arr1)

    val rdd2 = sc.parallelize(arr2)

    rdd1.persist()

    rdd2.persist()

    rdd1.cartesian(rdd2).foreach(println)

    println("===============================")

    val rdd11 = rdd1.map(item=>(item,item))

    val rdd21 = rdd2.map(item=>(item,item))

    rdd11.cartesian(rdd21).foreach(println)

  }

  /*

  countByKey  是一个action操作  计算每一个key出现的次数

  具有shuffle操作

  计算出每一个键对应的个数,底层使用reduceByKey

   */

  def f2(sc:SparkContext): Unit ={

    val arr1 = (1 to 10)

    val rdd1 = sc.parallelize(arr1)

    val arr2 = (5 to 15)

    val rdd2 = sc.parallelize(arr2)

    rdd1.map(item=>(item,item*100))

        .union(rdd2.map(item=>(item,item+99)))

        .countByKey()

        .foreach(println)

  }

  /*

  join

    类似于SQL里面的inner join左表根据条件匹配右表的记录,如果左表没有匹配到右表的记录则不会加回去

    返回元组,类似于(key,(value1,value2,...,values3))

   cogroup

    首先在左表的记录(k,v)封装为(k,compactBuffer(v))的形式 再做join操作把匹配的项(k,v1),(k,v2)封装为(k,compactBuffer(v1,v2))

    右表做相同的操作,然后左表右表做join操作

   */

  def f3(sc:SparkContext): Unit ={

    val arr1 = List((1,"罗永浩"),(2,"雷军"),(3,"余承东"),(1,"玉皇大帝"),(4,"战神刑天"))

    val studentRdd = sc.parallelize(arr1)

    studentRdd.persist()

    val arr2 = List((1,88),(2,99),(3,99))

    val scoreRdd = sc.parallelize(arr2)

    scoreRdd.persist()

    //studentRdd中的记录根据key匹配scoreRdd中的记录,然后把匹配的记录放入到一个元组中(key,(leftValue,rightValue))

    studentRdd.join(scoreRdd).foreach(println)

    println("= = = = = = = = = = = = = = = = = = = = = ")

    //cogroup 首先在studentRdd中把键归并在一起把值存入CompactBuffer  在scoreRdd中相同

    //然后把两个计算的结果在做join

    studentRdd.cogroup(scoreRdd).foreach(println)

  }

  /*

    mapValues 只针对键值对的RDD,其中键不变,后面的函数只操作值

   */

  def f4(sc:SparkContext): Unit ={

    val rdd = sc.parallelize((1 to 10))

    val mapRdd = rdd.map(item=>(item,item+100))

    mapRdd.mapValues(_+1).foreach(println)

  }

  /*

    有序列化的地方

    1、存储等级  MEMERY_ONLY > MEMERY_SER

    2、shuffle 走网络传输的时候

    3、分发任务和返回结果的时候

    读取本地文件

    textFile()的第二个参数是最少的partition的个数

    如果不传就找集群中的默认的并行度和2的最小值最为默认的partition的个数

    如果读取hdfs上的文件默认有多少个block就有多少个partition

    如果出入的参数小于block的个数则partition的个数还是block个

    如果大于block的个数 则就使用参数个

   */

  /*

    sortBy

    第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;

      >>主要用于对每个元素进行计算转换之类的操作

      >>如果不需要做计算就写x=>x  从x到x的映射

      >>后面两个参数都有默认值,如果想降序排列可以一个参数搞定x=>{-x}

  第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;

      >>是否是升序

  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,

    即为this.partitions.size

   */

  def f5(sc:SparkContext): Unit ={

    val rdd = sc.parallelize(List(10,9,8,7,6,5,4,3,2,1))

    rdd.sortBy(x=>x,true).foreach(print)

    println("========================================")

    val rdd2 = sc.parallelize(1 to 10)

    rdd2.sortBy(x=>{-x}).foreach(print)

  }

  /*

    countByValue

    数一数值有多少个,这儿的value并不是键值对里面的value

    而是RDD的里面的元素,返回结果是一个元组(element,count) 第一项是元素,第二项是个数

   */

  def f6(sc:SparkContext): Unit ={

    val rdd = sc.parallelize(List((1,10),(2,10),(3,3),(4,4),(5,10),(5,10),(6,1),(7,1)))
//    val mapRdd = rdd.map(item=>(item,item))

    val xx = rdd.countByValue()

    xx.foreach(println)

    println("==================================")

    val rdd2 = sc.parallelize(List(1,2,2,2,3,4))

    val result = rdd2.countByValue()

    result.foreach(item=>println(item._1+"-"+item._2))

  }

  /*

     groupBy  分组组成键值对的形式

     键是每元素  值是元素组成的CompactBuffer集合

   */

  def f7(sc:SparkContext): Unit ={

    val rdd = sc.parallelize(List(1,2,2,2,3,4,5,6))

    rdd.groupBy(x=>x).foreach(println)

  }

  /*

  一个简单的TopN 小栗子

  如下data列表中每一个元素都是网名在搜索引擎中搜索的关键字,现在要统计今天的热搜词(出现次数最多的次)Top2

   */

  def f8(sc:SparkContext): Unit ={

    val data = List("A","B","B","A","C","D","A","A","A","A","A","A","A","A","A","A","A","C","C","C","C","C")

    val rdd = sc.parallelize(data)

    //org.apache.spark.rdd.ShuffledRDD

    val mapRdd = rdd.countByValue()

                    .map(item=>(item._2,item._1))

    println(mapRdd.getClass.getName)

    mapRdd.foreach(item=>println(item._1+"----"+item._2))

//    rdd.countByValue()
//      .map(item=>(item._2,item._1))
//      .toSeq
//      .sortBy(x=>{-x._1})
//      .take(2)
//      .foreach(item=>println(item._2))

  }

  def f9(sc:SparkContext): Unit ={
//    val rdd = sc.parallelize((1 to 10))
//    val mapRdd = rdd.map(item=>(item,item))
//    mapRdd
//    .sortByKey()
//    println(mapRdd.getClass.getName)

    val data1 = ("A","B","C")

    val data2 = (1 to 10)

    println(data1.getClass.getName)

    println(data2.getClass.getName)

  }

}

object T1{

  def main(args: Array[String]) {

      val conf = new SparkConf().setAppName("day17").setMaster("local")

      val sc = new SparkContext(conf)

      val t = new T1
//    t.f1(sc)
//    t.f2(sc)
//    t.f3(sc)
//    t.f4(sc)
//    t.f5(sc)
//    t.f6(sc)
//    t.f7(sc)
//    t.f8(sc)

    t.f9(sc)

      sc.stop()

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: