Storm-Kafka源代码解析
2016-01-11 15:21
281 查看
Storm-Kafka源代码解析
说明:本文所有代码基于Storm 0.10版本,本文描述内容只涉及KafkaSpout和KafkaBolt相关,不包含trident特性。Kafka Spout
KafkaSpout的构造函数如下:public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf; }
其构造参数来自于SpoutConfig对象,Spout中用到的所有参数都来自于该对象。该对象参数说明如下:
SpoutConfig
SpoutConfig继承自KafkaConfig。两个类内部所有参数及说明如下:/** * Kafka地址和分区关系对应信息 * 在kafka的分区信息和地址信息都很清楚的情况下,可以以直接使用StaticHosts * 但是该对象参数很难构建,需要的信息很多,所以我们一般情况下并不使用它。 * 我们主要用的是ZKHosts的实例。可以在其中设置Zookeeper地址等信息,然后动态获取kafka元数据 * ZKHost的参数信息见下面一段。 * 必选参数 **/ public final BrokerHosts hosts; /** * 要从kafka中读取的topic队列名称 * 必选参数 **/ public final String topic; /** * Kafka的客户端id参数,该参数一般不需要设置 * 默认值为kafka.api.OffsetRequest.DefaultClientId() * 空字符串 **/ public final String clientId; /** * Kafka Consumer每次请求获取的数据量大小 * 每次获取的数据消费完毕之后,才会再获取数据 * 默认1MB **/ public int fetchSizeBytes = 1024 * 1024; /** * Kafka SimpleConsumer 客户端和服务端连接的超时时间 * 单位:毫秒 **/ public int socketTimeoutMs = 10000; /** * Consumer每次获取数据的超时时间 * 单位:毫秒 **/ public int fetchMaxWait = 10000; /** * Consumer通过网络IO获取数据的socket buffet大小, * 默认1MB **/ public int bufferSizeBytes = 1024 * 1024; /** * 该参数有两个作用: * 1:申明输出的数据字段 declareoutputFileds * 2:对从kafka中读到的数据进行反序列化,即将byte字节数组转为tuple对象。 * 对kafka存入数据的key和message都比较关心的,可以使用KeyValueSchemeAsMultiScheme, * 如果不关心,可以使用SchemeAsMultiScheme * 默认接口实现一般都只会输出一个字段或者两个字段,很多时候,我们需要直接从kafka中读取到数据之后,就将每个字段解析了,然后进行简单处理再emit * 这个时候,建议自己实现MultiScheme接口 * 必选参数 **/ public MultiScheme scheme = new RawMultiScheme(); /** * 在拓扑提交之后,KafkaSpout会从zookeeper中读取以前的offset值,以便沿着上次位置继续读取数据。 * KafkaSpout会检查拓扑ID和zookeeper中保存的拓扑id是否相同。 * 如果不同,并且ignoreZkOffsets=true,那么就会从startOffsetTime参数位置读取数据 * 否则,沿着zookeeper中保存的offset位置继续读取数据。 * 也就是说,当ignoreZkOffsets=true的时候,kafkaspout只能保证在拓扑不杀掉的情况下,当worker进程异常退出的时候,会沿着上次读取位置继续读取数据,当拓扑重新提交的时候,就会从队列最早位置开始读取数据。 * 这样就会存在重复读取数据的问题,所以正式场景,该参数还是应该设置为false。以保证任何场景数据的只被读取一次。 **/ public boolean ignoreZkOffsets = false; /** * 拓扑第一次提交,zookeeper中没有保存对应offset的情况下,默认从kafka中读取的offset位置。默认从队列最早位置开始读取数据,即从队列最开始位置读取数据。 **/ public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); /** * * 如果当前的(offset值-failed offsets中最小值) < maxOffsetBehind * 那么就会清理failed列表中所有大于maxOffsetBehind的offset值。 * 这是为了防止failed过多,重发太多导致内存溢出 * 不过默认为了保证数据不丢失,所以maxOffsetBehind设置的最大 **/ public long maxOffsetBehind = Long.MAX_VALUE; /** * 当KafkaSpout初始化之后,使用从zookeeper中读取的上次记录的offset * 从kafka中获取数据失败,返回offsetOutofRange错误之后, * 是否使用startOffset从队列最早位置重新获取数据。 * offsetOutofrange一般发生在topic被重建,分片被删除的场景。 **/ public boolean useStartOffsetTimeIfOffsetOutOfRange = true; /** * metric监控信息采集间隔 **/ public int metricsTimeBucketSizeInSecs = 60; /** * KafkaSpout保存offset的zookeeper所在地址 * 独立出来这个属性是为了防止offset保存位置不在kafka集群中 * 如果kafka和storm在一个集群,该属性可以忽略 **/ public List<String> zkServers = null; /** * KafkaSpout保存offset的zookeeper端口 * 如果kafka和storm在一个集群,该属性可以忽略 **/ public Integer zkPort = null; /** * offset在zookeeper中保存的路径 * 路径计算方式为:${zkRoot}/${id}/${partitionId} * 必选参数 **/ public String zkRoot = null; /** * kafkaSpout保存offset的不同客户端区分标志 * 建议每个拓扑使用固定的,不同的参数,以保证拓扑重新提交之后,可以从上次位置继续读取数据 * 如果两个拓扑公用同一个id,那么可能会被重复读取 * 如果在拓扑中使用了动态生成的uuid来作为id,那么每次提交的拓扑,都会从队列最开始位置读取数据 * 必选参数 **/ public String id = null; /** * offset刷新到zookeeper中的时间间隔 * 单位:毫秒 **/ public long stateUpdateIntervalMs = 2000; /** * 数据发送失败之后重试策略相关参数 **/ public long retryInitialDelayMs = 0; /** * 数据发送失败之后重试策略相关参数 **/ public double retryDelayMultiplier = 1.0; /** * 数据发送失败之后重试策略相关参数 **/ public long retryDelayMaxMs = 60 * 1000;
ZKHost中保存了kafka集群所在的zookeeper地址等信息
ZKHost
/** * kafka集群zookeeper地址,允许包含chroot * 比如:192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181/kafka **/ public String brokerZkStr = null; /** * kafka集群中broker元数据所在地址 * 默认为/brokers * 如果配置了chroot,那么就是/kafka/brokers * 这个和kakfa服务端配置默认是一样的,如果服务端采用默认配置,该属性也可以使用默认值 **/ public String brokerZkPath = null; // e.g., /kafka/brokers /** * kafka broker分区信息刷新时间间隔, * 单位:秒 * 当kafka有broker节点重启或者分区信息发生变化而导致数据读取失败的时候, * 都会重新触发一次分区信息刷新 **/ public int refreshFreqSecs = 60;
KafkaSpout初始化
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; Map stateConf = new HashMap(conf); /* * offset保存位置的zookeeper地址 * 如果该地址为空,则默认使用Storm集群的zookeeper */ List<String> zkServers = _spoutConfig.zkServers; if (zkServers == null) { zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); } Integer zkPort = _spoutConfig.zkPort; if (zkPort == null) { zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); } stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); //保存offset信息到zookeeper _state = new ZkState(stateConf); //kafka集群的连接器 _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } else { //从zookeeper中读取kafka的broker信息,只保存自身实例需要用到的分区信息 _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } //两个metrics监控信息,忽略 context.registerMetric("kafkaOffset", new IMetric() { ...}, _spoutConfig.metricsTimeBucketSizeInSecs); context.registerMetric("kafkaPartition", new IMetric() {...}, _spoutConfig.metricsTimeBucketSizeInSecs); }
以上是kafkaSpout的初始化方法,主要是完成对自身管理分区信息的刷新。
这里有一个问题,就是会创建3个zookeeper客户端连接,一个用来从kafka中读取数据,一个保存offset,一个是metrics监控信息,每个zookeeper客户端连接会创建3个线程,这样,光一个kafkaSpout就会存在9个zookeeper线程!当worker进程中有多个spout实例的时候,就会产生更多的线程,这就会很消耗性能,这个还是建议对zookeeper连接进行合并处理。
系统通过KafkaUtils.calculatePartitionsForTask方法来获取自己需要管理的分区列表:
for (int i = taskIndex; i < numPartitions; i += totalTasks) { Partition taskPartition = partitions.get(i); taskPartitions.add(taskPartition); }
其中,taskIndex就对应自身spout实例的序号,比如该spout并发度为3,那么这个spout实例就可能为0,1,2。当kafka的topic有5个分区的时候,第一个spout实例管理0,3的分区;第二个spout实例管理编号为1,4的分区,第三个spout实例管理编号为2的分区。
taskId保存在Spout的Open方法的context参数中。context.getThisTaskIndex()
KafkaSpout从Kafka中如何读取数据并发送
kafkaSpout主要在nextTuple方法中读取数据并emit。
public void nextTuple() { //获取自身实例管理的分区列表 List<PartitionManager> managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { try { //_currPartitionIndex永远小于manager的大小 // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); //获取数据并emit EmitState state = managers.get(_currPartitionIndex).next(_collector); /* * 检查此次数据发送状态 * 如果没有取到数据或者取到的数据都已经emit完毕 * 那么就增加_currPartitionIndex值,然后就可以从下个分区中读取数据了。 */ if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } /* * 如果还有数据没有emit,就退出此次循环,等待下次nexttuple调用 * 然后仍然从当前分区中取获取数据并emit */ if (state != EmitState.NO_EMITTED) { break; } } catch (FailedFetchException e) { LOG.warn("Fetch failed", e); _coordinator.refresh(); } } //定期保存offset数据到zookeeper long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } }
数据发送状态EmitState一共有三种状态
EMITTED_MORE_LEFT
上次取到的数据还没有emit完毕
EMITTED_END,
上次取到的数据已经全部emit完毕
NO_EMITTED
本次没有取到数据,没有可供emit的数据
再来看下PartitionManager.next方法,里面就包含如何获取数据已经如何emit
public EmitState next(SpoutOutputCollector collector) { //如果等待发送的队列为空,那么就从kafka中再取一次数据 if (_waitingToEmit.isEmpty()) { fill(); } while (true) { //从等待发送的队列中获取第一个数据 MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); //如果没有可供发送的数据,那么返回emit状态为没有可以emit的数据 if (toEmit == null) { return EmitState.NO_EMITTED; } //根据KeyValueSchemeAsMultiScheme接口实现,将kafka中取到的数据转为tuple Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); if (tups != null) { //发送所有的tuple,因为kafka一条数据可能对应storm的多条 for (List<Object> tup : tups) { collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); } break; } else { //如果tuple转化失败,返回null,直接告诉storm该条已经处理成功,即忽略数据错误 ack(toEmit.offset); } } /* * 每次从等待队列中取一条数据反序列化并emit, * 然后判断等待队列是否还有数据, * 如果还有数据,就告诉spout,数据还没有发送完,不要切换分区 * 如果数据已经发送完毕,就告诉spout,数据已经发送完毕,可以切换到下个分区了。 */ if (!_waitingToEmit.isEmpty()) { return EmitState.EMITTED_MORE_LEFT; } else { return EmitState.EMITTED_END; } }
当有数据发送失败的时候,失败的数据又会重新加入到_waitingToEmit队列中,这样就会产生一个问题,就是当数据发送失败的时候,kakfaSpout会永远只读一个分区,前天分区都不会读取,从而产生数据消费不均匀的问题。
在0.9.6以前老版本的时候哟一个问题,就是当较多数据emit失败的时候,会有很多的数据在不断重试,然后重试不断超时,又不断重新加入重试列表,从而导致一个数据发送的死循环。这个问题也就是offset超时的问题。见Storm-643, 这个问题目前在最新版本中已经解决。
KafkaBolt
KafkaBolt就比较简单,0.10版本还是使用old Producer API。Storm所有的配置属性,都在kafka.broker.properties中保存着,这就要求在submitTopology的时候,在topologyConf中再put一个kafka.broker.properties属性,形成一个map中套map的结构。这样有一点不好的就是一个拓扑中数据只能写到一个kafka集群中,不支持同事写到多个kafka集群中。不过这个在0.11新版本中已经解决了,kafka.broker.properties被作为了一个局部变量,可以在不同的bolt实例中保存不同的配置属性。
数据写入方法如下:
public void execute(Tuple input) { if (TupleUtils.isTick(input)) { collector.ack(input); return; // Do not try to send ticks to Kafka } K key = null; V message = null; String topic = null; try { //消息的键值,不同的值在kafka中对应不同的分发方式,这个在KafkaBolt的FAQ中有介绍。 key = mapper.getKeyFromTuple(input); //消息体 message = mapper.getMessageFromTuple(input); //topic名称 topic = topicSelector.getTopic(input); if(topic != null ) { producer.send(new KeyedMessage<K, V>(topic, key, message)); } else { LOG.warn("skipping key = " + key + ", topic selector returned null."); } collector.ack(input); } catch (Exception ex) { collector.reportError(ex); collector.fail(input); } }
Storm-Kafka FAQ
KafkaSpout
KafkaSpout excutor数量和Kafka topic分区数量的关系当executor并发度大于topic数量的时候,就会存在有的spout实例可以读到数据, 有的spout实例读不到数据。
当executor并发度小于topic数量的时候,就会存在一个spout实例对应多个分区的情况;kafka会先从一个分区中取一次数据,当这次获取的数据emit完毕之后,就会再从下个分区中取数据。
当executor并发度等于topic数量的时候,一个spout实例对应一个分区。在实际应用中,我们也推荐这种配置方式。
如何从kafka中读取数据,每次读取多少数据
根据fetchSizeBytes参数的配置,默认每次取1MB数据。
数据读取失败如何处理
KafkaSpout每个PartitionManager内部保存一个重试队列,当数据发送失败的时候,加入重试队列,然后重新发送,直到成功为止。
通过maxOffsetBehind参数来解决failed数量过多导致内存溢出问题。
Topic不存在如何处理
直接报错。
拓扑重新提交,会不会接着上次位置继续读取数据
重新提交的时候,只要id这个参数不变,那么就会沿着上次位置继续读取数据。
zookeeper中保存的kafka的offset位置有错误怎么办?
会抛出offsetOutofRange异常,然后默认从kafka分区队列最早位置开始读取数据。
能不能在一个spout中从多个topic读取数据?
在0.10版本不行,在0.11版本中,支持按照正则方式匹配topic名称,可以从所有满足正则条件的topic中读取数据。
topic分区主备信息发生变化,如何处理
抛出异常,然后马上更新分区信息,再次读取数据。
KafkaBolt
写入数据,kafka topic不存在怎么办?如果kakfa服务端允许自动创建topic,那么就会自动创建topic。
如果不允许自动创建,那么就会抛出异常
如何写数据到指定分区?
取决于tupleToKafkaMapper的接口实现。
kafka 0.10版本使用的是old producer的API,0.11版本使用的是new Producer的API
对于old Producer
如果key == null,那么在kafka中,会随机寸照一个分区去写入数据,之后只要不重启,就都会往这个分区写入数据
如果key != null,那么就会在写入数据的时候,以utils.abs(key.hashCode)%numPartitions规则计算分区id
对于New Producer
如果key = null,那么就会使用一个递增的int值,每次发送数据的时候递增,然后执行utils.abs(nextValue)%availablePartitions.size(),数据写入会比较均衡。
如果key != null,那么就会按照Utils.abs(Utils.murmur2(record.key()))%numPartitions的规则计算分区。
当然,New Producer API也可以手工指定分区id。
相关文章推荐
- 用 Github + Jekyll 写博客
- 在mac系统安装go语言以及配置
- 5.7. More on Conditions(有关控制的更多内容)
- YTU 2912: 圆柱体的C++
- Python+Django+SAE系列教程14-----使表单更安全
- 纯代码通过autolayout进行约束
- 安装python,pip,pymongo
- Java正则表达式教程及示例
- 可选操作 java.lang.UnsupportedOperationException Collection
- 来自西弗吉利亚大学li xin整理的CV代码合集
- 较完整的串口类(WINAPI/C++/源码),解决10以上端口,合理结束线程等问题
- php处理金额显示的一些笔记
- 【第七章】 对JDBC的支持 之 7.4 Spring提供的其它帮助 ——跟我学spring3【私塾在线原创】
- 编程学习笔记之c++相关::dynamic_cast介绍
- 重拾编程之路--直接选择排序算法
- Python缩进快捷键
- 浅谈Java序列化
- 辛星PHP教程之yii和ci教程已经写完,望与朋友们交流
- 【Spring3】(1)初识Spring
- day11 python学习随笔