Flume event转化为ES的document过程分析
2016-05-18 22:18
357 查看
使用更改后的MultiExecSource + MemoryChannel + ElasticsearchSink 将flume收集到的日志信息发送到es集群中。
如果使用Source使用ExecSource读取文件,那么默认body就是每一行的内容,headers可以通过interceptor来添加。
这里面,
可以看到,
source的配置如下(有6个interceptor,其中的一个interceptor又产生了3个header):
一个flume event转化为es的一个document的对应关系如下:
flume body:
flume headers:
BUG出现的原因:sink出现没有被捕获,导致sink无法正常运行
BUG会导致的问题:这样会导致sink一直不能消费channel中的event,channel中的event因此一直积压发不出去,导致内存占用过高,一直做GC。
Flume社区有这个issue的解决办法:
https://issues.apache.org/jira/browse/FLUME-2769
需要修改flume1.6.0源码,目前看来是flume的一个bug,修复办法:
https://github.com/agentgt/flume/commit/09089257df239eeac942ef64b2d24c68efb5bec7
即出现异常时捕获该异常,强制结束该event的相应header转换为document的一个field的过程。
一条flume event的构成
一个flume event由 header + body 构成:/** * Basic representation of a data object in Flume. * Provides access to data as it flows through the system. */ public interface Event { /** * Returns a map of name-value pairs describing the data stored in the body. */ public Map<String, String> getHeaders(); /** * Set the event headers * @param headers Map of headers to replace the current headers. */ public void setHeaders(Map<String, String> headers); /** * Returns the raw byte array of the data contained in this event. */ public byte[] getBody(); /** * Sets the raw byte array of the data contained in this event. * @param body The data. */ public void setBody(byte[] body); }
如果使用Source使用ExecSource读取文件,那么默认body就是每一行的内容,headers可以通过interceptor来添加。
一条es document的构成
比如,使用ElasticsearchSink,往es里导入数据,es中的索引信息如下:[root@10 /data/home/ambari]# curl -XGET 'http://localhost:9200/log-2016-04-20/_mapping/log?pretty' { "log-2016-04-20" : { "mappings" : { "log" : { "properties" : { "@message" : { "type" : "string" }, "@timestamp" : { "type" : "date", "format" : "dateOptionalTime" }, "component" : { "type" : "string" }, "content" : { "type" : "string" }, "fileName" : { "type" : "string" }, "hostname" : { "type" : "string" }, "level" : { "type" : "string" }, "offset" : { "type" : "long" }, "service" : { "type" : "string" }, "timestamps" : { "type" : "string" } } } } } }
这里面,
@message是flume event的
body,其余的field是flume event的
headers中的每一个header构建而来。
转化过程
构建过程在ElasticSearchLogStashEventSerializer.java中可以找到:
@Override public XContentBuilder getContentBuilder(Event event) throws IOException { XContentBuilder builder = jsonBuilder().startObject(); appendBody(builder, event); appendHeaders(builder, event); return builder; }
可以看到,
appendBody是把event的body加到es里的document中,
appendHeaders是把event的headers加到es里的document中,默认一个flume event对应一个es document。
source的配置如下(有6个interceptor,其中的一个interceptor又产生了3个header):
# source1:hdfs_datanode_src ---------- agent.sources.hdfs_datanode_src.type = com.pku.flume.MultiLineExecSource agent.sources.hdfs_datanode_src.command = tail -F /var/log/hdfs/hdfs/hadoop-hdfs-datanode-192.168.37.3.log agent.sources.hdfs_datanode_src.channels = memoryChannel agent.sources.hdfs_datanode_src.restart = true # interceptor agent.sources.hdfs_datanode_src.interceptors = i1 i2 i3 i4 i5 i6 agent.sources.hdfs_datanode_src.interceptors.i1.type = host agent.sources.hdfs_datanode_src.interceptors.i1.useIP = true agent.sources.hdfs_datanode_src.interceptors.i1.hostHeader = hostname agent.sources.hdfs_datanode_src.interceptors.i1.preserveExisting = false agent.sources.hdfs_datanode_src.interceptors.i2.type = static agent.sources.hdfs_datanode_src.interceptors.i2.key = service agent.sources.hdfs_datanode_src.interceptors.i2.value = hdfs agent.sources.hdfs_datanode_src.interceptors.i3.type = static agent.sources.hdfs_datanode_src.interceptors.i3.key = component agent.sources.hdfs_datanode_src.interceptors.i3.value = datanode agent.sources.hdfs_datanode_src.interceptors.i4.type = timestamp agent.sources.hdfs_datanode_src.interceptors.i5.type = regex_extractor agent.sources.hdfs_datanode_src.interceptors.i5.regex = ^(?:\\n)?(.+)\\s(INFO|ERROR|WARN|DEBUG|FATAL)\\s(.+)$ agent.sources.hdfs_datanode_src.interceptors.i5.serializers = s1 s2 s3 agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s1.name = timestamps agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s2.name = level agent.sources.hdfs_datanode_src.interceptors.i5.serializers.s3.name = content agent.sources.hdfs_datanode_src.interceptors.i6.type = static agent.sources.hdfs_datanode_src.interceptors.i6.key = fileName agent.sources.hdfs_datanode_src.interceptors.i6.value = hadoop-hdfs-datanode-192.168.37.3.log
一个flume event转化为es的一个document的对应关系如下:
flume body:
body -> @message
flume headers:
hostname(interceptor加入的) -> hostname(类型string) service(interceptor加入的) -> service(类型string) component(interceptor加入的) -> component(类型string) timestamp(interceptor加入的) -> @timestamp(类型date) timestamps(interceptor加入的) -> timestamps(类型string) level(interceptor加入的) -> level(类型string) content(interceptor加入的) -> content(类型string) fileName(interceptor加入的) -> fileName(类型string) offset(在MultiLineExecSource的run过程加入的) -> offset(类型long)
遇到的错误
实际测试过程中遇到过这样一类错误:19 Apr 2016 13:33:00,734 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: Failed to commit transaction. Transaction rolled back. at org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:227) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Caused by: org.elasticsearch.common.jackson.core.JsonParseException: Unrecognized token 'Topic': was expecting ('true', 'false' or 'null') at [Source: [B@6e1faec2; line: 1, column: 7] at org.elasticsearch.common.jackson.core.JsonParser._constructError(JsonParser.java:1487) at org.elasticsearch.common.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518) at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323) at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482) at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801) at org.elasticsearch.common.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697) at org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:51) at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.addComplexField(ContentBuilderUtil.java:60) at org.apache.flume.sink.elasticsearch.ContentBuilderUtil.appendField(ContentBuilderUtil.java:47) at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.appendHeaders(ElasticSearchLogStashEventSerializer.java:162) at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:92) at org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:76) at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.addEvent(ElasticSearchTransportClient.java:166) at org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:189) ... 3 more
BUG出现的原因:sink出现没有被捕获,导致sink无法正常运行
BUG会导致的问题:这样会导致sink一直不能消费channel中的event,channel中的event因此一直积压发不出去,导致内存占用过高,一直做GC。
Flume社区有这个issue的解决办法:
https://issues.apache.org/jira/browse/FLUME-2769
需要修改flume1.6.0源码,目前看来是flume的一个bug,修复办法:
https://github.com/agentgt/flume/commit/09089257df239eeac942ef64b2d24c68efb5bec7
即出现异常时捕获该异常,强制结束该event的相应header转换为document的一个field的过程。
相关文章推荐
- 网络流二十四题之十二 —— 软件补丁问题(BUG)
- 描述系统架构—部署图、组件图
- imfilter()函数的用法
- iOS 蓝牙开发(二)iOS 连接外设的代码实现
- min3d用法
- "围观"设计模式(13)--结构型之装饰模式(Decorator Pattern)
- 第2章 从HTML、XHTML到HTML5
- 数据结构实验之栈二:一般算术表达式转换成后缀式
- PSP(5.11——5.17)以及周记录
- 分享VMware题目解答
- iOS远程推送
- Java使用quartz实现作业调度
- centos安装
- java中的Clone(深拷贝,浅拷贝)
- 第11周补充(3)点派生时间类
- 镜花水月,过不留痕————铭记那些给我们带来进步的C语言小难题<2>
- Spark笔记:RDD基本操作(上)
- [数据库事务与锁]详解四: 数据库的锁机制
- 算法复习1:算法概述
- Java之Object类详解