Storm源码阅读之SpoutOutputCollector
2016-08-19 09:16
344 查看
不得不说storm是一个特别棒的实时计算框架。为了对后文理解的方便,先说几个storm中的术语:
Topology:拓扑图或者拓扑结构。在storm中它通过消息分组的分式连接Spout和Bolt节点定义了运算处理的拓扑结构。如下图:
那什么是Spout呢?
在计算任务需要的数据其实就是由Spout提供的,所以它可以说是Storm中的消息源,一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据然后发送给tuple元组的。
那它是通过谁发送的呢?又是如何发送的呢?
这里我们先回答第一个问题,第二个问题以后解答。
好了上面说了那么多就是为了引出今天的任务:阅读SpoutOutputCollector源码。
在阅读之前,我们先明确一下SpoutOutputCollector到底是什么?其实从类名就能说出大概(不得不说老外写的代码的可读性真是好的没法说。这里啰嗦一句,
个人觉得这也是他们分享精神的体现,时刻记住方便给别人看。),它就是Spout输出收集器。
那它到底能干些啥呢?请看代码:
从上述接口ISpoutOutputCollector源码可以看出ISpoutOutputCollector中声明了3个方法,两个属于发送tuple元组的方法,他们之间的差异在上述注释中已说的很清楚,还有一个处理异常的方法。
在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。
Topology:拓扑图或者拓扑结构。在storm中它通过消息分组的分式连接Spout和Bolt节点定义了运算处理的拓扑结构。如下图:
那什么是Spout呢?
在计算任务需要的数据其实就是由Spout提供的,所以它可以说是Storm中的消息源,一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据然后发送给tuple元组的。
那它是通过谁发送的呢?又是如何发送的呢?
这里我们先回答第一个问题,第二个问题以后解答。
好了上面说了那么多就是为了引出今天的任务:阅读SpoutOutputCollector源码。
在阅读之前,我们先明确一下SpoutOutputCollector到底是什么?其实从类名就能说出大概(不得不说老外写的代码的可读性真是好的没法说。这里啰嗦一句,
个人觉得这也是他们分享精神的体现,时刻记住方便给别人看。),它就是Spout输出收集器。
那它到底能干些啥呢?请看代码:
1.ISpoutOutputCollector
ISpoutOutputCollector :是SpoutOutputCollector的接口public interface ISpoutOutputCollector { /** 发送tuple消息,并返回起发送任务的task的序列号集合 */ List<Integer> emit(String streamId, List<Object> tuple, Object messageId); /** *与上述发送方法类似,只不过emitDirect方法是要指定接收端的task,让接收端特定的task接收消息。 */ void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); /** *处理异常 */ void reportError(Throwable error); }
从上述接口ISpoutOutputCollector源码可以看出ISpoutOutputCollector中声明了3个方法,两个属于发送tuple元组的方法,他们之间的差异在上述注释中已说的很清楚,还有一个处理异常的方法。
2.SpoutOutputCollector
SpoutOutputCollector:它实现了接口ISpoutOutputCollectorpublic class SpoutOutputCollector implements ISpoutOutputCollector { ISpoutOutputCollector _delegate; public SpoutOutputCollector(ISpoutOutputCollector delegate) { _delegate = delegate; } /** * 指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息完全处理了,就会回调ack方法,否则会回调fail方法。 */ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { return _delegate.emit(streamId, tuple, messageId); } /** * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,这没有指定streamid,故采用默认的streamid */ public List<Integer> emit(List<Object> tuple, Object messageId) { return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); } /** * emit(String streamId, List<Object> tuple, Object messageId) *的重载方法,这没有指定streamid,故采用默认的streamid,因为没有messageid,故ack方法和fail方法不会被调用 */ public List<Integer> emit(List<Object> tuple) { return emit(tuple, null); } /** * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,因为没有messageid,故ack方法和fail方法不会被调用 */ public List<Integer> emit(String streamId, List<Object> tuple) { return emit(streamId, tuple, null); } /** * 发射tuple消息,不过需要指定接收端的task来接收,并且输出必须声明为直接流,同时指定用来接收消息的task必须采用直接分组的方式来接收消息. * */ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { _delegate.emitDirect(taskId, streamId, tuple, messageId); } /** * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,采用默认的streamid */ public void emitDirect(int taskId, List<Object> tuple, Object messageId) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); } /** * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,因为没有指定的消息id,所以ack和fail方法就不会调用. */ public void emitDirect(int taskId, String streamId, List<Object> tuple) { emitDirect(taskId, streamId, tuple, null); } /** * 该类提供的重载方法,因为没有指定的消息id,所以ack和fail方法就不会调用. */ public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, tuple, null); } /** * 接口ISpoutOutputCollector中reportError的实现. */ @Override public void reportError(Throwable error) { _delegate.reportError(error); } }
在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。
相关文章推荐
- 建立squall(SQL on Storm)源码阅读环境
- Storm源码阅读(一):使用
- storm源码阅读笔记之任务调度算法
- 源码阅读之storm操作zookeeper-cluster.clj
- 基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout
- (三)storm-kafka源码走读之如何构建一个KafkaSpout
- Storm源码阅读(三):Thrift
- Windows下配置Storm源码阅读环境(vim+ctags)
- spatialhadoop2.3源码阅读(十三) RTreeGridOutputFormat & RTreeGridRecordWriter & RTree[RTree Index MapReuce]
- (五)storm-kafka源码走读之KafkaSpout
- 基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout
- CI框架源码阅读---------Output.php
- [置顶] JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式
- storm-kafka源码走读之KafkaSpout
- spatialhadoop2.3源码阅读(十二) GridOutputFormat & GridRecordWriter[Grid Index MapReuce]
- Storm-源码分析- spout (backtype.storm.spout)
- Storm源码阅读总结 -- Client Nimbus Supervisor
- 01 storm 源码阅读 storm的进程间消息通信实现netty server实现
- JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式
- storm-kafka-plus源码阅读