
How a Flume event is converted into an ES document

2016-05-18 22:18
This setup uses a modified MultiLineExecSource + MemoryChannel + ElasticSearchSink to send the log data collected by Flume to an ES cluster.

Structure of a Flume event

A Flume event consists of headers + body:

import java.util.Map;

/**
 * Basic representation of a data object in Flume.
 * Provides access to data as it flows through the system.
 */
public interface Event {

    /**
     * Returns a map of name-value pairs describing the data stored in the body.
     */
    public Map<String, String> getHeaders();

    /**
     * Set the event headers
     * @param headers Map of headers to replace the current headers.
     */
    public void setHeaders(Map<String, String> headers);

    /**
     * Returns the raw byte array of the data contained in this event.
     */
    public byte[] getBody();

    /**
     * Sets the raw byte array of the data contained in this event.
     * @param body The data.
     */
    public void setBody(byte[] body);

}


If the source is an ExecSource reading a file, then by default the body of each event is one line of the file, and headers can be added through interceptors.
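To make the header/body split concrete, here is a minimal sketch that does not depend on Flume. EventSketch, fromLine and addStaticHeader are hypothetical names invented for this illustration; they only mimic what an exec-style source and a static interceptor do conceptually, and the sample log line is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Flume event: a header map plus a raw byte body.
class EventSketch {
    private Map<String, String> headers = new HashMap<>();
    private byte[] body = new byte[0];

    Map<String, String> getHeaders() { return headers; }
    void setHeaders(Map<String, String> headers) { this.headers = headers; }
    byte[] getBody() { return body; }
    void setBody(byte[] body) { this.body = body; }

    // Conceptually what an exec-style source does: one line of output becomes the body.
    static EventSketch fromLine(String line) {
        EventSketch e = new EventSketch();
        e.setBody(line.getBytes(StandardCharsets.UTF_8));
        return e;
    }

    // Conceptually what a static interceptor does: attach a fixed key/value header.
    static EventSketch addStaticHeader(EventSketch e, String key, String value) {
        e.getHeaders().put(key, value);
        return e;
    }

    public static void main(String[] args) {
        EventSketch e = fromLine("2016-04-20 13:33:00,734 INFO DataNode started");
        addStaticHeader(e, "service", "hdfs");
        System.out.println(e.getHeaders() + " "
                + new String(e.getBody(), StandardCharsets.UTF_8));
    }
}
```

The body stays an opaque byte array; everything that later becomes a separate field in ES has to be placed in the headers.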

Structure of an ES document

For example, when ElasticSearchSink is used to import data into ES, the mapping of the resulting index looks like this:

[root@10 /data/home/ambari]#   curl -XGET 'http://localhost:9200/log-2016-04-20/_mapping/log?pretty'
{
  "log-2016-04-20" : {
    "mappings" : {
      "log" : {
        "properties" : {
          "@message" : {
            "type" : "string"
          },
          "@timestamp" : {
            "type" : "date",
            "format" : "dateOptionalTime"
          },
          "component" : {
            "type" : "string"
          },
          "content" : {
            "type" : "string"
          },
          "fileName" : {
            "type" : "string"
          },
          "hostname" : {
            "type" : "string"
          },
          "level" : {
            "type" : "string"
          },
          "offset" : {
            "type" : "long"
          },
          "service" : {
            "type" : "string"
          },
          "timestamps" : {
            "type" : "string"
          }
        }
      }
    }
  }
}


Here, @message is the body of the Flume event, and each of the remaining fields is built from one of the headers of the Flume event.

The conversion process

The code that builds the document can be found in ElasticSearchLogStashEventSerializer.java:

@Override
public XContentBuilder getContentBuilder(Event event) throws IOException {
    XContentBuilder builder = jsonBuilder().startObject();
    appendBody(builder, event);
    appendHeaders(builder, event);
    return builder;
}


As shown, appendBody adds the body of the event to the ES document and appendHeaders adds the headers of the event to the ES document. By default, one Flume event corresponds to one ES document.
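The body-then-headers pattern can be sketched without Flume or Elasticsearch on the classpath. This is only an illustration under stated assumptions: DocumentSketch, escape and toDocument are hypothetical names, every header value is written as a plain JSON string, and only the timestamp header is renamed (to @timestamp), following the field names visible in the mapping above. The real serializer works through XContentBuilder and handles types differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the body + headers -> document step.
class DocumentSketch {

    // Escape the characters that must not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    // Body first (as @message), then one field per header.
    static String toDocument(byte[] body, Map<String, String> headers) {
        StringBuilder json = new StringBuilder("{");
        json.append("\"@message\":\"")
            .append(escape(new String(body, StandardCharsets.UTF_8)))
            .append('"');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            // Assumption for this sketch: only "timestamp" is renamed.
            String field = h.getKey().equals("timestamp") ? "@timestamp" : h.getKey();
            json.append(",\"").append(escape(field)).append("\":\"")
                .append(escape(h.getValue())).append('"');
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("service", "hdfs");
        headers.put("component", "datanode");
        System.out.println(toDocument("a log line".getBytes(StandardCharsets.UTF_8), headers));
    }
}
```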

The source is configured as follows (there are 6 interceptors, one of which produces 3 headers):

# source1:hdfs_datanode_src ----------
agent.sources.hdfs_datanode_src.type = com.pku.flume.MultiLineExecSource
agent.sources.hdfs_datanode_src.command = tail -F /var/log/hdfs/hdfs/hadoop-hdfs-datanode-192.168.37.3.log
agent.sources.hdfs_datanode_src.channels = memoryChannel
agent.sources.hdfs_datanode_src.restart = true
# interceptor
agent.sources.hdfs_datanode_src.interceptors = i1 i2 i3 i4 i5 i6
agent.sources.hdfs_datanode_src.interceptors.i1.type = host
agent.sources.hdfs_datanode_src.interceptors.i1.useIP = true
agent.sources.hdfs_datanode_src.interceptors.i1.hostHeader = hostname
agent.sources.hdfs_datanode_src.interceptors.i1.preserveExisting = false
agent.sources.hdfs_datanode_src.interceptors.i2.type = static
agent.sources.hdfs_datanode_src.interceptors.i2.key = service
agent.sources.hdfs_datanode_src.interceptors.i2.value = hdfs
agent.sources.hdfs_datanode_src.interceptors.i3.type = static
agent.sources.hdfs_datanode_src.interceptors.i3.key = component
agent.sources.hdfs_datanode_src.interceptors.i3.value = datanode
agent.sources.hdfs_datanode_src.interceptors.i4.type = timestamp

agent.sources.hdfs_datanode_src.interceptors.i5.type = regex_extractor
agent.sources.hdfs_datanode_src.interceptors.i5.regex = ^(?:\\n)?(.+)\\s(INFO|ERROR|WARN|DEBUG|FATAL)\\s(.+)$
agent.sources.hdfs_datanode_src.interceptors.i5.serializers = s1 s2 s3
agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s1.name = timestamps
agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s2.name = level
agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s3.name = content

agent.sources.hdfs_datanode_src.interceptors.i6.type = static
agent.sources.hdfs_datanode_src.interceptors.i6.key = fileName
agent.sources.hdfs_datanode_src.interceptors.i6.value = hadoop-hdfs-datanode-192.168.37.3.log


One Flume event maps to one ES document as follows:

flume body:

body -> @message


flume headers:

hostname (added by an interceptor) -> hostname (type string)
service (added by an interceptor) -> service (type string)
component (added by an interceptor) -> component (type string)
timestamp (added by an interceptor) -> @timestamp (type date)
timestamps (added by an interceptor) -> timestamps (type string)
level (added by an interceptor) -> level (type string)
content (added by an interceptor) -> content (type string)
fileName (added by an interceptor) -> fileName (type string)
offset (added in the run method of MultiLineExecSource) -> offset (type long)
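The timestamps, level and content headers all come from the single regex_extractor interceptor (i5). The sketch below applies the same regular expression with plain java.util.regex to show how one body yields three headers. RegexExtractorSketch and extract are hypothetical names, and the sample log line in main is invented for illustration; it is not taken from a real DataNode log.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of what the i5 regex produces for one event body.
class RegexExtractorSketch {
    // Same pattern as interceptors.i5.regex, written as a Java string literal.
    static final Pattern LINE = Pattern.compile(
            "^(?:\\n)?(.+)\\s(INFO|ERROR|WARN|DEBUG|FATAL)\\s(.+)$");

    // Header names in capture-group order, as configured for serializers s1 s2 s3.
    static final String[] NAMES = {"timestamps", "level", "content"};

    static Map<String, String> extract(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = LINE.matcher(body);
        if (m.find()) {
            for (int i = 0; i < NAMES.length; i++) {
                headers.put(NAMES[i], m.group(i + 1));
            }
        }
        return headers; // empty when the body does not match
    }

    public static void main(String[] args) {
        String body = "2016-04-20 13:33:00,734 INFO DataNode: Receiving block";
        System.out.println(extract(body));
    }
}
```

Since `.` does not match line terminators by default, a body that spans several lines may fail to match at all, in which case no headers are added by this step.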


Errors encountered

The following kind of error came up during actual testing:

19 Apr 2016 13:33:00,734 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to commit transaction. Transaction rolled back.
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:227)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.common.jackson.core.JsonParseException: Unrecognized token 'Topic': was expecting ('true', 'false' or 'null')
at [Source: [B@6e1faec2; line: 1, column: 7]
at org.elasticsearch.common.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at org.elasticsearch.common.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
at org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:51)
at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.addComplexField(ContentBuilderUtil.java:60)
at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.appendField(ContentBuilderUtil.java:47)
at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.appendHeaders(ElasticSearchLogStashEventSerializer.java:162)
at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:92)
at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:76)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.addEvent(ElasticSearchTransportClient.java:166)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:189)
... 3 more


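Cause of the bug: an exception thrown inside the sink is not caught, so the sink cannot run normally. The stack trace shows a header value being parsed as JSON in ContentBuilderUtil.addComplexField; the parser hits the token 'Topic', which is not valid JSON, and the resulting JsonParseException propagates out of ElasticSearchSink.process, so the transaction is rolled back.

Consequence of the bug: the sink can never consume the events in the channel, so events keep piling up in the channel and cannot be sent. Memory usage climbs and the JVM spends its time in GC.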

The Flume community has a fix for this issue:

https://issues.apache.org/jira/browse/FLUME-2769

The Flume 1.6.0 source code has to be modified. This currently appears to be a bug in Flume; the fix is here:

https://github.com/agentgt/flume/commit/09089257df239eeac942ef64b2d24c68efb5bec7

That is, when the exception occurs it is caught, and the conversion of the offending header of that event into a field of the document is forcibly ended.
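The catch-and-continue idea can be sketched as follows. This is not the code from the linked commit: HeaderFieldSketch, looksLikeJson and parseAsJsonObject are hypothetical stand-ins for the sink's content detection and JSON parsing, and falling back to a plain string field is one possible way of ending the conversion that this sketch assumes. The point is only that a parse failure on one header no longer escapes and stalls the whole sink.

```java
// Hypothetical sketch of catching a parse failure for a single header value.
class HeaderFieldSketch {

    // Stand-in for content sniffing: anything containing '{' "looks like" JSON.
    static boolean looksLikeJson(String value) {
        return value.indexOf('{') >= 0;
    }

    // Stand-in for a strict parser: accepts only values wrapped in {...}.
    static String parseAsJsonObject(String value) {
        String t = value.trim();
        if (!(t.startsWith("{") && t.endsWith("}"))) {
            throw new IllegalArgumentException("Unrecognized token in: " + value);
        }
        return t;
    }

    // Render one header as a JSON member; never throws on unparseable values.
    static String appendField(String name, String value) {
        if (looksLikeJson(value)) {
            try {
                return "\"" + name + "\":" + parseAsJsonObject(value);
            } catch (IllegalArgumentException e) {
                // Assumed handling: give up on the complex form and fall through.
            }
        }
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        return "\"" + name + "\":\"" + escaped + "\"";
    }

    public static void main(String[] args) {
        System.out.println(appendField("content", "Topic {a} created"));
        System.out.println(appendField("content", "{\"a\":1}"));
    }
}
```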