Spark: Counting the Number of Elements in Each RDD Partition
2016-12-19 00:00
Spark RDDs are partitioned. When an RDD is created, you can usually specify the number of partitions explicitly. If you don't, an RDD created from a collection defaults to the number of CPU cores allocated to the application (or to spark.default.parallelism, if that is set), while an RDD created from an HDFS file defaults to the file's number of blocks.
import org.apache.spark.{SparkConf, SparkContext}

object ShellTest {
  def main(args: Array[String]): Unit = {
    // Default parallelism is set to 12
    val sc = new SparkContext(new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set("spark.default.parallelism", "12"))

    // An RDD created with an explicit partition count: 3 partitions
    val rdd = sc.makeRDD(1 to 128, 3)
    println("RDD partition size:" + rdd.partitions.size)

    // No partition count given, so the default of 12 applies
    val rdd1 = sc.makeRDD(1 to 128)
    println("RDD1 partition size:" + rdd1.partitions.size)

    // An RDD created from an HDFS file gets 76 partitions, because the file has 76 blocks
    val hdfs = sc.textFile("C:\\Windows\\Logs\\CBS\\CbsPersist_20151208074613.log")
    println("RDD partition size:" + hdfs.partitions.size)
  }
}
RDD partition size:3
RDD1 partition size:12
16/12/19 15:47:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 229.9 KB, free 900.4 MB)
16/12/19 15:47:56 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.1 KB, free 900.4 MB)
16/12/19 15:47:56 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:61774 (size: 22.1 KB, free: 900.6 MB)
16/12/19 15:47:56 INFO SparkContext: Created broadcast 0 from textFile at ShellTest.scala:31
16/12/19 15:47:57 INFO FileInputFormat: Total input paths to process : 1
RDD partition size:76
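As a side note, the defaults described above can also be inspected directly on the SparkContext, and textFile accepts a minPartitions hint. Here is a minimal sketch of that (the object name and file path are placeholders of mine, not from the original program):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionDefaults {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("partition-defaults"))
    // defaultParallelism is what makeRDD falls back to when no count is given
    println("defaultParallelism: " + sc.defaultParallelism)
    // textFile also takes an optional minPartitions hint; the resulting
    // partition count is at least the number of input splits
    val lines = sc.textFile("data.txt", 8) // "data.txt" is a placeholder path
    println("textFile partitions: " + lines.partitions.size)
    sc.stop()
  }
}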
Counting the number of elements in each partition
import org.apache.spark.{SparkConf, SparkContext}

object ShellTest {
  def main(args: Array[String]): Unit = {
    // Default parallelism is set to 12
    val sc = new SparkContext(new SparkConf()
      .setMaster("local")
      .setAppName("test")
      .set("spark.default.parallelism", "12"))

    // 1128 elements spread over 12 partitions gives 94 per partition;
    // mapPartitions turns each partition's iterator into its element count
    val rdd1 = sc.makeRDD(1 to 1128)
    rdd1.mapPartitions { x => Iterator(x.length) }.collect.foreach(println(_))
  }
}
16/12/20 11:24:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/20 11:24:46 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:22, took 1.109165 s
94
94
94
94
94
94
94
94
94
94
94
94
16/12/20 11:24:46 INFO SparkContext: Invoking stop() from shutdown hook
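If you also want to know which partition each count comes from, mapPartitionsWithIndex exposes the partition index alongside the iterator. A minimal variant sketch (the object name is my own placeholder, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionCounts {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setMaster("local")
      .setAppName("partition-counts")
      .set("spark.default.parallelism", "12"))
    val rdd = sc.makeRDD(1 to 1128)
    // idx is the partition index; it.size consumes the iterator and counts it
    rdd.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
      .collect
      .foreach { case (idx, n) => println(s"partition $idx: $n elements") }
    sc.stop()
  }
}

This should pair each of the twelve partition indices with a count of 94, matching the run above.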