package storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

// TODO: need to add blacklisting
// TODO: make a best effort not to re-emit messages when we don't have to
// Storm spout that consumes a Kafka topic. Each task manages a subset of the topic's partitions
// (assigned through a PartitionCoordinator), emits from them in round-robin order, and
// periodically commits consumed offsets through ZkState.
public class KafkaSpout extends BaseRichSpout {
// A raw Kafka message paired with the offset it was read from.
public static class MessageAndRealOffset {
public Message msg;
public long offset;

public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;

// Outcome of PartitionManager.next(). NOTE(review): the constants are not visible in this view;
// the code references EMITTED_MORE_LEFT, EMITTED_END and NO_EMITTED.
static enum EmitState {

public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

// Random id of this spout instance, handed to the coordinator (presumably used as the topology
// instance id recorded alongside committed offsets — confirm in the coordinators).
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;

// Wall-clock time (ms) of the last offset commit; compared with stateUpdateIntervalMs in nextTuple().
long _lastUpdateMs = 0;

// Round-robin cursor into the list of partitions managed by this task.
int _currPartitionIndex = 0;

public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;

// Sets up the offset store (ZkState), broker connections and partition coordinator, and
// registers the "kafkaOffset" and "kafkaPartition" metrics.
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;

Map stateConf = new HashMap(conf);
List<String> zkServers = _spoutConfig.zkServers;
// Fall back to Storm's own ZooKeeper settings when the spout config leaves them unset.
// NOTE(review): only null triggers the fallback; an empty list is passed through unchanged.
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);

_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

// using TransactionalState like this is a hack
// Partitions are split across all tasks of this component: StaticCoordinator is used for
// StaticHosts, ZkCoordinator for any other BrokerHosts implementation.
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);

// Reports, through KafkaOffsetMetric, the last completed offset of every partition this task manages.
context.registerMetric("kafkaOffset", new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);

public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Set<Partition> latestPartitions = new HashSet();
// NOTE(review): two identical loop headers appear below; the first loop's body (presumably
// collecting partitions into latestPartitions) is not visible in this view.
for (PartitionManager pm : pms) {
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
return _kafkaOffsetMetric.getValueAndReset();
}, _spoutConfig.metricsTimeBucketSizeInSecs);

// Combines per-partition metrics into one map. NOTE(review): the loop body is not visible here;
// presumably it merges each pm.getMetricsDataMap() into concatMetricsDataMaps.
context.registerMetric("kafkaPartition", new IMetric() {
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
return concatMetricsDataMaps;
}, _spoutConfig.metricsTimeBucketSizeInSecs);

public void close() {

// Tries the managed partitions in round-robin order starting at _currPartitionIndex. The cursor
// moves on once a partition reports it has nothing more buffered. When no partitions are
// managed the loop never runs, so the modulo by managers.size() is never evaluated with zero.
// FailedFetchException is logged as a warning instead of propagating.
public void nextTuple() {
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {

try {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
EmitState state = managers.get(_currPartitionIndex).next(_collector);
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
// NOTE(review): body not visible here; presumably stops trying further partitions once something was emitted.
if (state != EmitState.NO_EMITTED) {
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);

// Commit offsets once stateUpdateIntervalMs has elapsed since the last commit
// (the call inside this branch is not visible in this view).
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {

// Looks up the PartitionManager that owns the acked tuple's partition; nothing is done when this
// task no longer manages that partition (the call inside the null check is not visible here).
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {

// Same routing as ack(), for failed tuples.
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {

public void deactivate() {

// Declares the spout's output fields (body not visible here; presumably derived from _spoutConfig.scheme).
public void declareOutputFields(OutputFieldsDeclarer declarer) {

// Records the commit time, then iterates over every managed partition (loop body not visible
// here; presumably each manager commits its offset).
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {


package storm.kafka;

import java.io.Serializable;
import java.util.List;

public class SpoutConfig extends KafkaConfig implements Serializable {
public List<String> zkServers = null;//记录配置的消费的zk节点信息
public Integer zkPort = null;//zk的端口
public String zkRoot = null;//保存kafka消费信息的节点位置
public String id = null;//当前spout的消费组名称,如果多个spout名称一样,将共享消费进度,否则将不共享消费进度

// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000; //设置线程多久将消费进度保存到zk上

// Exponential back-off retry settings.  These are used when retrying messages after a bolt
// calls OutputCollector.fail().
public long retryInitialDelayMs = 0; //设置发送boult中初始化的时间,0为没限制
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;//发送到boult后超过多长时间为失败

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
this.zkRoot = zkRoot;
this.id = id;


package storm.kafka;

import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.RawMultiScheme;

import java.io.Serializable;

public class KafkaConfig implements Serializable {

public final BrokerHosts hosts;//设置kafka从哪里获取相关的配置信息
public final String topic;//从哪个topic开始消费
public final String clientId;//设置客户端标识

public int fetchSizeBytes = 1024 * 1024;//发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
public int socketTimeoutMs = 10000;//设置的超时时间
public int fetchMaxWait = 10000;//设置的在broker无消息时的等待时间
public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
public MultiScheme scheme = new RawMultiScheme();//设置从服务器读取的byte[]流反序列化方式
public boolean ignoreZkOffsets = false;//是否强制从Kafka中offset最小的开始读起
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从哪里的offset开始读取消息,默认从消息的最前端开始,有两种方式可选
public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime
public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次消息

public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
this.hosts = hosts;
this.topic = topic;
this.clientId = clientId;



package test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

// Example entry point. It builds a topology in which a KafkaSpout reads topic "testflume" and
// Bolt1 prints each message. The topology is submitted to a cluster when arguments are given,
// and run in a LocalCluster otherwise.
public class TestMain {
public static void main(String[] args) throws Exception {
Config conf = new Config();
String zks = "";// ZooKeeper connection string used to discover Kafka brokers (must be filled in)
String topic = "testflume";// topic to consume
String zkRoot = "/kafka"; // ZooKeeper path under which consumer offsets are stored
String id = "test";// consumer id; offsets are stored under zkRoot/id
BrokerHosts brokerHosts = new ZkHosts(zks, "/brokers");// Kafka's root path in ZooKeeper, as configured for the Kafka/ZooKeeper setup
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.ignoreZkOffsets = false;
//      spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();// start from the newest messages
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();// start from the oldest messages
spoutConf.zkPort = 2181;// ZooKeeper port for offset storage
List<String> servers = new ArrayList<>();
// NOTE(review): this list is never populated. Because it is non-null, KafkaSpout will not fall back
// to Storm's ZooKeeper servers. Add the ZooKeeper hosts here, or leave zkServers null.
spoutConf.zkServers = servers;// ZooKeeper servers for offset storage
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("log-reader", new KafkaSpout(spoutConf));// spout component id
builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("log-reader");// bolt id, parallelism 2, shuffle grouping from the spout
if (args != null && args.length > 0) {
// NOTE(review): the remaining argument(s) of this call are not visible in this view.
StormSubmitter.submitTopologyWithProgressBar("AGX", conf,
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("AGX_STORM", conf, builder.createTopology());

// Bolt1: first-stage processing of data received from Kafka (prints each message).
* 对kafka发来的数据进行第一次处理
* @author hasee
public static class Bolt1 implements IRichBolt {

OutputCollector _collector;

// Keeps the collector for later use.
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
_collector = collector;

// Prints the tuple's first field as a string.
// NOTE(review): the catch block swallows every exception, and no ack is visible in the finally
// block. Confirm the tuple is acked, since un-acked offsets stay pending in the spout.
public void execute(Tuple input) {

try {

String msg = input.getString(0);
System.out.println("开始消费消息:" + msg);
} catch (Exception e) {
} finally {


// NOTE(review): declares three output fields, but execute() never emits anything.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "click", "browse"));

public void cleanup() {
// TODO Auto-generated method stub


// No component-specific configuration.
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;

package storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout.EmitState;
import storm.kafka.KafkaSpout.MessageAndRealOffset;
import storm.kafka.trident.MaxMetric;

import java.util.*;

// Manages consumption of a single Kafka partition for KafkaSpout. It:
//  - chooses a starting offset from ZooKeeper state or the spout configuration;
//  - fetches messages and buffers them for emission;
//  - tracks emitted-but-incomplete offsets in _pending;
//  - schedules retries of failed offsets;
//  - commits progress back through ZkState.
public class PartitionManager {
public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);

private final CombinedMetric _fetchAPILatencyMax;
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
// Next offset to fetch when reading new (non-retry) messages.
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
// Tracks failed offsets and decides which one is due for retry next (exponential back-off).
private final FailedMsgRetryManager _failedMsgRetryManager;

// Offset last written to ZooKeeper (initially the starting offset chosen in the constructor).
Long _committedTo;
// Messages fetched from Kafka but not yet emitted.
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
Partition _partition;
SpoutConfig _spoutConfig;
String _topologyInstanceId;
SimpleConsumer _consumer;
DynamicPartitionConnections _connections;
ZkState _state;
Map _stormConf;
// Counters checked in fail() to abort when failures pile up with no acks at all.
long numberFailed, numberAcked;
// Registers a consumer for the partition and chooses the starting offset:
//  - no readable ZooKeeper state -> offset derived from the spout configuration;
//  - state written by a different topology instance and ignoreZkOffsets -> startOffsetTime;
//  - otherwise -> the offset stored in ZooKeeper.
// The chosen offset is then replaced by the configuration-derived one if it lags by more than
// maxOffsetBehind or is <= 0.
public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
numberAcked = numberFailed = 0;

// Retry scheduling is configured from SpoutConfig's retry settings (remaining arguments not visible in this view).
_failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,

String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();// ZooKeeper node that holds this partition's committed state
// NOTE(review): catches Throwable, so any failure (even an Error) is only logged and the
// offset falls back to the configuration-derived value below.
try {
Map<Object, Object> json = _state.readJSON(path);// stored JSON state; may be null
LOG.info("Read partition information from: " + path +  "  --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset");// offset recorded by the last commit
} catch (Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);

Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);// configuration-derived offset (presumably based on startOffsetTime — confirm in KafkaUtils.getOffset)
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = currentOffset;
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );

// NOTE(review): a stored offset of exactly 0 also triggers this reset, and the log message below
// then still reports it as being "more than maxOffsetBehind behind".
if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
LOG.info("Last commit offset from zookeeper: " + _committedTo);
Long lastCommittedOffset = _committedTo;
_committedTo = currentOffset;
LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);

LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;

_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
_fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();

// Returns the fetch metrics keyed by "<partition>/<metric name>", resetting each metric.
public Map getMetricsDataMap() {
Map ret = new HashMap();
ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
return ret;
// Emits the tuples generated from the next buffered message, each anchored with a
// KafkaMessageId(partition, offset). When the buffer is empty it is refilled first (the refill
// call is not visible in this view). Returns:
//  - NO_EMITTED when nothing is buffered;
//  - EMITTED_MORE_LEFT when messages remain buffered afterwards;
//  - EMITTED_END otherwise.
public EmitState next(SpoutOutputCollector collector) {
if (_waitingToEmit.isEmpty()) {
while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
if (toEmit == null) {
return EmitState.NO_EMITTED;
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if ((tups != null) && tups.iterator().hasNext()) {
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
// NOTE(review): handling of messages that yield no tuples is not visible in this view.
} else {
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;

// Fetches the next batch from Kafka. A failed offset that is due for retry takes priority over
// new messages. Fetched messages are buffered in _waitingToEmit and recorded in _pending.
// On TopicOffsetOutOfRangeException the fetch position is reset to the earliest available
// offset; when retrying, failed offsets that fall outside the valid range are discarded.
// NOTE(review): the reset always uses EarliestTime; useStartOffsetTimeIfOffsetOutOfRange is not
// consulted in the visible code.
private void fill() {
long start = System.nanoTime();
Long offset;

// Are there failed tuples? If so, fetch those first.
offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
final boolean processingNewTuples = (offset == null);
if (processingNewTuples) {
offset = _emittedToOffset;

ByteBufferMessageSet msgs = null;
try {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (TopicOffsetOutOfRangeException e) {
_emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
LOG.warn("Using new offset: {}", _emittedToOffset);
// fetch failed, so don't update the metrics

//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
// For the case of EarliestTime it would be better to discard
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);

LOG.warn("Removing the failed offsets that are out of range: {}", omitted);

// Fetch latency in milliseconds (its consumer is not visible in this view).
long end = System.nanoTime();
long millis = (end - start) / 1000000;
if (msgs != null) {
int numMessages = 0;

for (MessageAndOffset msg : msgs) {
final Long cur_offset = msg.offset();
if (cur_offset < offset) {
// Skip any old offsets.
// Buffer only new messages, or offsets the retry manager says should be retried now.
if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {

// Handles an acked offset. If the oldest pending offset is more than maxOffsetBehind behind it,
// every pending entry below (offset - maxOffsetBehind) is dropped first. The rest of the ack
// handling is not visible in this view.
public void ack(Long offset) {
if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
// Too many things pending!
_pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();

// Handles a failed offset. Offsets more than maxOffsetBehind behind _emittedToOffset are not
// retried. Otherwise the failure is recorded for retry (only partly visible here).
// Throws RuntimeException when nothing has been acked yet and failures exceed maxOffsetBehind.
public void fail(Long offset) {
if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
// NOTE(review): the log call that this message expression belongs to is not visible in this view.
"Skipping failed tuple at offset=" + offset +
" because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
" behind _emittedToOffset=" + _emittedToOffset
} else {
LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
throw new RuntimeException("Too many tuple failures");


// Writes the last completed offset to this partition's ZooKeeper node, as JSON together with
// topology id/name, partition, broker host/port and topic. The write happens only when the
// offset differs from the last committed one.
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
if (_committedTo != lastCompletedOffset) {
LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);

_committedTo = lastCompletedOffset;
LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
} else {
LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);

// ZooKeeper path of this partition's state: <zkRoot>/<spout id>/<partition id>.
private String committedPath() {
return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();

// Offset that commit() persists: the smallest still-pending offset, or _emittedToOffset when
// nothing is pending.
public long lastCompletedOffset() {
if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
return _pending.firstKey();

public Partition getPartition() {
return _partition;

// Releases this partition's registration with the shared connection pool.
public void close() {
_connections.unregister(_partition.host, _partition.partition);

// Message id attached to every emitted tuple. It identifies the source partition and offset, so
// KafkaSpout can route ack/fail back to the owning PartitionManager.
static class KafkaMessageId {
public Partition partition;
public long offset;

public KafkaMessageId(Partition partition, long offset) {
this.partition = partition;
this.offset = offset;

