用ELK 实时处理搜索日志
2016-01-27 17:53
309 查看
转载请标明原处:/article/8161069.html
本来这块业务 是放到SolrCloud上去的 , 然后 采用solr的facet统计查询,
具体代码参考之前写的文章:/article/8161061.html
最近遇到SolrCloud 遇到一些问题。。查询db时间过长,SolrCloud的长连接CloudSolrServer老timeout,索引的效率也不够满
意。为了稳定,暂时先还原solr单机版本(上线时,被运维打回来了)。搜索日志就用elasticsearch实时去处理。
大概流程:
基于日志系统ELK 的原型下,参考ELK处理nginx日志文章:/article/8161067.html
还是用logstash正则去解析搜索日志。搜索日志采用log4j生成,logstash检测到传递给elasticsearch。
这里要注意聚合操作的时候。Logstash 自带有一个优化好的模板。其默认的mapping,string类型都是analyzer。也就是说,默认分
词是采用单字分词的。
修改默认的logstash mapping模板。参考 http://udn.yyuap.com/doc/logstash-best-practice-cn/output/elasticsearch.html
结构如下:
启动logstash:
执行搜索测试。
可以马上在elasticsearch的插件上看到该搜索行为日志的数据索引。这就是elk的实时性了。
本来这块业务 是放到SolrCloud上去的 , 然后 采用solr的facet统计查询,
具体代码参考之前写的文章:/article/8161061.html
最近遇到SolrCloud 遇到一些问题。。查询db时间过长,SolrCloud的长连接CloudSolrServer老timeout,索引的效率也不够满
意。为了稳定,暂时先还原solr单机版本(上线时,被运维打回来了)。搜索日志就用elasticsearch实时去处理。
大概流程:
基于日志系统ELK 的原型下,参考ELK处理nginx日志文章:/article/8161067.html
还是用logstash正则去解析搜索日志。搜索日志采用log4j生成,logstash检测到传递给elasticsearch。
log4j:
log4j.appender.E.layout.ConversionPattern= %d|%m%n
logstash配置
新增logstash_search.conf:input { file { type => "searchword" path => ["/home/work/log/hotword/data"] } } filter { grok { match => [ "message", "%{TIMESTAMP_ISO8601:timestamp}\|\{%{GREEDYDATA:kvs}\}" ] } kv { source => "kvs" field_split => "," value_split => "=" trimkey => " " } date { match => ["timestamp" , "YYYY-MM-dd HH:mm:ss,SSS"] } } output { elasticsearch { hosts => ["host1:9200", "host2:9200", "host3:9200", "host4:9200"] index => "searchword-%{+YYYY.MM.dd}" } }
这里要注意聚合操作的时候。Logstash 自带有一个优化好的模板。其默认的mapping,string类型都是analyzer。也就是说,默认分
词是采用单字分词的。
修改默认的logstash mapping模板。参考 http://udn.yyuap.com/doc/logstash-best-practice-cn/output/elasticsearch.html
结构如下:
启动logstash:
nohup bin/logstash -f conf/logstash_search.conf &
执行搜索测试。
可以马上在elasticsearch的插件上看到该搜索行为日志的数据索引。这就是elk的实时性了。
elasticsearch java端
参考指定mapping和聚合查询代码:Client client=esobj.getClient(); SearchResponse response = client.prepareSearch("searchword*").setTypes("searchword").addAggregation(AggregationBuilders.terms("hotword").field("keyword")).execute().actionGet(); Terms terms = response.getAggregations().get("hotword");
/** * 初始化索引 * @param client * @param indexName * @param indexType * @param cols * @return 初始化成功,返回true;否则返回false * @throws Exception */ public static boolean initIndexMapping(Client client, String indexName, String indexType, List<ColumnInfo> cols) throws Exception { if(StringUtil.isEmpty(indexName) || StringUtil.isEmpty(indexType)) { return false; } indexName = indexName.toLowerCase(); indexType = indexType.toLowerCase(); //判断索引库是否存在 if(indicesExists(client, indexName)) { OpenIndexRequestBuilder openIndexBuilder = new OpenIndexRequestBuilder(client.admin().indices(), OpenIndexAction.INSTANCE); openIndexBuilder.setIndices(indexName).execute().actionGet(); }else{ //不存在则新建索引库 client.admin().indices().prepareCreate(indexName).execute().actionGet(); } TypesExistsRequest ter = new TypesExistsRequest(new String[]{indexName.toLowerCase()}, indexType); boolean typeExists = client.admin().indices().typesExists(ter).actionGet().isExists(); //如果 存在 返回!不能覆盖mapping if(typeExists) { return true; } //定义索引字段属性 XContentBuilder mapping = jsonBuilder().startObject().startObject(indexType).startObject("properties"); for (ColumnInfo col : cols) { String colName = col.getName().toLowerCase().trim(); String colType = col.getType().toLowerCase().trim(); if("string".equals(colType)) { mapping.startObject(colName).field("type", colType).field("store", ""+col.isStore()).field("indexAnalyzer", col.getIndexAnalyzer()).field("searchAnalyzer", col.getSearchAnalyzer()).field("include_in_all", col.isStore()).field("boost", col.getBoost()).endObject(); }else if("long".equals(colType)) { mapping.startObject(colName).field("type", colType).field("index", "not_analyzed").field("include_in_all", false).endObject(); }else if("date".equals(colType)) { mapping.startObject(colName).field("type", colType).field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd").field("index", "not_analyzed").field("include_in_all", false).endObject(); }else { mapping.startObject(colName).field("type", "string").field("index", "not_analyzed").endObject(); } } mapping.endObject().endObject().endObject(); PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(indexType).source(mapping); PutMappingResponse response = client.admin().indices().putMapping(mappingRequest).actionGet(); return response.isAcknowledged(); }
相关文章推荐
- 第一章 Shiro简介——《跟我学Shiro》
- 前端构建工具gulp入门教程
- PIR Motion Sensor with Arduino
- Oculus Rift DK2 安装所需电脑配置
- Inspecting mysql meta data
- Volley的使用方法详解
- 利用Matlab标定参数在OpenCV中进行立体匹配
- linux 分区 大于2T 磁盘
- JVM系列
- 安装VS2010后,向SQL Server 2008 Express添加SQL Server Management
- android service 详解
- cocos的plist最好不用
- java双缓冲消除闪烁
- ARM指令教程
- oracle文件属性导致的ORA-12547故障
- 图片、界面xib等资源文件封装到.a静态库
- linux下查看日志基本命令
- Java重定向标准输入/输出
- 1,安装
- hdu 2841(容斥原理)