Counting word frequencies in Spark
2015-03-06 08:51
I wrote a simple statement; it has not been optimized yet:
scala> sc.
     |   textFile("/etc/profile").
     |   flatMap((s: String) => s.split("\\s")).
     |   map(_.toUpperCase).
     |   map((s: String) => (s, 1)).
     |   filter((pair) => pair._1.forall((ch) => ch > 'A' && ch < 'Z')).
     |   reduceByKey(_ + _).
     |   sortByKey().
     |   foreach(println)
Note that this code can be tightened further:
scala> sc.
     |   textFile("/etc/profile").
     |   flatMap(_.split("\\s")).
     |   map(_.toUpperCase).
     |   map((_, 1)).
     |   filter(_._1.forall((ch) => ch > 'A' && ch < 'Z')).
     |   reduceByKey(_ + _).
     |   sortByKey().
     |   foreach(println)
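The logic of this pipeline can be tried without a Spark cluster, since the same chain of operations works on plain Scala collections. Below is a minimal, Spark-free sketch; the sample text string is made up for illustration. One caveat worth noting: the filter above uses strict comparisons (ch > 'A' && ch < 'Z'), so any word containing 'A' or 'Z' is silently dropped, which is why none appear in the output further down; the sketch uses inclusive bounds (>= and <=) instead.

```scala
// A minimal Spark-free sketch of the same pipeline on plain Scala
// collections; the sample `text` string is made up for illustration.
val text = "export PATH then export HOME then"

val counts = text
  .split("\\s+")                                     // tokenize on whitespace
  .map(_.toUpperCase)                                // normalize case
  .filter(_.forall(ch => ch >= 'A' && ch <= 'Z'))    // keep purely alphabetic words
  .groupBy(identity)                                 // group identical words
  .map { case (word, hits) => (word, hits.length) }  // count each group
  .toSeq
  .sortBy(_._1)                                      // sort by word, like sortByKey

counts.foreach(println)
// (EXPORT,2)
// (HOME,1)
// (PATH,1)
// (THEN,2)
```

On a real RDD, the groupBy-then-count step is what reduceByKey(_ + _) does, with the advantage that counts are combined on each partition before the shuffle.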
The output is as follows (the word-count tuples are interleaved with Spark's INFO logging):
15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(75904) called with curMem=259812, maxMem=277842493
15/03/06 08:50:44 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 74.1 KB, free 264.7 MB)
15/03/06 08:50:44 INFO FileInputFormat: Total input paths to process : 1
15/03/06 08:50:44 INFO SparkContext: Starting job: sortByKey at <console>:20
15/03/06 08:50:44 INFO DAGScheduler: Registering RDD 25 (filter at <console>:18)
15/03/06 08:50:44 INFO DAGScheduler: Got job 4 (sortByKey at <console>:20) with 2 output partitions (allowLocal=false)
15/03/06 08:50:44 INFO DAGScheduler: Final stage: Stage 10(sortByKey at <console>:20)
15/03/06 08:50:44 INFO DAGScheduler: Parents of final stage: List(Stage 11)
15/03/06 08:50:44 INFO DAGScheduler: Missing parents: List(Stage 11)
15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 11 (FilteredRDD[25] at filter at <console>:18), which has no missing parents
15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(3736) called with curMem=335716, maxMem=277842493
15/03/06 08:50:44 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 3.6 KB, free 264.6 MB)
15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 11 (FilteredRDD[25] at filter at <console>:18)
15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 11.0 with 2 tasks
15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 11.0 (TID 16, localhost, PROCESS_LOCAL, 1162 bytes)
15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 11.0 (TID 17, localhost, PROCESS_LOCAL, 1162 bytes)
15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 11.0 (TID 17)
15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 11.0 (TID 16)
15/03/06 08:50:44 INFO HadoopRDD: Input split: file:/etc/profile:1189+1189
15/03/06 08:50:44 INFO HadoopRDD: Input split: file:/etc/profile:0+1189
15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 11.0 (TID 17). 1863 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 11.0 (TID 17) in 43 ms on localhost (1/2)
15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 11.0 (TID 16). 1863 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 11.0 (TID 16) in 51 ms on localhost (2/2)
15/03/06 08:50:44 INFO DAGScheduler: Stage 11 (filter at <console>:18) finished in 0.054 s
15/03/06 08:50:44 INFO DAGScheduler: looking for newly runnable stages
15/03/06 08:50:44 INFO DAGScheduler: running: Set()
15/03/06 08:50:44 INFO DAGScheduler: waiting: Set(Stage 10)
15/03/06 08:50:44 INFO DAGScheduler: failed: Set()
15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
15/03/06 08:50:44 INFO DAGScheduler: Missing parents for Stage 10: List()
15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 10 (MapPartitionsRDD[28] at sortByKey at <console>:20), which is now runnable
15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2856) called with curMem=339452, maxMem=277842493
15/03/06 08:50:44 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 2.8 KB, free 264.6 MB)
15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 10 (MapPartitionsRDD[28] at sortByKey at <console>:20)
15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 10.0 with 2 tasks
15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 18, localhost, PROCESS_LOCAL, 948 bytes)
15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 10.0 (TID 19, localhost, PROCESS_LOCAL, 948 bytes)
15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 10.0 (TID 18)
15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 10.0 (TID 19)
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 10.0 (TID 18). 1165 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 18) in 18 ms on localhost (1/2)
15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 10.0 (TID 19). 1293 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 10.0 (TID 19) in 28 ms on localhost (2/2)
15/03/06 08:50:44 INFO DAGScheduler: Stage 10 (sortByKey at <console>:20) finished in 0.031 s
15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool
15/03/06 08:50:44 INFO SparkContext: Job finished: sortByKey at <console>:20, took 0.107864348 s
15/03/06 08:50:44 INFO SparkContext: Starting job: foreach at <console>:21
15/03/06 08:50:44 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 144 bytes
15/03/06 08:50:44 INFO DAGScheduler: Registering RDD 26 (reduceByKey at <console>:19)
15/03/06 08:50:44 INFO DAGScheduler: Got job 5 (foreach at <console>:21) with 2 output partitions (allowLocal=false)
15/03/06 08:50:44 INFO DAGScheduler: Final stage: Stage 12(foreach at <console>:21)
15/03/06 08:50:44 INFO DAGScheduler: Parents of final stage: List(Stage 14)
15/03/06 08:50:44 INFO DAGScheduler: Missing parents: List(Stage 14)
15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 14 (ShuffledRDD[26] at reduceByKey at <console>:19), which has no missing parents
15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2472) called with curMem=342308, maxMem=277842493
15/03/06 08:50:44 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 2.4 KB, free 264.6 MB)
15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 14 (ShuffledRDD[26] at reduceByKey at <console>:19)
15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 14.0 with 2 tasks
15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 20, localhost, PROCESS_LOCAL, 937 bytes)
15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 14.0 (TID 21, localhost, PROCESS_LOCAL, 937 bytes)
15/03/06 08:50:44 INFO Executor: Running task 1.0 in stage 14.0 (TID 21)
15/03/06 08:50:44 INFO Executor: Running task 0.0 in stage 14.0 (TID 20)
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:44 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
15/03/06 08:50:44 INFO Executor: Finished task 1.0 in stage 14.0 (TID 21). 996 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 1.0 in stage 14.0 (TID 21) in 14 ms on localhost (1/2)
15/03/06 08:50:44 INFO Executor: Finished task 0.0 in stage 14.0 (TID 20). 996 bytes result sent to driver
15/03/06 08:50:44 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 20) in 21 ms on localhost (2/2)
15/03/06 08:50:44 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
15/03/06 08:50:44 INFO DAGScheduler: Stage 14 (reduceByKey at <console>:19) finished in 0.022 s
15/03/06 08:50:44 INFO DAGScheduler: looking for newly runnable stages
15/03/06 08:50:44 INFO DAGScheduler: running: Set()
15/03/06 08:50:44 INFO DAGScheduler: waiting: Set(Stage 12)
15/03/06 08:50:44 INFO DAGScheduler: failed: Set()
15/03/06 08:50:44 INFO DAGScheduler: Missing parents for Stage 12: List()
15/03/06 08:50:44 INFO DAGScheduler: Submitting Stage 12 (ShuffledRDD[29] at sortByKey at <console>:20), which is now runnable
15/03/06 08:50:44 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=344780, maxMem=277842493
15/03/06 08:50:44 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 2.3 KB, free 264.6 MB)
15/03/06 08:50:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 12 (ShuffledRDD[29] at sortByKey at <console>:20)
15/03/06 08:50:44 INFO TaskSchedulerImpl: Adding task set 12.0 with 2 tasks
15/03/06 08:50:44 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 22, localhost, PROCESS_LOCAL, 948 bytes)
15/03/06 08:50:44 INFO TaskSetManager: Starting task 1.0 in stage 12.0 (TID 23, localhost, PROCESS_LOCAL, 948 bytes)
15/03/06 08:50:45 INFO Executor: Running task 1.0 in stage 12.0 (TID 23)
15/03/06 08:50:45 INFO Executor: Running task 0.0 in stage 12.0 (TID 22)
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/06 08:50:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
(LOGIN,2) (MERGING,1) (MUCH,1) (NEED,1) (NOT,1) (PREVENT,1) (RESERVED,1) (SCRIPT,1) (SETS,1) (SETUP,1) (SHELL,2) (SYSTEM,2) (THE,1) (THEN,8) (THIS,3) (THRESHOLD,1) (TO,5) (UIDGID,1) (UNLESS,1) (UNSET,2) (USER,1) (WE,1) (WIDE,1) (WILL,1) (YOU,3) (YOUR,1)
15/03/06 08:50:45 INFO Executor: Finished task 1.0 in stage 12.0 (TID 23). 826 bytes result sent to driver
15/03/06 08:50:45 INFO TaskSetManager: Finished task 1.0 in stage 12.0 (TID 23) in 13 ms on localhost (1/2)
(,260) (BETTER,1) (BY,1) (CHECK,1) (COULD,1) (CURRENT,1) (CUSTOM,1) (DO,1) (DONE,1) (ELSE,5) (ENVIRONMENT,1) (EXPORT,15) (FI,8) (FILE,2) (FOR,5) (FUNCTIONS,1) (FUTURE,1) (GET,1) (GO,1) (GOOD,1) (HISTCONTROL,1) (I,2) (IF,8) (IN,6) (IS,1) (IT,1) (KNOW,1) (KSH,1)
15/03/06 08:50:45 INFO Executor: Finished task 0.0 in stage 12.0 (TID 22). 826 bytes result sent to driver
15/03/06 08:50:45 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 22) in 27 ms on localhost (2/2)
15/03/06 08:50:45 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool
15/03/06 08:50:45 INFO DAGScheduler: Stage 12 (foreach at <console>:21) finished in 0.025 s
15/03/06 08:50:45 INFO SparkContext: Job finished: foreach at <console>:21, took 0.07397057 s
The following code prints the names of the nodes that participate in the computation. Note: start the cluster with start-all first, and pass the --master option when launching the shell:
spark-shell --master spark://bluejoe0:7077
The code is as follows (note: the !! operator used here comes from scala.sys.process, which must be imported in the shell):
rdd.mapPartitions(_ => Array[String](("hostname" !!).trim).iterator, false).collect
res28: Array[String] = Array(bluejoe4, bluejoe5)
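The !! operator in the snippet above is not Spark-specific: it comes from Scala's standard scala.sys.process package and runs an external command, returning its standard output as a String. A minimal standalone sketch, assuming a Unix-like host that has a hostname command (the method-call form .!! is used here, equivalent to the postfix form in the REPL session):

```scala
import scala.sys.process._

// .!! (from scala.sys.process) runs the external command and returns its
// standard output as a String; trim drops the trailing newline.
val host = "hostname".!!.trim
println(host)
```

Wrapped in mapPartitions as above, this command runs once per partition on whichever executor holds that partition, which is why collect returns the worker hostnames (bluejoe4, bluejoe5) rather than the driver's.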