您的位置:首页 > 运维架构

spark streaming源码分析2 从简单例子看DStream上的operation

2015-10-12 19:57 239 查看
博客地址: http://blog.csdn.net/yueqian_zhu/

先贴一下上一节的例子

object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
这一节学习一下Dstream上的operation部分

1、调用socketTextStream方法,返回一个ReceiverInputDStream类型。它继承与InputDStream,InputDStream又继承于DStream

(1)设置本身的InputDStream到DStreamGraph中

(2)获取streamId

2、调用flatMap方法,返回一个flatMappedDStream。

看一下FlatMappedDStream的成员

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}
与RDD的操作非常类似

dependencies:即调用flatmap操作的DStream,这里指ReceiverInputDStream

slideDuration:Dstream产生RDD的时间间隔,即批处理间隔

compute:根据参数得到一个RDD,继而在这个RDD上调用flatmap操作。flatmap操作的方法参数实际上附加到了RDD的身上。



2、调用map方法,其实是将map方法附加给了RDD。之后的reduceByKey同理。

3、调用print方法,它是一个输出操作。默认输出RDD的前10个元素。调用print方法得到一个ForEachDStream,并将这个ForEachDStream注册到DStreamGraph中。

至此,operation部分就结束了。此时,还没有真正执行起来,这需要调用StreamingContext的start方法才行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息