您的位置:首页 > 其它

NetworkWordCount 例子工作流程详解

2016-12-01 16:35 288 查看
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}


ssc.socketTextStream(),读取数据到lines中,StreamingContext.scala中


lines.flatMap(_.split(” “))将每一行中的单词按照空格分开成一个一个的独立单词在words里面,Dstream.scala中:flatMap产生的是FlatMappedDStream



words.map(x => (x, 1))将每个单词映射成(key,value)格式,前面是单词后面是1 代表该单词出现一次,Dstream.scala中:Map产生的是MappedDStream



接着是reduceByKey(+),该方法通过操作map()生成的(key,value)按key统计实现了相同key的value累加,PairDStreamFunctions.scala中reduceByKey产生的是shuffledDStream



wordCounts.print(),Dstream.scala中:


在这里面调用了foreachRDD,foreachRDD返回类型是Unit,但是里面生成一个ForEachDStream是一个输出型的RDD,会注册给DstreamGraph, DstreamGraph .scala中的outputStreams

ForEachDStream.scala中:




-

-注册的代码




DstreamGraph .scala中outputStreams变量


,,,

6. 既然有outputStreams那么就有inputstreams通过回溯可以找到是什么时候放到DstreamGraph 中的








内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  streaming NetworkWor