Spark定制班第24课:Spark Streaming的Transformation、Action、Input和Output源码图解
2016-06-12 12:37
459 查看
本期内容:
1. Spark Streaming的Transformation、Action源码图解
2. Spark Streaming的Input、Output源码图解
StreamingContext成员:socketStream
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
SocketInputDStream:
class SocketInputDStream[T: ClassTag](
ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
ReceiverInputDStream:
abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
...
InputDStream:
ssc.graph.addInputStream(this)
把ImputDStream放入到了graph中。
DStream.foreachRDD产生ForEachDStream对象,该对象通过register对象也放入了graph中。
1. Spark Streaming的Transformation、Action源码图解
2. Spark Streaming的Input、Output源码图解
StreamingContext成员:socketStream
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
SocketInputDStream:
class SocketInputDStream[T: ClassTag](
ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
ReceiverInputDStream:
abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
...
InputDStream:
ssc.graph.addInputStream(this)
把ImputDStream放入到了graph中。
DStream.foreachRDD产生ForEachDStream对象,该对象通过register对象也放入了graph中。
相关文章推荐
- 从源码安装Mysql/Percona 5.5
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Windows下Scala环境搭建
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- 架构纵横谈之二 ---- 架构的模式与要点
- 用ASP编写的加密和解密类
- 解密网页加密的两个方法
- BS项目中的CSS架构_仅加载自己需要的CSS
- VBS脚本加密/解密VBS脚本(简易免杀版1.1)
- 浅析Ruby的源代码布局及其编程风格
- 关于三种主流WEB架构的思考
- C#编写DES加密、解密类
- C#实现对文件进行加密解密的方法
- C#实现数据包加密与解密实例详解
- C#最简单的字符串加密解密方法
- Android操作系统的架构设计分析
- 基于C#对用户密码使用MD5加密与解密
- PHP加密解密字符串汇总