Spark Streaming 自定义接收器
2016-07-28 17:15
288 查看
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等)。这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据。本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解。需要注意的是,自定义接收器可以用Scala或者Java实现。
实现自定义Receiver
自定义接收器必须继承自抽象类Receiver,实现两个方法
onStart():接收数据。
onStop():停止接收数据。
onStart()方法和onStop()方法不能无限期的阻塞。通常,onStart()方法将会启动负责接收数据的线程,而onStop()方法负责确保这些接收数据的线程是停止的。接收收据的线程可以使用isStopped()方法来检查它们是否应该停止接收数据。
一旦接收到数据,通过调用store(data)方法(Receiver类中提供的方法)数据被存储在spark中。store()方法有很多种,如允许一次只接收一条数据(record-at-a-time)或者全部object/序列化字节的collection。注意,store()类型影响了接收器(receiver)的可靠性和容错性语义。这是稍后讨论的更详细的。
在接收线程中的任何异常都应该被捕获并处理,以避免接收器的无声故障。restart(<exception>)将会通过异步调用onStop()方法,延迟一段时间后调用onStop()方法重启接收器(receiver)。stop(<exception>)将会调用onStop()方法终止接收器。同时,reporterror(<error>)向驱动程序报告错误信息而不停止或者重启接收器,这些错误信息可以在UI上或者日志中看见。
以下是接收一个套接字上的文本流的自定义接收器。它以文本流中的“\”分隔线分割,并将它们储存在spark中。如果接收线程有任何连接错误或接收错误,则接收器将重新启动。
[java] view
plain copy
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
receive();
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
// restart if could not connect to server
restart("Could not connect", ce);
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
在spark streaming中使用自定义的接收器
自定义接收器可用于在Spark Streaming应用中,通过使用streamingContext.receiverStream(自定义接收器的实例)。如下所示,创建一个输入Dstream
[java] view
plain copy
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
...
接收器的可靠性
正如在spark streaming编程指南中讨论的那样,基于接收器的可靠性和容错语义,有两种类型的接收器:
1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。
2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。
为了实现可靠的接收器,必须要使用
不可靠的接收器没有实现这种逻辑,它可以简单地从数据源接收记录并使用store(single-record)将它们插入(one-at-a-time )。它没有store(multiple-records)那样的可靠保证,其优点如下:
系统考虑到将数据转化为适当大小的块;
系统考虑到控制接收速率,如果速率限制已指定;
不可靠接收器比可靠接收器更加容易实现;
实现自定义Receiver
自定义接收器必须继承自抽象类Receiver,实现两个方法
onStart():接收数据。
onStop():停止接收数据。
onStart()方法和onStop()方法不能无限期的阻塞。通常,onStart()方法将会启动负责接收数据的线程,而onStop()方法负责确保这些接收数据的线程是停止的。接收收据的线程可以使用isStopped()方法来检查它们是否应该停止接收数据。
一旦接收到数据,通过调用store(data)方法(Receiver类中提供的方法)数据被存储在spark中。store()方法有很多种,如允许一次只接收一条数据(record-at-a-time)或者全部object/序列化字节的collection。注意,store()类型影响了接收器(receiver)的可靠性和容错性语义。这是稍后讨论的更详细的。
在接收线程中的任何异常都应该被捕获并处理,以避免接收器的无声故障。restart(<exception>)将会通过异步调用onStop()方法,延迟一段时间后调用onStop()方法重启接收器(receiver)。stop(<exception>)将会调用onStop()方法终止接收器。同时,reporterror(<error>)向驱动程序报告错误信息而不停止或者重启接收器,这些错误信息可以在UI上或者日志中看见。
以下是接收一个套接字上的文本流的自定义接收器。它以文本流中的“\”分隔线分割,并将它们储存在spark中。如果接收线程有任何连接错误或接收错误,则接收器将重新启动。
[java] view
plain copy
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
receive();
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
// restart if could not connect to server
restart("Could not connect", ce);
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
在spark streaming中使用自定义的接收器
自定义接收器可用于在Spark Streaming应用中,通过使用streamingContext.receiverStream(自定义接收器的实例)。如下所示,创建一个输入Dstream
[java] view
plain copy
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
...
接收器的可靠性
正如在spark streaming编程指南中讨论的那样,基于接收器的可靠性和容错语义,有两种类型的接收器:
1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。
2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。
为了实现可靠的接收器,必须要使用
store(multiple-records)取存储数据。这种类型的store()是一种阻塞调用,只有在所有给定的记录被储存在spark里之后才返回。如果接收器的配置存储级别使用复制(默认启用),则复制完成后这个调用返回。因此,它确保了数据被可靠的存储,和接收器可以现在正确地确认消息;在复制数据的中间过程中接收器失败了,这将确保没有数据丢失(缓冲数据没有被确认,数据源将会重新发送)。
不可靠的接收器没有实现这种逻辑,它可以简单地从数据源接收记录并使用store(single-record)将它们插入(one-at-a-time )。它没有store(multiple-records)那样的可靠保证,其优点如下:
系统考虑到将数据转化为适当大小的块;
系统考虑到控制接收速率,如果速率限制已指定;
不可靠接收器比可靠接收器更加容易实现;
相关文章推荐
- UIWebView与JS的交互
- 利用iptables实现基于端口的网络流量统计
- 用Navicat for MySQL 连接 CentOS 6.5
- Delphi无法正确动态调用C++ dll库的几个原因
- CentOS7无法使用tab补全功能??
- 无侵入方面编程-用HttpModule+SoapExtension监视页面执行参数(一)
- 写一个存储过程,造一点测试数据
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- python的numpy模块安装问题
- Docker 与 PAAS
- python的numpy模块安装问题
- python的numpy模块安装问题
- 【bzoj2038】 小Z的袜子(hose)