您的位置:首页 > 其它

Spark RDD Action 详解---Spark学习笔记8

2014-04-04 17:46 721 查看
话接上回Spark RDD Transformation

这次学习的是Spark RDD 的Action 部分函数。

这次拿根目录下的kmeans_data.txt当数据源来实践。

reduce

和transformation里面的reduceByKey差不多,但是这里没有根据key分组,只是单纯的2个参数。

val kemeans_data = sc.textFile("kmeans_data.txt")
kemeans_data.flatMap(_.split(" ")).map(_.toDouble).reduce(_+_)
res67: Double = 82.80000000000001


collect

将当前的数据集扁平化为一个数组,
<pre name="code" class="java">scala> kemeans_data.collect()
14/04/04 17:17:38 INFO spark.SparkContext: Starting job: collect at <console>:15
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Got job 35 (collect at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Final stage: Stage 46 (collect at <console>:15)
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Submitting Stage 46 (MappedRDD[93] at textFile at <console>:12), which has no missing parents
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 46 (MappedRDD[93] at textFile at <console>:12)
14/04/04 17:17:38 INFO local.LocalTaskSetManager: Size of task 40 is 1656 bytes
14/04/04 17:17:38 INFO executor.Executor: Running task ID 40
14/04/04 17:17:38 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/04 17:17:38 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:17:38 INFO executor.Executor: Serialized size of result for 40 is 519
14/04/04 17:17:38 INFO executor.Executor: Sending result for 40 directly to driver
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Completed ResultTask(46, 0)
14/04/04 17:17:38 INFO local.LocalScheduler: Remove TaskSet 46.0 from pool
14/04/04 17:17:38 INFO executor.Executor: Finished task ID 40
14/04/04 17:17:38 INFO scheduler.DAGScheduler: Stage 46 (collect at <console>:15) finished in 0.005 s
14/04/04 17:17:38 INFO spark.SparkContext: Job finished: collect at <console>:15, took 0.008528283 s
res69: Array[String] = Array(0.0 0.0 0.0, 0.1 0.1 0.1, 0.2 0.2 0.2, 9.0 9.0 9.0, 9.1 9.1 9.1, 9.2 9.2 9.2)


其实这里可以清楚的看到,每一行是一个数组的元素。








数据集大了就不要这么玩了,一般是用在较小的书数据集上,大和小是相对于你的机器的。

count

这个地球人都知道。。。返回的是dataset中的element的个数。
kemeans_data.count()
res70: Long = 6


take

顾名思义,取多少个元素
scala> splitedLine.take(1)
14/04/04 11:14:05 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Got job 6 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Final stage: Stage 6 (take at <console>:19)
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:05 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:05 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:05 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.004364898 s
res12: Array[Array[java.lang.String]] = Array(Array("", "", Merge, pull, request, #248, from, colorant/branch-0.8))


指定下标来获取元素

scala> splitedLine.take(1)(0)
14/04/04 11:14:10 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Got job 7 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Final stage: Stage 7 (take at <console>:19)
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:10 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:10 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:10 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.004808966 s
res13: Array[java.lang.String] = Array("", "", Merge, pull, request, #248, from, colorant/branch-0.8)

scala> splitedLine.take(1)(0)(0)
14/04/04 11:14:15 INFO spark.SparkContext: Starting job: take at <console>:19
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Got job 8 (take at <console>:19) with 1 output partitions (allowLocal=true)
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Final stage: Stage 8 (take at <console>:19)
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:14:15 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:14:15 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:14:15 INFO spark.SparkContext: Job finished: take at <console>:19, took 0.005684334 s
res14: java.lang.String = ""


first

获取数据集的第一个元素。

kemeans_data.first()

res0: String = 0.0 0.0 0.0

takeSample

deftakeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T]Return
an array with a random sample of num elements
of the dataset, with or without replacement, using the given random number generator seed.

简单来说就是给它一个随机种子seed,取num个数据,是否替换。
scala> kemeans_data.takeSample(false,2,3)
14/04/04 17:28:49 INFO spark.SparkContext: Starting job: takeSample at <console>:15
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Got job 14 (takeSample at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Final stage: Stage 14 (takeSample at <console>:15)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting Stage 14 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 14 (MappedRDD[1] at textFile at <console>:12)
14/04/04 17:28:49 INFO local.LocalTaskSetManager: Size of task 13 is 1645 bytes
14/04/04 17:28:49 INFO executor.Executor: Running task ID 13
14/04/04 17:28:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:28:49 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:28:49 INFO executor.Executor: Serialized size of result for 13 is 473
14/04/04 17:28:49 INFO executor.Executor: Sending result for 13 directly to driver
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Completed ResultTask(14, 0)
14/04/04 17:28:49 INFO local.LocalScheduler: Remove TaskSet 14.0 from pool
14/04/04 17:28:49 INFO executor.Executor: Finished task ID 13
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Stage 14 (takeSample at <console>:15) finished in 0.008 s
14/04/04 17:28:49 INFO spark.SparkContext: Job finished: takeSample at <console>:15, took 0.012453567 s
14/04/04 17:28:49 INFO spark.SparkContext: Starting job: takeSample at <console>:15
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Got job 15 (takeSample at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Final stage: Stage 15 (takeSample at <console>:15)
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting Stage 15 (SampledRDD[7] at takeSample at <console>:15), which has no missing parents
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 15 (SampledRDD[7] at takeSample at <console>:15)
14/04/04 17:28:49 INFO local.LocalTaskSetManager: Size of task 14 is 1850 bytes
14/04/04 17:28:49 INFO executor.Executor: Running task ID 14
14/04/04 17:28:49 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:28:49 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:28:49 INFO executor.Executor: Serialized size of result for 14 is 519
14/04/04 17:28:49 INFO executor.Executor: Sending result for 14 directly to driver
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Completed ResultTask(15, 0)
14/04/04 17:28:49 INFO local.LocalScheduler: Remove TaskSet 15.0 from pool
14/04/04 17:28:49 INFO executor.Executor: Finished task ID 14
14/04/04 17:28:49 INFO scheduler.DAGScheduler: Stage 15 (takeSample at <console>:15) finished in 0.006 s
14/04/04 17:28:49 INFO spark.SparkContext: Job finished: takeSample at <console>:15, took 0.009247992 s
res9: Array[String] = Array(9.0 9.0 9.0, 0.2 0.2 0.2)

saveAsTextFile

将结果存到文件,这里找到大于0.2的点,然后存到shengli.result文件夹里
scala> kemeans_data.flatMap(_.split(" ")).map(_.toDouble).filter(_>0.2).saveAsTextFile("shengli.result")
14/04/04 17:31:24 INFO rdd.PairRDDFunctions: Saving as hadoop file of type (NullWritable, Text)
14/04/04 17:31:24 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:15
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Got job 18 (saveAsTextFile at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Final stage: Stage 18 (saveAsTextFile at <console>:15)
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:31:24 INFO scheduler.DAGScheduler: Submitting Stage 18 (MappedRDD[17] at saveAsTextFile at <console>:15), which has no missing parents
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 18 (MappedRDD[17] at saveAsTextFile at <console>:15)
14/04/04 17:31:25 INFO local.LocalTaskSetManager: Size of task 17 is 4805 bytes
14/04/04 17:31:25 INFO executor.Executor: Running task ID 17
14/04/04 17:31:25 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:31:25 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:31:25 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_201404041731_0000_m_000000_17' to file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/shengli.result
14/04/04 17:31:25 INFO mapred.SparkHadoopWriter: attempt_201404041731_0000_m_000000_17: Committed
14/04/04 17:31:25 INFO executor.Executor: Serialized size of result for 17 is 435
14/04/04 17:31:25 INFO executor.Executor: Sending result for 17 directly to driver
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Completed ResultTask(18, 0)
14/04/04 17:31:25 INFO local.LocalScheduler: Remove TaskSet 18.0 from pool
14/04/04 17:31:25 INFO executor.Executor: Finished task ID 17
14/04/04 17:31:25 INFO scheduler.DAGScheduler: Stage 18 (saveAsTextFile at <console>:15) finished in 0.080 s
14/04/04 17:31:25 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:15, took 0.106020906 s


在根目录下生存了一个文件夹shengli.result,里面有这次任务的结果
cd shengli.result/
ll
-rw-r--r--  1 root root    8 04-04 17:31 ._SUCCESS.crc
-rwxrwxrwx  1 root root    0 04-04 17:31 _SUCCESS
-rw-r--r--  1 root root   12 04-04 17:31 .part-00000.crc
-rwxrwxrwx  1 root root   36 04-04 17:31 part-00000
drwxr-xr-x  2 root root 4096 04-04 17:31 .
drwxr-xr-x 25 1000 1000 4096 04-04 17:33 ..

more part-00000
9.0
9.0
9.0
9.1
9.1
9.1
9.2
9.2
9.2

saveAsSequenceFile

scala> kemeans_data.flatMap(_.split(" ")).map(word=>(word,1)).saveAsSequenceFile("shengli.seq")
14/04/04 17:39:56 INFO rdd.SequenceFileRDDFunctions: Saving as sequence file of type (Text,IntWritable)
14/04/04 17:39:56 INFO rdd.PairRDDFunctions: Saving as hadoop file of type (Text, IntWritable)
14/04/04 17:39:56 INFO spark.SparkContext: Starting job: saveAsSequenceFile at <console>:15
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Got job 19 (saveAsSequenceFile at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Final stage: Stage 19 (saveAsSequenceFile at <console>:15)
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Submitting Stage 19 (MappedRDD[20] at saveAsSequenceFile at <console>:15), which has no missing parents
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 19 (MappedRDD[20] at saveAsSequenceFile at <console>:15)
14/04/04 17:39:56 INFO local.LocalTaskSetManager: Size of task 18 is 4837 bytes
14/04/04 17:39:56 INFO executor.Executor: Running task ID 18
14/04/04 17:39:56 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:39:56 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:39:56 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_201404041739_0000_m_000000_18' to file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/shengli.seq
14/04/04 17:39:56 INFO mapred.SparkHadoopWriter: attempt_201404041739_0000_m_000000_18: Committed
14/04/04 17:39:56 INFO executor.Executor: Serialized size of result for 18 is 435
14/04/04 17:39:56 INFO executor.Executor: Sending result for 18 directly to driver
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Completed ResultTask(19, 0)
14/04/04 17:39:56 INFO local.LocalScheduler: Remove TaskSet 19.0 from pool
14/04/04 17:39:56 INFO executor.Executor: Finished task ID 18
14/04/04 17:39:56 INFO scheduler.DAGScheduler: Stage 19 (saveAsSequenceFile at <console>:15) finished in 0.112 s
14/04/04 17:39:56 INFO spark.SparkContext: Job finished: saveAsSequenceFile at <console>:15, took 0.199192684 s


查看结果文件,的确是sequenceFile
# cd shengli.seq/
# ll
-rw-r--r--  1 root root    8 04-04 17:39 ._SUCCESS.crc
-rwxrwxrwx  1 root root    0 04-04 17:39 _SUCCESS
-rw-r--r--  1 root root   12 04-04 17:39 .part-00000.crc
-rwxrwxrwx  1 root root  373 04-04 17:39 part-00000
drwxr-xr-x 26 1000 1000 4096 04-04 17:39 ..
drwxr-xr-x  2 root root 4096 04-04 17:39 .

# cat part-00000
SEQorg.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable异妈
"0.00.00.00.10.10.10.20.20.29.09.09.09.19.19.19.29.29.2

countByKey

很好理解,查看一下一个key的个数。
Only
available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key.

scala> kemeans_data.flatMap(_.split(" ")).map(word=>(word,1)).countByKey()
14/04/04 17:44:02 INFO spark.SparkContext: Starting job: countByKey at <console>:15
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Got job 20 (countByKey at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Final stage: Stage 20 (countByKey at <console>:15)
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[24] at countByKey at <console>:15), which has no missing parents
14/04/04 17:44:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[24] at countByKey at <console>:15)
14/04/04 17:44:02 INFO local.LocalTaskSetManager: Size of task 19 is 1995 bytes
14/04/04 17:44:02 INFO executor.Executor: Running task ID 19
14/04/04 17:44:02 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 17:44:03 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/kmeans_data.txt:0+72
14/04/04 17:44:03 INFO executor.Executor: Serialized size of result for 19 is 823
14/04/04 17:44:03 INFO executor.Executor: Sending result for 19 directly to driver
14/04/04 17:44:03 INFO scheduler.DAGScheduler: Completed ResultTask(20, 0)
14/04/04 17:44:03 INFO local.LocalScheduler: Remove TaskSet 20.0 from pool
14/04/04 17:44:03 INFO executor.Executor: Finished task ID 19
14/04/04 17:44:03 INFO scheduler.DAGScheduler: Stage 20 (countByKey at <console>:15) finished in 0.024 s
14/04/04 17:44:03 INFO spark.SparkContext: Job finished: countByKey at <console>:15, took 0.073272783 s
res15: scala.collection.Map[java.lang.String,Long] = Map(9.0 -> 3, 9.1 -> 3, 0.0 -> 3, 0.2 -> 3, 0.1 -> 3, 9.2 -> 3)


foreach

这个最常用,遍历集合,对集合中的每个元素应用func。

原创,转载请注明出处http://blog.csdn.net/oopsoom/article/details/22948207,谢谢。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rdd spark scala jvm