from: http://blog.csdn.net/wzhg0508/article/details/40903919

(Part 5) storm-kafka Source Code Walkthrough: KafkaSpout

Originally published 2014-11-08 14:09:06


Now let's walk through the KafkaSpout source code.

It starts with some initialization in the open method:

[java]
........................
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
    _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
    _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
............

Some code is omitted before and after this excerpt; the metrics-related parts are not covered for now. The main work here is initializing the ZooKeeper connection (ZkState) and mapping Kafka partitions to their brokers (initializing DynamicPartitionConnections). The DynamicPartitionConnections constructor takes a brokerReader; since we are using ZkHosts, a look at the KafkaUtils code shows that a ZkBrokerReader is chosen.
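For reference, the dispatch in KafkaUtils.makeBrokerReader looks roughly like this (a sketch from memory of the storm-kafka source, not a verbatim copy):

[java]
// storm.kafka.KafkaUtils (sketch): pick a broker reader based on the hosts type
public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
    if (conf.hosts instanceof StaticHosts) {
        // static host list: partition info is fixed up front
        return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
    } else {
        // ZkHosts: discover partitions and their leaders from ZooKeeper
        return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
    }
}

Now look at the ZkBrokerReader constructor: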

[java]
public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
    try {
        reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
        cachedBrokers = reader.getBrokerInfo();
        lastRefreshTimeMs = System.currentTimeMillis();
        refreshMillis = hosts.refreshFreqSecs * 1000L;
    } catch (java.net.SocketTimeoutException e) {
        LOG.warn("Failed to update brokers", e);
    }
}

Note the refreshMillis field: it is the interval at which the partition information in ZooKeeper is re-read:

[java]
// ZkBrokerReader
@Override
public GlobalPartitionInformation getCurrentBrokers() {
    long currTime = System.currentTimeMillis();
    if (currTime > lastRefreshTimeMs + refreshMillis) { // more than refreshMillis since the last refresh
        try {
            LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
            cachedBrokers = reader.getBrokerInfo();
            lastRefreshTimeMs = currTime;
        } catch (java.net.SocketTimeoutException e) {
            LOG.warn("Failed to update brokers", e);
        }
    }
    return cachedBrokers;
}

// The DynamicBrokersReader method it calls:
/**
 * Get all partitions with their current leaders
 */
public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
    GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
    try {
        int numPartitionsForTopic = getNumPartitions();
        String brokerInfoPath = brokerPath();
        for (int partition = 0; partition < numPartitionsForTopic; partition++) {
            int leader = getLeaderFor(partition);
            String path = brokerInfoPath + "/" + leader;
            try {
                byte[] brokerData = _curator.getData().forPath(path);
                Broker hp = getBrokerHost(brokerData);
                globalPartitionInformation.addPartition(partition, hp);
            } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                LOG.error("Node {} does not exist ", path);
            }
        }
    } catch (SocketTimeoutException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
    return globalPartitionInformation;
}
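The helpers getLeaderFor and getBrokerHost read Kafka's standard ZooKeeper layout: the partition state znode holds a JSON document naming the leader broker id, and /brokers/ids/<id> holds that broker's host and port. A rough sketch of getBrokerHost (from memory; the real method may differ in details):

[java]
// storm.kafka.DynamicBrokersReader (sketch): parse {"host": ..., "port": ...}
// from the broker's /brokers/ids/<id> znode
private Broker getBrokerHost(byte[] contents) {
    try {
        Map<Object, Object> value =
                (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
        String host = (String) value.get("host");
        Integer port = ((Long) value.get("port")).intValue();
        return new Broker(host, port);
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
    }
}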

GlobalPartitionInformation is an iterable structure that stores the mapping between partitions and brokers. DynamicPartitionConnections maintains the relationship between Kafka consumers and partitions, i.e. which partitions each consumer reads. This ConnectionInfo is initialized and updated in storm.kafka.ZkCoordinator. One point worth noting: each ConnectionInfo pairs a single SimpleConsumer with the set of partitions it is responsible for (one connection per broker).

[java]
// storm.kafka.DynamicPartitionConnections
static class ConnectionInfo {
    SimpleConsumer consumer;
    Set<Integer> partitions = new HashSet<Integer>();

    public ConnectionInfo(SimpleConsumer consumer) {
        this.consumer = consumer;
    }
}
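For context, this is roughly how DynamicPartitionConnections hands a partition to a broker's consumer, creating the SimpleConsumer lazily (a sketch from memory; the exact constructor arguments may differ):

[java]
// storm.kafka.DynamicPartitionConnections (sketch)
public SimpleConsumer register(Broker host, int partition) {
    if (!_connections.containsKey(host)) {
        // one SimpleConsumer per broker, shared by all of that broker's partitions
        _connections.put(host, new ConnectionInfo(
                new SimpleConsumer(host.host, host.port,
                        _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
    }
    ConnectionInfo info = _connections.get(host);
    info.partitions.add(partition);
    return info.consumer;
}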

Next, look at the ZkCoordinator class, starting with its constructor:

[java]
// storm.kafka.ZkCoordinator
public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
    _spoutConfig = spoutConfig;
    _connections = connections;
    _taskIndex = taskIndex;
    _totalTasks = totalTasks;
    _topologyInstanceId = topologyInstanceId;
    _stormConf = stormConf;
    _state = state;
    ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
    _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
    _reader = reader;
}

_refreshFreqMs again controls how often the partition information cached from ZooKeeper is refreshed locally. KafkaSpout's nextTuple method calls ZkCoordinator's getMyManagedPartitions on every invocation, and that method refreshes the partition info once _refreshFreqMs has elapsed:

[java]
// storm.kafka.ZkCoordinator
@Override
public List<PartitionManager> getMyManagedPartitions() {
    if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
        refresh();
        _lastRefreshTime = System.currentTimeMillis();
    }
    return _cachedList;
}

@Override
public void refresh() {
    try {
        LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
        GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
        List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);

        Set<Partition> curr = _managers.keySet();
        Set<Partition> newPartitions = new HashSet<Partition>(mine);
        newPartitions.removeAll(curr);

        Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
        deletedPartitions.removeAll(mine);

        LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
        for (Partition id : deletedPartitions) {
            PartitionManager man = _managers.remove(id);
            man.close();
        }

        LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
        for (Partition id : newPartitions) {
            PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
            _managers.put(id, man);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    _cachedList = new ArrayList<PartitionManager>(_managers.values());
    LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
}

The algorithm that assigns partitions to each consumer task is KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex). It takes the number of parallel tasks, compares it with the number of partitions, and works out which partitions the current consumer task is responsible for reading; see the sketch below, and the Kafka documentation, for details.
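As I recall the storm-kafka implementation, the assignment is a simple stride over the ordered partition list (a sketch, not verbatim; edge-case handling such as the tasks-exceed-partitions warning is omitted):

[java]
// storm.kafka.KafkaUtils (sketch): task i takes partitions i, i+totalTasks, i+2*totalTasks, ...
public static List<Partition> calculatePartitionsForTask(
        GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
    List<Partition> partitions = partitionInformation.getOrderedPartitions();
    List<Partition> taskPartitions = new ArrayList<Partition>();
    for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
        taskPartitions.add(partitions.get(i));
    }
    return taskPartitions;
}

So with 6 partitions and 3 tasks, task 0 reads partitions 0 and 3, task 1 reads 1 and 4, and task 2 reads 2 and 5.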

That completes KafkaSpout's initialization. Next comes fetching and emitting data; see the nextTuple method:

[java]
// storm.kafka.KafkaSpout
@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long now = System.currentTimeMillis();
    if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
        commit();
    }
}
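The loop is driven by the three EmitState values returned by PartitionManager's next method (reproduced here as I recall them from the PartitionManager source):

[java]
// storm.kafka.PartitionManager.EmitState (sketch)
static enum EmitState {
    EMITTED_MORE_LEFT, // emitted a tuple; stay on this partition, more is buffered
    EMITTED_END,       // emitted a tuple; buffer drained, move to the next partition
    NO_EMITTED         // nothing to emit; try the next partition
}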

As the code above shows, the real work is delegated to PartitionManager, which reads messages and emits them. The main logic lives in PartitionManager's next method:

[java]
// storm.kafka.PartitionManager
// returns NO_EMITTED / EMITTED_MORE_LEFT / EMITTED_END depending on whether
// anything was emitted and whether the current batch has been exhausted
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}

If the _waitingToEmit list is empty, fill() fetches a batch of messages; they are then emitted one at a time. After each emitted message the loop breaks and EMITTED_MORE_LEFT (or EMITTED_END) is returned to KafkaSpout's nextTuple, which checks whether the message buffer fetched for that partition has been fully emitted; only once it has does the spout move on to reading and emitting the next partition's data.

One thing to note: the spout does not emit all of a partition's pending messages and only then commit the offset to ZooKeeper. Instead, after each emitted message it checks whether the commit interval (the stateUpdateIntervalMs set at startup) has elapsed. In my view this is done to keep failure handling easy to control.
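To see why this helps with failures, here is the idea behind PartitionManager's offset bookkeeping (a simplified sketch from memory; the full class is covered in the next post):

[java]
// storm.kafka.PartitionManager (sketch): commit never passes an un-acked offset
SortedSet<Long> _pending = new TreeSet<Long>(); // emitted but not yet acked
Long _emittedToOffset;                          // next offset to fetch
Long _committedTo;                              // last offset written to ZK

public void ack(Long offset) {
    _pending.remove(offset);
}

public void commit() {
    // only commit up to the lowest still-pending offset, so anything
    // that later fails can still be replayed from the ZK-stored state
    long committedTo = _pending.isEmpty() ? _emittedToOffset : _pending.first();
    if (committedTo != _committedTo) {
        // write {"offset": committedTo, ...} JSON to this partition's
        // path under ZkState, then remember it locally
        _committedTo = committedTo;
    }
}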

KafkaSpout's ack, fail and commit operations are all delegated to PartitionManager:

[java]
// storm.kafka.KafkaSpout
@Override
public void ack(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.ack(id.offset);
    }
}

@Override
public void fail(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.fail(id.offset);
    }
}

@Override
public void deactivate() {
    commit();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_spoutConfig.scheme.getOutputFields());
}

private void commit() {
    _lastUpdateMs = System.currentTimeMillis();
    for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
        manager.commit();
    }
}

So PartitionManager is the heart of KafkaSpout. It's very late, past 3 a.m., so the PartitionManager analysis will follow in a later post. Good night.
