kafak、flume、elasticsearch
2016-03-07 19:15
387 查看
目标:利用Flume Agent实现,将Kafka中数据取出,送入ElasticSearch中。
分析:Flume Agent需要的工作,两点:
Flume Kafka Source:负责从Kafka中读取数据;
Flume ElasticSearch Sink:负责将数据送入ElasticSearch;
当前Flume 1.5.2已经包含了ElasticSearchSink,因此,需要定制实现Flume Kafka Source即可。当前从Jira上得知,Flume 1.6.0 中将包含Flume-ng-kafka-source,但是,当前Flume 1.6.0版本并没有发布,怎么办?两条路:
github上别人开源的Flume-ng-kafka-source
flume 1.6.0分支的代码中flume-ng-kafka-source
初步选定Flume 1.6.0分支中的flume-ng-kafka-source部分,这部分代码已经包含在flume-ng-extends-source。
执行命令:
两类jar包:
lib中jar包
libext中jar包
疑问:maven打包时,如何将当前jar包以及其依赖包都导出? 参考thilinamb flume kafka sink
在properties文件中进行配置,配置样本文件:
目标:定制ElasticSearchSink的serializer。
现象:设置ElasticSearchSink的参数
几点:
ElasticSearchSink中新的配置参数:
indexNameBuilder=org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder
上述将以
其他选项:org.apache.flume.sink.elasticsearch.SimpleIndexNameBuilder,其直接以设定的
dateFormat=
timeZONE=
serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
上述选项,将 flume event 的 Header 中 key-value 添加到一个新增的字段
其他选项:org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer,其直接将body、header构造为一个JSON字符串,添加到ElasticSearch中。
如果终止Flume Agent,然后重启。疑问:
Kafka中的数据,是否会重复发送到ElasticSearch?
Kafka中的数据,是否有遗漏,没有发送到ElasticSearch?
思考,几个情况:
Kafka对应的Consumer有offset
Kafka中数据,周期性的清理,例如默认3天
需要详细思考Flume Agent的重启场景。
分析:Flume Agent需要的工作,两点:
Flume Kafka Source:负责从Kafka中读取数据;
Flume ElasticSearch Sink:负责将数据送入ElasticSearch;
当前Flume 1.5.2已经包含了ElasticSearchSink,因此,需要定制实现Flume Kafka Source即可。当前从Jira上得知,Flume 1.6.0 中将包含Flume-ng-kafka-source,但是,当前Flume 1.6.0版本并没有发布,怎么办?两条路:
github上别人开源的Flume-ng-kafka-source
flume 1.6.0分支的代码中flume-ng-kafka-source
初步选定Flume 1.6.0分支中的flume-ng-kafka-source部分,这部分代码已经包含在flume-ng-extends-source。
编译代码
执行命令:mvn clean package得到jar包:
flume-ng-extends-source-x.x.x.jar。
安装插件
两类jar包:lib中jar包
flume-ng-extends-source-x.x.x.jar
libext中jar包
kafka_2.9.2-0.8.2.0.jar
kafka-clients-0.8.2.0.jar
metrics-core-2.2.0.jar
scala-library-2.9.2.jar
zkclient-0.3.jar
疑问:maven打包时,如何将当前jar包以及其依赖包都导出? 参考thilinamb flume kafka sink
配置
在properties文件中进行配置,配置样本文件:# Kafka Source For retrieve from Kafka cluster. agent.sources.seqGenSrc.type = com.github.ningg.flume.source.KafkaSource #agent.sources.seqGenSrc.batchSize = 2 agent.sources.seqGenSrc.batchDurationMillis = 1000 agent.sources.seqGenSrc.topic = good agent.sources.seqGenSrc.zookeeperConnect = 168.7.2.164:2181,168.7.2.165:2181,168.7.2.166:2181 agent.sources.seqGenSrc.groupId = elasticsearch #agent.sources.seqGenSrc.kafka.consumer.timeout.ms = 1000 #agent.sources.seqGenSrc.kafka.auto.commit.enable = false # ElasticSearchSink for ElasticSearch. agent.sinks.loggerSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink agent.sinks.loggerSink.indexName = flume agent.sinks.loggerSink.indexType = log agent.sinks.loggerSink.batchSize = 100 #agent.sinks.loggerSink.ttl = 5 agent.sinks.loggerSink.client = transport agent.sinks.loggerSink.hostNames = 168.7.1.69:9300 #agent.sinks.loggerSink.client = rest #agent.sinks.loggerSink.hostNames = 168.7.1.69:9200 #agent.sinks.loggerSink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
定制
目标:定制ElasticSearchSink的serializer。现象:设置ElasticSearchSink的参数
batchSize=1000后,当前ES中当天的Index中出现了
120,000+的记录,而此时,原有平台发现,当前产生的数据只有
20,000,因此,猜测KafkaSource将Kafka集群中指定topic下的所有数据都传入了ES中。
几点:
ElasticSearchSink中新的配置参数:
indexNameBuilder=org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder
上述将以
indexPrefix-
yyyy-MM-dd方式,每天产生一个Index;
其他选项:org.apache.flume.sink.elasticsearch.SimpleIndexNameBuilder,其直接以设定的
indexPrefix(实际就是设置的
indexName)
dateFormat=
yyyy-MM-dd
timeZONE=
Etc/UTC
serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
上述选项,将 flume event 的 Header 中 key-value 添加到一个新增的字段
@fields中;
其他选项:org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer,其直接将body、header构造为一个JSON字符串,添加到ElasticSearch中。
重启
如果终止Flume Agent,然后重启。疑问:Kafka中的数据,是否会重复发送到ElasticSearch?
Kafka中的数据,是否有遗漏,没有发送到ElasticSearch?
思考,几个情况:
Kafka对应的Consumer有offset
Kafka中数据,周期性的清理,例如默认3天
需要详细思考Flume Agent的重启场景。
相关文章推荐
- Allen OpenCart 多功能自适应主题模板 ABC-0705
- MachineLearning in Action_KNN
- Homework_1
- iOS开发遇到的坑之七--上传app Stroe被拒绝:The app references non-public symbols in : _UICreateCGImageFromIOSurface
- [函数] Firemonkey iOS 指定目录不要备份到 iCloud
- 仿微信字母快速查找联系人界面
- shareSDK 快速集成
- Java二维数组
- OpenGL 库函数汇总
- UIButton的contentEdgeInsets属性
- DataBinding使用笔记一
- Activity和Service的启动
- 如何降低win8.1的磁盘读写率
- 小东西
- node.js
- Lua.1 lua的类型与变量
- UC 浏览器调用支付宝本地支付
- DISA STIG
- Plugin with id 'android-apt' not found
- codeforces 630F Selection of Personnel(组合数)