Spark RDD Actions in Detail---Spark Study Notes 8
Picking up from the previous post on Spark RDD Transformations.
This time we look at the action functions of Spark RDDs.
As the data source, we'll use kmeans_data.txt from the Spark home directory.
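For context, everything below runs in the stock spark-shell (0.8.1 in these notes), where sc is predefined; a minimal setup sketch:

val kemeans_data = sc.textFile("kmeans_data.txt")   // six lines of space-separated doubles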
reduce
Much like reduceByKey among the transformations, but with no grouping by key: the function simply combines two elements at a time.

val kemeans_data = sc.textFile("kmeans_data.txt")
kemeans_data.flatMap(_.split(" ")).map(_.toDouble).reduce(_+_)
res67: Double = 82.80000000000001
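One caveat, shown with a hedged sketch: the function given to reduce should be commutative and associative, since partition results are combined in no guaranteed order. Summation qualifies, and so does taking a maximum:

// max via reduce: commutative and associative, so safe in any combination order
kemeans_data.flatMap(_.split(" ")).map(_.toDouble).reduce((a, b) => math.max(a, b))
// Double = 9.2, the largest value in kmeans_data.txt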
collect
Brings the entire dataset back to the driver as a single array.

scala> kemeans_data.collect()
14/04/04 17:17:38 INFO spark.SparkContext: Starting job: collect at <console>:15
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Got job 35 (collect at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Final stage: Stage 46 (collect at <console>:15)
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Submitting Stage 46 (MappedRDD[93] at textFile at <console>:12), which has no missing parents
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 46 (MappedRDD[93] at textFile at <console>:12)
14/04/04 17:17:38 INFO local.LocalTaskSetManager: Size of task 40 is 1656 bytes
14/04/04 17:17:38 INFO executor.Executor: Running task ID 40
14/04/04 17:17:38 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/04 17:17:38 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:17:38 INFO executor.Executor: Serialized size of result for 40 is 519
14/04/04 17:17:38 INFO executor.Executor: Sending result for 40 directly to driver
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Completed ResultTask(46, 0)
14/04/04 17:17:38 INFO local.LocalScheduler: Remove TaskSet 46.0 from pool
14/04/04 17:17:38 INFO executor.Executor: Finished task ID 40
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Stage 46 (collect at <console>:15) finished in 0.005 s
14/04/04 17:17:38 INFO spark.SparkContext: Job finished: collect at <console>:15, took 0.008528283 s
res69: Array[String] = Array(0.0 0.0 0.0, 0.1 0.1 0.1, 0.2 0.2 0.2, 9.0 9.0 9.0, 9.1 9.1 9.1, 9.2 9.2 9.2)
You can see clearly here that each line of the file becomes one element of the array.
Don't do this once the dataset gets big; collect is generally for smaller datasets, and "big" versus "small" is relative to your machine.
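When the data might not fit on the driver, a safer sketch is to bring back only a few elements with take (covered below):

kemeans_data.take(2)   // only two elements travel to the driver
// Array[String] = Array(0.0 0.0 0.0, 0.1 0.1 0.1)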
count
Everyone on Earth knows this one... it returns the number of elements in the dataset.

kemeans_data.count()
res70: Long = 6
take
As the name suggests, it takes the first num elements. (splitedLine here is presumably the whitespace-split RDD over CHANGES.txt carried over from the Transformation notes.)

scala> splitedLine.take(1)
14/04/04 11:14:05 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Got job 6 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Final stage: Stage 6 (take at <console>:19)
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:05 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:05 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.004364898 s
res12: Array[Array[java.lang.String]] = Array(Array("", "", Merge, pull, request, #248, from, colorant/branch-0.8))
Index into the returned array to pick out individual elements:

scala> splitedLine.take(1)(0)
14/04/04 11:14:10 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Got job 7 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Final stage: Stage 7 (take at <console>:19)
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:10 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:10 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.004808966 s
res13: Array[java.lang.String] = Array("", "", Merge, pull, request, #248, from, colorant/branch-0.8)

scala> splitedLine.take(1)(0)(0)
14/04/04 11:14:15 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Got job 8 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Final stage: Stage 8 (take at <console>:19)
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:15 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:15 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.005684334 s
res14: java.lang.String = ""
first
Returns the first element of the dataset.

kemeans_data.first()
res0: String = 0.0 0.0 0.0
takeSample
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T]
"Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed."
In short: give it a random seed, and it draws num elements, with or without replacement.
scala> kemeans_data.takeSample(false,2,3)
14/04/04 17:28:49 INFO spark.SparkContext: Starting job: takeSample at <console>:15
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Got job 14 (takeSample at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Final stage: Stage 14 (takeSample at <console>:15)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting Stage 14 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 14 (MappedRDD[1] at textFile at <console>:12)
14/04/04 17:28:49 INFO local.LocalTaskSetManager: Size of task 13 is 1645 bytes
14/04/04 17:28:49 INFO executor.Executor: Running task ID 13
14/04/04 17:28:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:28:49 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:28:49 INFO executor.Executor: Serialized size of result for 13 is 473
14/04/04 17:28:49 INFO executor.Executor: Sending result for 13 directly to driver
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Completed ResultTask(14, 0)
14/04/04 17:28:49 INFO local.LocalScheduler: Remove TaskSet 14.0 from pool
14/04/04 17:28:49 INFO executor.Executor: Finished task ID 13
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Stage 14 (takeSample at <console>:15) finished in 0.008 s
14/04/04 17:28:49 INFO spark.SparkContext: Job finished: takeSample at <console>:15, took 0.012453567 s
14/04/04 17:28:49 INFO spark.SparkContext: Starting job: takeSample at <console>:15
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Got job 15 (takeSample at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Final stage: Stage 15 (takeSample at <console>:15)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting Stage 15 (SampledRDD[7] at takeSample at <console>:15), which has no missing parents
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 15 (SampledRDD[7] at takeSample at <console>:15)
14/04/04 17:28:49 INFO local.LocalTaskSetManager: Size of task 14 is 1850 bytes
14/04/04 17:28:49 INFO executor.Executor: Running task ID 14
14/04/04 17:28:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:28:49 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:28:49 INFO executor.Executor: Serialized size of result for 14 is 519
14/04/04 17:28:49 INFO executor.Executor: Sending result for 14 directly to driver
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Completed ResultTask(15, 0)
14/04/04 17:28:49 INFO local.LocalScheduler: Remove TaskSet 15.0 from pool
14/04/04 17:28:49 INFO executor.Executor: Finished task ID 14
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Stage 15 (takeSample at <console>:15) finished in 0.006 s
14/04/04 17:28:49 INFO spark.SparkContext: Job finished: takeSample at <console>:15, took 0.009247992 s
res9: Array[String] = Array(9.0 9.0 9.0, 0.2 0.2 0.2)
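Passing withReplacement = true lets the sample contain the same element more than once; a hedged sketch (the seed is arbitrary and the exact sample depends on it):

kemeans_data.takeSample(true, 4, 11)   // four draws with replacement; duplicates are possible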
saveAsTextFile
Saves the results to a file. Here we keep the points greater than 0.2 and write them to the shengli.result directory:

scala> kemeans_data.flatMap(_.split(" ")).map(_.toDouble).filter(_>0.2).saveAsTextFile("shengli.result")
14/04/04 17:31:24 INFO rdd.PairRDDFunctions: Saving as hadoop file of type (NullWritable, Text)
14/04/04 17:31:24 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:15
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Got job 18 (saveAsTextFile at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Final stage: Stage 18 (saveAsTextFile at <console>:15)
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Submitting Stage 18 (MappedRDD[17] at saveAsTextFile at <console>:15), which has no missing parents
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 18 (MappedRDD[17] at saveAsTextFile at <console>:15)
14/04/04 17:31:25 INFO local.LocalTaskSetManager: Size of task 17 is 4805 bytes
14/04/04 17:31:25 INFO executor.Executor: Running task ID 17
14/04/04 17:31:25 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:31:25 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:31:25 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_201404041731_0000_m_000000_17' to file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/shengli.result
14/04/04 17:31:25 INFO mapred.SparkHadoopWriter: attempt_201404041731_0000_m_000000_17: Committed
14/04/04 17:31:25 INFO executor.Executor: Serialized size of result for 17 is 435
14/04/04 17:31:25 INFO executor.Executor: Sending result for 17 directly to driver
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Completed ResultTask(18, 0)
14/04/04 17:31:25 INFO local.LocalScheduler: Remove TaskSet 18.0 from pool
14/04/04 17:31:25 INFO executor.Executor: Finished task ID 17
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Stage 18 (saveAsTextFile at <console>:15) finished in 0.080 s
14/04/04 17:31:25 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:15, took 0.106020906 s
A directory named shengli.result is generated under the Spark home directory, containing this job's output:

cd shengli.result/
ll
-rw-r--r-- 1 root root    8 04-04 17:31 ._SUCCESS.crc
-rwxrwxrwx 1 root root    0 04-04 17:31 _SUCCESS
-rw-r--r-- 1 root root   12 04-04 17:31 .part-00000.crc
-rwxrwxrwx 1 root root   36 04-04 17:31 part-00000
drwxr-xr-x  2 root root 4096 04-04 17:31 .
drwxr-xr-x 25 1000 1000 4096 04-04 17:33 ..
more part-00000
9.0
9.0
9.0
9.1
9.1
9.1
9.2
9.2
9.2
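The saved directory can be read straight back as a text RDD; a minimal sketch:

val saved = sc.textFile("shengli.result")   // picks up every part-* file in the directory
saved.collect()   // nine strings, one per saved value: 9.0, 9.0, 9.0, 9.1, ...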
saveAsSequenceFile
Saves an RDD of key/value pairs as a Hadoop SequenceFile. Here each word is paired with a count of 1 and written to shengli.seq:

scala> kemeans_data.flatMap(_.split(" ")).map(word=>(word,1)).saveAsSequenceFile("shengli.seq")
14/04/04 17:39:56 INFO rdd.SequenceFileRDDFunctions: Saving as sequence file of type (Text,IntWritable)
14/04/04 17:39:56 INFO rdd.PairRDDFunctions: Saving as hadoop file of type (Text, IntWritable)
14/04/04 17:39:56 INFO spark.SparkContext: Starting job: saveAsSequenceFile at <console>:15
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Got job 19 (saveAsSequenceFile at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Final stage: Stage 19 (saveAsSequenceFile at <console>:15)
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Submitting Stage 19 (MappedRDD[20] at saveAsSequenceFile at <console>:15), which has no missing parents
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 19 (MappedRDD[20] at saveAsSequenceFile at <console>:15)
14/04/04 17:39:56 INFO local.LocalTaskSetManager: Size of task 18 is 4837 bytes
14/04/04 17:39:56 INFO executor.Executor: Running task ID 18
14/04/04 17:39:56 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:39:56 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:39:56 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_201404041739_0000_m_000000_18' to file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/shengli.seq
14/04/04 17:39:56 INFO mapred.SparkHadoopWriter: attempt_201404041739_0000_m_000000_18: Committed
14/04/04 17:39:56 INFO executor.Executor: Serialized size of result for 18 is 435
14/04/04 17:39:56 INFO executor.Executor: Sending result for 18 directly to driver
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Completed ResultTask(19, 0)
14/04/04 17:39:56 INFO local.LocalScheduler: Remove TaskSet 19.0 from pool
14/04/04 17:39:56 INFO executor.Executor: Finished task ID 18
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Stage 19 (saveAsSequenceFile at <console>:15) finished in 0.112 s
14/04/04 17:39:56 INFO spark.SparkContext: Job finished: saveAsSequenceFile at <console>:15, took 0.199192684 s
Inspecting the output confirms it really is a SequenceFile; note the SEQ header with the Text/IntWritable class names (the key/value bytes themselves are binary, hence the garbled-looking characters):

# cd shengli.seq/
# ll
-rw-r--r-- 1 root root    8 04-04 17:39 ._SUCCESS.crc
-rwxrwxrwx 1 root root    0 04-04 17:39 _SUCCESS
-rw-r--r-- 1 root root   12 04-04 17:39 .part-00000.crc
-rwxrwxrwx 1 root root  373 04-04 17:39 part-00000
drwxr-xr-x 26 1000 1000 4096 04-04 17:39 ..
drwxr-xr-x  2 root root 4096 04-04 17:39 .
# cat part-00000
SEQorg.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable异妈 "0.00.00.00.10.10.10.20.20.29.09.09.09.19.19.19.29.29.2
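Reading it back is symmetric; a hedged sketch relying on Spark's implicit Writable converters:

val seq = sc.sequenceFile[String, Int]("shengli.seq")   // (Text, IntWritable) decoded back to (String, Int)
seq.collect()   // the (word, 1) pairs written above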
countByKey
Easy to understand: it counts how many times each key appears. "Only
available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key."
scala> kemeans_data.flatMap(_.split(" ")).map(word=>(word,1)).countByKey()
14/04/04 17:44:02 INFO spark.SparkContext: Starting job: countByKey at <console>:15
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Got job 20 (countByKey at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Final stage: Stage 20 (countByKey at <console>:15)
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[24] at countByKey at <console>:15), which has no missing parents
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[24] at countByKey at <console>:15)
14/04/04 17:44:02 INFO local.LocalTaskSetManager: Size of task 19 is 1995 bytes
14/04/04 17:44:02 INFO executor.Executor: Running task ID 19
14/04/04 17:44:02 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:44:03 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:44:03 INFO executor.Executor: Serialized size of result for 19 is 823
14/04/04 17:44:03 INFO executor.Executor: Sending result for 19 directly to driver
14/04/04 17:44:03 INFO scheduler.DAGScheduler: Completed ResultTask(20, 0)
14/04/04 17:44:03 INFO local.LocalScheduler: Remove TaskSet 20.0 from pool
14/04/04 17:44:03 INFO executor.Executor: Finished task ID 19
14/04/04 17:44:03 INFO scheduler.DAGScheduler: Stage 20 (countByKey at <console>:15) finished in 0.024 s
14/04/04 17:44:03 INFO spark.SparkContext: Job finished: countByKey at <console>:15, took 0.073272783 s
res15: scala.collection.Map[java.lang.String,Long] = Map(9.0 -> 3, 9.1 -> 3, 0.0 -> 3, 0.2 -> 3, 0.1 -> 3, 9.2 -> 3)
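Note that countByKey hands the entire result map back to the driver. With a huge number of distinct keys, a hedged alternative sketch is to stay with the reduceByKey transformation so the counting remains distributed:

kemeans_data.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect()
// Array[(String, Int)] with the same six counts, aggregated on the executors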
foreach
This one gets used most often: it iterates over the dataset and applies func to each element, purely for its side effects. A closing sketch follows.
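A minimal closing sketch, assuming local mode (the accumulator is illustrative): println output from inside foreach appears in the driver console locally, but on a cluster it lands in the executor logs, so an accumulator is the usual way to fold a side effect back to the driver:

val acc = sc.accumulator(0)   // a driver-visible counter
kemeans_data.foreach { line =>
  println(line)               // prints each element; goes to executor logs on a cluster
  acc += 1
}
acc.value                     // Int = 6, one increment per element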
This is an original post; please credit the source when reposting: http://blog.csdn.net/oopsoom/article/details/22948207. Thanks.