Big Data Log Analysis System - Logstash
2018-02-27 14:52
Logstash overview
Logstash is an open-source data collection engine with real-time data transport capabilities. It can unify and filter data from different sources and output it to a destination according to rules defined by the developer. Configuration for logstash-2.2.2:
Configuration from logstash-forwarder to Kafka:
ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf
input {
  lumberjack {
    port            => "5044"
    ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"
    ssl_key         => "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"
    type            => "fc_access"
  }
}
output {
  if "_grokparsefailure" not in [tags] {
    # stdout { codec => rubydebug }
    kafka {
      topic_id          => "kafka_es"
      bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"
      compression_type  => "snappy"
      acks              => ["1"]
      value_serializer  => "org.apache.kafka.common.serialization.StringSerializer"
      timeout_ms        => 10000
      retries           => 5
      retry_backoff_ms  => 100
      send_buffer_bytes => 102400
      workers           => 2
    }
  }
}
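The lumberjack input expects the certificate/key pair referenced above. If they do not exist yet, one way to create a self-signed pair is with openssl (the subject name and validity period below are assumptions, adjust them to the actual deployment); the same .crt file also has to be distributed to the logstash-forwarder hosts so they can verify the connection:
openssl req -x509 -nodes -newkey rsa:2048 -days 365 \
  -subj "/CN=sp1" \
  -keyout /home/ubuntu/logstash-2.2.2/config/lumberjack.key \
  -out /home/ubuntu/logstash-2.2.2/config/lumberjack.crt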
Configuration from Kafka to ES
This configuration includes parsing of the individual log fields as well as filtering of abnormal log lines (note that it also drops entries whose timestamps fall more than 5 days before or after the current time, to keep abnormal logs from creating too many indices and turning ES red).
ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf
input {
  kafka {
    topic_id                  => "kafka_es"
    group_id                  => "kafka_es"
    zk_connect                => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181"
    consumer_threads          => 1
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 5000
    decorate_events           => true
    consumer_timeout_ms       => 1000
    queue_size                => 100
    auto_offset_reset         => "smallest"
    rebalance_max_retries     => 50
  }
}
filter {
  mutate { add_field => [ "messageClone", "%{message}" ] }
mutate { split => { "messageClone" => '"' } add_field => {"agent" => "%{[messageClone][3]}"} }
useragent { source => "agent" }
mutate { split => { "message" => " " } add_field => {"timestamp" => "%{[message][0]}"} add_field => {"reqtime" => "%{[message][1]}"} add_field => {"clientIP" => "%{[message][2]}"} add_field => {"squidCache" => "%{[message][3]}"} add_field => {"repsize" => "%{[message][4]}"} add_field => {"reqMethod" => "%{[message][5]}"} add_field => {"requestURL" => "%{[message][6]}"} add_field => {"username" => "%{[message][7]}"} add_field => {"requestOriginSite" => "%{[message][8]}"} add_field => {"mime" => "%{[message][9]}"} add_field => {"referer" => "%{[message][10]}"} add_field => {"agentCheck" => "%{[message][11]}"} add_field => {"dnsGroup" => "%{[message][-1]}"} remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"] }
  if [agentCheck] =~ "ChinaCache" {
    grok { match => { "agentCheck" => "OOPS" } }
  }
  mutate {
    convert => {
      "timestamp" => "float"
      "reqtime"   => "integer"
      "repsize"   => "integer"
    }
    remove_field => ["agentCheck"]
  }
ruby { code => "event['timestamp_str'] = Time.at(event['timestamp']).strftime('%Y-%m-%dT%H:%M:%S.%LZ')" }
  date { match => [ "timestamp_str", "ISO8601" ] }
mutate { split => { "requestURL" => '/' } add_field => {"uriHost" => "%{[requestURL][2]}"} remove_field => ["timestamp_str"] }
mutate { join => { "requestURL" => '/' } }
ruby { code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs" }
}
output {
  if "ChinaCache" not in [agent] {
    # stdout { codec => "rubydebug" }
    elasticsearch {
      index           => "logstash-%{+YYYY.MM.dd.HH}"
      workers         => 1
      flush_size      => 5000
      idle_flush_time => 1
      hosts           => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]
    }
  }
}
Startup commands:
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log &
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log &
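Once both pipelines are running, the hourly indices and the cluster status can be spot-checked against any ES node (es-ip-1 below is the same placeholder used in the output section; substitute a real host), for example:
curl -s 'http://es-ip-1:9200/_cat/indices/logstash-*?v'
curl -s 'http://es-ip-1:9200/_cluster/health?pretty'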
logstash-6.1.1 configuration
Configuration from Filebeat to Kafka
ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf
input {
  beats {
    port => "5055"
    type => "log"
  }
}
output {
  # stdout { codec => rubydebug }
  kafka {
    codec             => "json"
    bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"
    topic_id          => "test"
    compression_type  => "snappy"
    value_serializer  => "org.apache.kafka.common.serialization.StringSerializer"
  }
}
Check the configuration:
/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.test_and_exit
Start:
nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic 2>&1 > /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log &
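Once Logstash is running, one way to confirm that events from Filebeat actually reach the topic is Kafka's console consumer (the Kafka install path below is an assumption, adjust to the local installation):
/path/to/kafka/bin/kafka-console-consumer.sh --bootstrap-server 37:9092 --topic test --from-beginning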