Flume中同时使用Kafka Source和Kafka Sink的Topic覆盖问题
2016-11-17 09:04
495 查看
转载:lxw的大数据田地 » Flume中同时使用Kafka
Source和Kafka Sink的Topic覆盖问题
如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。
比如:在Agent中的Kafka Source配置Topic为:
agent_myAgent.sources.kafkaSource.topic = sourceTopic
在Kafka Sink配置Topic为:
agent_myAgent.sinks.kafkaSink.topic = sinkTopic
你会发现,最后数据又被写入到sourceTopic中,而sinkTopic没有任何数据写入。
经过DEBUG和分析,原因如下:
配置项官网文档说明如下:
属性名topic,默认值为default-flume-topic。
The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here。
如果event header中包含了key为”topic”的值,那么将会覆盖该属性配置。
在源码org.apache.flume.sink.kafka.KafkaSink.process()中,
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
其中topic是从属性agent_myAgent.sinks.kafkaSink.topic = sinkTopic 中获取的属性值(如果没有配置,则使用默认topic名称)
topic = context.getString(KafkaSinkConstants.TOPIC,KafkaSinkConstants.DEFAULT_TOPIC);
即:先使用event header中key为”topic”的值作为sink的topic,如果event header中没有,才取属性中配置的topic。
源码:org.apache.flume.source.kafka.KafkaSource.process()
// Add headers to event (topic, timestamp, and key)
headers = new HashMap<String, String>();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
在event中,将Kafka Source中配置的topic加入到了header中。
因此,在Kafka Sink中,首先从event header中读取到了topic,Sink端的配置项不起作用。
使用Flume拦截器,修改event header中key=topic的值为目标topic,拦截器使用Static interceptor,配置如下:
## Source 拦截器
agent_myAgent.sources.kafkaSource.interceptors = i1
agent_myAgent.sources.kafkaSource.interceptors.i1.type = static
agent_myAgent.sources.kafkaSource.interceptors.i1.key = topic
agent_myAgent.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent_myAgent.sources.kafkaSource.interceptors.i1.value = sinkTopic
其中,要特别注意preserveExisting属性值需要设置为false,该属性设置如果event header中已经存在key=topic,是否保留原来的值,默认为true,如果为true,那么还是会保留原来的topic,你设置的将不会生效。
另外,Kafka Sink中可以不用再设置topic属性了,反正也没用。
agent_myAgent.sinks.kafkaSink.topic = sinkTopic //不需要了
Source和Kafka Sink的Topic覆盖问题
如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。
比如:在Agent中的Kafka Source配置Topic为:
agent_myAgent.sources.kafkaSource.topic = sourceTopic
在Kafka Sink配置Topic为:
agent_myAgent.sinks.kafkaSink.topic = sinkTopic
你会发现,最后数据又被写入到sourceTopic中,而sinkTopic没有任何数据写入。
经过DEBUG和分析,原因如下:
在Kafka Sink中
配置项官网文档说明如下:属性名topic,默认值为default-flume-topic。
The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here。
如果event header中包含了key为”topic”的值,那么将会覆盖该属性配置。
在源码org.apache.flume.sink.kafka.KafkaSink.process()中,
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
其中topic是从属性agent_myAgent.sinks.kafkaSink.topic = sinkTopic 中获取的属性值(如果没有配置,则使用默认topic名称)
topic = context.getString(KafkaSinkConstants.TOPIC,KafkaSinkConstants.DEFAULT_TOPIC);
即:先使用event header中key为”topic”的值作为sink的topic,如果event header中没有,才取属性中配置的topic。
在Kafka Source中
源码:org.apache.flume.source.kafka.KafkaSource.process()// Add headers to event (topic, timestamp, and key)
headers = new HashMap<String, String>();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
在event中,将Kafka Source中配置的topic加入到了header中。
因此,在Kafka Sink中,首先从event header中读取到了topic,Sink端的配置项不起作用。
解决办法
使用Flume拦截器,修改event header中key=topic的值为目标topic,拦截器使用Static interceptor,配置如下:## Source 拦截器
agent_myAgent.sources.kafkaSource.interceptors = i1
agent_myAgent.sources.kafkaSource.interceptors.i1.type = static
agent_myAgent.sources.kafkaSource.interceptors.i1.key = topic
agent_myAgent.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent_myAgent.sources.kafkaSource.interceptors.i1.value = sinkTopic
其中,要特别注意preserveExisting属性值需要设置为false,该属性设置如果event header中已经存在key=topic,是否保留原来的值,默认为true,如果为true,那么还是会保留原来的topic,你设置的将不会生效。
另外,Kafka Sink中可以不用再设置topic属性了,反正也没用。
agent_myAgent.sinks.kafkaSink.topic = sinkTopic //不需要了
相关文章推荐
- 记录Flume使用KafkaSource的时候Channel队列满了之后发生的怪异问题
- Flume使用Kafka Sink导致CPU过高的问题
- flume配置-生产环境下 Taildir Source to kafka Sink
- Flume使用大全之kafka source-kafka channel-hdfs(kerberos认证,SSL加密)
- 【less.js扫雷系列一】--less和CSS同时时使用时,可能出现的样式优先级变化导致样式覆盖问题
- Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装
- [Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)
- 解决 flume KafkaSink 启动后cpu占用100%的问题
- [日志处理工作之三]使用flume采集DB2日志推送到kafka,并使用spark streaming拉取指定topic的日志
- 使用flume将kafka数据sink到HBase【转】
- Flume中使用KafkaChannel时topic中存在异常奇怪字符
- Flume使用大全之kafka source-kafka channel-hdfs(SSL加密)
- Flume使用大全之kafka source-kafka channel-hdfs(kerberos认证)
- [bigdata] 使用Flume hdfs sink, hdfs文件未关闭的问题
- 【Flume】自定义sink kafka,并编译打包jar,unapproval license的问题解决
- 同时使用afxsock.h及winsock2.h的方法 error C2011: 'fd_set' : 'struct' type redefinition的问题
- 使用bindingsource数据绑定时datagridview单元格无法及时刷新的问题
- iPhone上如何同时播放多个AVAudioPlayer,要求不能产生音频的覆盖问题
- 求助: asp.net MVC3中同时使用dropdownlist 和 checkbox时遇到的问题
- printf与cout同时使用的问题