Flume-1.6.0修改
2015-07-01 16:54
435 查看
应项目需求,需要对多种日志收集
日志说明:
1、追加试,每天生成日志,定期删除。一条日志内容跨多行,日志间有无用信息
2、mysql主键自增追加,多个表,定期删除。
需求说明:
1、存入elasticsearch,按天建立index(由event的head生成)
2、存入hdfs,按天建文件(由event的body生成)
源代码 https://github.com/zjf92/modified-flume-1.6.0
以下为源码说明:
*对大文件处理存疑,想充分利用内存,但它不可控
DirRegexSource 功能设计 递归监控目录(筛选文件) 按字节数核对,重启续读 正则提取数据 内存使用上限totalMemory大于maxMemory的0.4 参数 monitorDir 必填 监控目录 monitorFileRegex [\\W\\w]+ 选择监控文件名的正则 checkFile 必填 读取的历史记录文件 contentRegex 必填 从change file中循环取content的正则 delayTime 30s 扫描目录间隔 charsetName UTF-8 文件编码 batchSize 1024 单个事务提交event最大数量 //------------------------------------------------------------------ SqlSource 功能设计 多表监控 index字段MAX记录,进行续读 内存使用上限totalMemory大于maxMemory的0.4 参数 checkFile 必填 index的MAX值 url 必填 mysql连接参数 username root mysql连接参数 password root mysql连接参数 tables 必填 表(以{split}分隔) columns 必填 字段(一层以{split}分隔、二层以,分隔) indexColumns 必填 顺序自增ID字段(以{split}分隔) delayTime 30 轮询间隔 batchSize 1024 单个事务提交event最大数量 //------------------------------------------------------------------ ElasticSearchSink 新功能设计 提供索引时刻滚动(分、时、天) value转义 旧参数 indexNameBuilder=org.apache.flume.sink.elasticsearch.HeadFieldIndexNameBuilder //按时刻滚动 新参数 indexNameBuilder.timeRollerFlag 时刻滚动必填 DAY/HOUR/MINUTE indexNameBuilder.formerField 时刻滚动必填 event的head的时间字段 indexNameBuilder.sdfParsePattern 时刻滚动必填 解析时间值的Pattern serializer.escapekeys 要转义key,","分割 //------------------------------------------------------------------ HDFSEventSink 在文件头部生成BOM(防utf-8乱码) 新功能设计 改重命名逻辑,重启续写(去.tmp缀的文件续写) 支持提供时刻滚动(分、时、天)(必须关闭其他滚动) 支持历史数据与新生数据交叉存储,历史文件存储 新参数 timeRollerFlag 时刻滚动必填 DAY/HOUR/MINUTE formerField 时刻滚动必填 event的head的时间字段 sdfParsePattern isNotBlank(formerField) 时刻滚动必填 解析时间值的Pattern //------------------------------------------------------------------ 拦截器链 BodyAppendByHeadInterceptor(全加、指定加) bodyLoopAppendFormat 二选一 字符串({key}、{value}、{separator}) 例:<{key}>={value} bodyCustomAppendFormat 二选一 字符串({@key}+str、{separator})例:{separator}<hostName>={hostName}{separator}<filePath>={filePath} BodyReplaceByBodyInterceptor(修改) bodyRegex 必填 多取正则({separator})例:<Message>=(size:) bodyStrs "" 字符串({separator}、{split})例: HeadPutByBodyInterceptor(全加、指定加) headLoopAppendRegex 二选一 双取正则 例:<([^>]*)>=((?:(?!\r?\n)[\\W\\w])*) headCustomAppendFields 二选一 字符串({split}) 例: headCustomAppendRegexs "" 正则({split}) 例: HeadPutByHeadInterceptor(修改) headKeys 必填 字符串({split}) 例:startTimeZ{split}closeTimeZ{split}params_length headValues "" 字符串({split}、{@key}+str、{@key.length}) 例:{StartTime}+08:00{split}{CloseTime}+08:00{split}{Params.length} //------------------------------------------------------------------
相关文章推荐
- Flume环境部署和配置详解及案例大全
- Play! Akka Flume实现的完整数据收集
- flume自定义Interceptor
- #Note# Analyzing Twitter Data with Apache Hadoo...
- flume、kafka、storm常用命令
- Flume向HDFS写数据实例
- flume+log4j整合到web项目
- 详细图解 Flume介绍、安装配置-1
- flume NG 中文 Welcome to Apache Flume 第一页 醉了
- flume 高可用性 高可靠性 agent source
- flume介绍及扩展开发心得
- Flume
- flume-两台机器上agent的串联运行
- log4j+flume+kafka管理日志,查询日志
- flume用户手册
- flume学习的总结
- 自定义的flume-ng的postgresql数据库sink
- 日志传输工具-Flume实现原理及应用
- flume跨机房数据传输
- flume