您的位置:首页 > 大数据

大数据日志分析系统-logstash

2018-02-27 14:52 441 查看

logstash简介

Logstash 是一个开源的数据收集引擎,它具备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者制定的规范输出到目的地。

logstash-2.2.2的配置:

从logstash-forwarder到kafka的配置
ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf

# Pipeline: accept events over the lumberjack protocol (TLS) and publish them to Kafka.
input {
  lumberjack {
    port => "5044"
    ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"
    ssl_key => "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"
    # Every event received on this input is tagged with this type.
    type => "fc_access"
  }
}

output {
  # Only forward events that are not tagged with a grok parse failure.
  if "_grokparsefailure" not in [tags] {
    # Debug output, disabled. It must stay on its own line: a "#" comments out
    # everything to the end of the line, including any block that follows it.
    # stdout { codec => rubydebug }
    kafka {
      topic_id => "kafka_es"
      bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"
      compression_type => "snappy"
      acks => ["1"]
      value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
      timeout_ms => 10000
      retries => 5
      retry_backoff_ms => 100
      send_buffer_bytes => 102400
      workers => 2
    }
  }
}

从kafka到es配置
其中包括了对日志各个字段的解析,以及对异常日志的过滤(同时注意其中过滤掉了时间不在当前时间前后5天之内的日志,以防止异常日志创建过多索引导致es报红)
ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

# Consume events from the "kafka_es" topic.
input {
  kafka {
    topic_id => "kafka_es"
    group_id => "kafka_es"
    zk_connect => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181"
    consumer_threads => 1
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 5000
    decorate_events => true
    consumer_timeout_ms => 1000
    queue_size => 100
    auto_offset_reset => "smallest"
    rebalance_max_retries => 50
  }
}
# Parse each access-log line into named fields, derive @timestamp from the
# epoch value in the log, and drop events whose time is implausible.
filter {
  # Keep a copy of the raw line so it can be split two different ways.
  mutate {
    add_field => [ "messageClone", "%{message}" ]
  }

  # Split the copy on double quotes; the fourth quote-delimited segment is
  # taken as the user-agent string.
  mutate {
    split => { "messageClone" => '"' }
    add_field => {"agent" => "%{[messageClone][3]}"}
  }

  useragent {
    source => "agent"
  }

  # Split the raw line on spaces and map positions to field names.
  # NOTE(review): the field names suggest a squid-style access log - confirm
  # the column order against the actual log format.
  mutate {
    split => { "message" => " " }
    add_field => {"timestamp" => "%{[message][0]}"}
    add_field => {"reqtime" => "%{[message][1]}"}
    add_field => {"clientIP" => "%{[message][2]}"}
    add_field => {"squidCache" => "%{[message][3]}"}
    add_field => {"repsize" => "%{[message][4]}"}
    add_field => {"reqMethod" => "%{[message][5]}"}
    add_field => {"requestURL" => "%{[message][6]}"}
    add_field => {"username" => "%{[message][7]}"}
    add_field => {"requestOriginSite" => "%{[message][8]}"}
    add_field => {"mime" => "%{[message][9]}"}
    add_field => {"referer" => "%{[message][10]}"}
    add_field => {"agentCheck" => "%{[message][11]}"}
    # Last space-separated token of the line.
    add_field => {"dnsGroup" => "%{[message][-1]}"}
    remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]
  }

  # NOTE(review): the pattern "OOPS" presumably never matches, so this looks
  # like a way to tag ChinaCache events with _grokparsefailure - confirm.
  if [agentCheck] =~ "ChinaCache" {
    grok { match => { "agentCheck" => "OOPS" } }
  }

  mutate {
    convert => {
      "timestamp" => "float"
      "reqtime" => "integer"
      "repsize" => "integer"
    }
    remove_field => ["agentCheck"]
  }

  # Render the epoch timestamp as an ISO8601 string for the date filter.
  # Time.at returns local time, so convert to UTC before appending the
  # literal "Z"; otherwise local wall-clock time would be labelled as UTC.
  ruby {
    code => "event['timestamp_str'] = Time.at(event['timestamp']).utc.strftime('%Y-%m-%dT%H:%M:%S.%LZ')"
  }

  # Set @timestamp from the log's own time.
  date {
    match => [ "timestamp_str", "ISO8601" ]
  }

  # Split the URL on "/" to pick out the host part (index 2, i.e. the segment
  # after "scheme://"), then drop the temporary string field.
  mutate {
    split => { "requestURL" => '/' }
    add_field => {"uriHost" => "%{[requestURL][2]}"}
    remove_field => ["timestamp_str"]
  }

  # Restore requestURL to a single string.
  mutate {
    join => { "requestURL" => '/' }
  }

  # Drop events whose @timestamp is more than 5 days away from now, so that
  # abnormal log lines do not create a flood of hourly indices.
  ruby {
    code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"
  }
}
# Index events into Elasticsearch, one index per hour.
output {
  # Skip events whose agent field contains "ChinaCache".
  if "ChinaCache" not in [agent] {
    # Debug output, disabled. It must stay on its own line: a "#" comments out
    # everything to the end of the line, including any block that follows it.
    # stdout { codec => "rubydebug" }
    elasticsearch {
      index => "logstash-%{+YYYY.MM.dd.HH}"
      workers => 1
      flush_size => 5000
      idle_flush_time => 1
      hosts => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]
    }
  }
}

启动命令:
# Start both pipelines in the background, logging stdout AND stderr to a file.
# The order matters: "> file 2>&1" sends both streams to the file, whereas
# "2>&1 > file" duplicates stderr onto the old stdout before stdout is moved.
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log 2>&1 &
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log 2>&1 &

logstash-6.1.1配置
从filebeat到kafka的配置
ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf
# Pipeline: receive events from Beats on port 5055 and publish them to Kafka as JSON.
input {
  beats {
    port => "5055"
    type => "log"
  }
}

output {
  # Debug output, disabled. It must stay on its own line: a "#" comments out
  # everything to the end of the line, including any block that follows it.
  # stdout { codec => rubydebug }
  kafka {
    codec => "json"
    bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"
    topic_id => "test"
    compression_type => "snappy"
    value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
  }
}

检测
# Test the pipeline configuration file, then exit (--config.test_and_exit).
/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf  --config.test_and_exit
启动

# Start the pipeline in the background with automatic config reload, logging
# stdout AND stderr to a file. The order matters: "> file 2>&1" sends both
# streams to the file, whereas "2>&1 > file" leaves stderr on the old stdout.
nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic > /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log 2>&1 &
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: