
Spark: Counting the Elements in Each RDD Partition

Spark RDDs are partitioned. When creating an RDD you can usually specify the number of partitions. If you don't, an RDD created from a collection defaults to the number of CPU cores allocated to the program, and an RDD created from an HDFS file defaults to the number of Blocks in that file.

def main(args: Array[String]) {
  // 12 partitions by default (via spark.default.parallelism)
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))

  // Create an RDD with 3 partitions
  val rdd = sc.makeRDD(1 to 128, 3)
  println("RDD partition size:" + rdd.partitions.size)

  // 12 partitions by default
  val rdd1 = sc.makeRDD(1 to 128)
  println("RDD1 partition size:" + rdd1.partitions.size)

  // An RDD created from an HDFS file has 76 partitions, because the file has 76 Blocks
  val hdfs = sc.textFile("C:\\Windows\\Logs\\CBS\\CbsPersist_20151208074613.log")
  println("RDD partition size:" + hdfs.partitions.size)
}


RDD partition size:3
RDD1 partition size:12
16/12/19 15:47:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 229.9 KB, free 900.4 MB)
16/12/19 15:47:56 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.1 KB, free 900.4 MB)
16/12/19 15:47:56 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:61774 (size: 22.1 KB, free: 900.6 MB)
16/12/19 15:47:56 INFO SparkContext: Created broadcast 0 from textFile at ShellTest.scala:31
16/12/19 15:47:57 INFO FileInputFormat: Total input paths to process : 1
RDD partition size:76
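
The partition count of a file-based RDD can also be raised at creation time: sc.textFile takes an optional minPartitions argument (a suggested minimum, not an exact count). A minimal sketch, assuming the same SparkContext and the same log file as above:

// Ask for at least 8 input splits; the actual number may be higher
val logRdd = sc.textFile("C:\\Windows\\Logs\\CBS\\CbsPersist_20151208074613.log", 8)
println("log RDD partition size:" + logRdd.partitions.size)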

Counting the number of elements in each partition

def main(args: Array[String]): Unit = {
  // 12 partitions by default
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 1128)
  // For each partition, emit the number of elements it holds, then collect and print
  rdd1.mapPartitions{x => Iterator(x.length)}.collect.foreach(println(_))
}

16/12/20 11:24:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/20 11:24:46 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:22, took 1.109165 s
94
94
94
94
94
94
94
94
94
94
94
94
16/12/20 11:24:46 INFO SparkContext: Invoking stop() from shutdown hook
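
If you also want to know which partition each count belongs to, the partition index can be carried along. A minimal sketch, assuming the same sc and rdd1 as above, using mapPartitionsWithIndex:

// Pair each partition index with the number of elements in that partition
rdd1.mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.length)) }
  .collect
  .foreach { case (idx, n) => println(s"partition $idx: $n elements") }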
Tags: Spark, Spark operations