ELK: Logstash

ELK is a popular log-collection architecture: Logstash gathers the logs, Elasticsearch stores them, and Kibana visualizes them. With its plugins and a single processing pipeline, Logstash covers the whole job of collecting, filtering, aggregating, and shipping logs.

Data types

Logstash supports a small set of value types:

bool

debug => true

string

host => "hostname"

number

port => 514

array

match => ["datetime", "UNIX", "ISO8601"]

hash

options => {
    key1 => "value1",
    key2 => "value2"
}

Conditionals

  • == (equal), != (not equal), < (less than), > (greater than), <= (less than or equal), >= (greater than or equal)
  • =~ (matches a regex), !~ (does not match a regex)
  • in (contained in), not in (not contained in)
  • and, or, nand (not and), xor (exclusive or)
  • () (compound expression), !() (negate the result of a compound expression)
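These operators are used in if/else blocks around filters and outputs. A minimal sketch (the field names and the tag are illustrative, not part of any standard):

```conf
filter {
    # tag events whose message mentions an error
    if [type] == "system" and [message] =~ /error/ {
        mutate { add_tag => ["error"] }
    }
}

output {
    # only print tagged events
    if "error" in [tags] {
        stdout { codec => rubydebug }
    }
}
```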

Plugins

Listing, installing, updating, and installing from a local file:

bin/logstash-plugin list
bin/logstash-plugin install logstash-output-webhdfs
bin/logstash-plugin update logstash-input-tcp
bin/logstash-plugin install /path/to/logstash-filter-crash.gem

Input plugins

file

input {
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}

The plugin also has parameters for the polling interval, when to close file handles, and more; see the full reference: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

stdin

input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}

Other input plugins include syslog, kafka, log4j, and more.
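For instance, with a recent logstash-input-kafka a Kafka input might look like this (the broker address, topic, and group id are placeholders):

```conf
input {
    kafka {
        bootstrap_servers => "localhost:9092"  # Kafka broker list (placeholder)
        topics => ["app-logs"]                 # topics to subscribe to (placeholder)
        group_id => "logstash"                 # consumer group id (placeholder)
    }
}
```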

Codec plugins

Logstash is an input | decode | filter | encode | output data stream, much like a Unix shell pipeline; codec plugins are what decode and encode events.
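The quickest way to see this stream in action is a one-liner pipeline passed with -e: type a line on stdin and watch the decoded event come out on stdout:

```shell
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
```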

JSON

input {
    file {
        path => "/usr/local/openresty/work/logs/access.log_json"
        codec => "json"
    }
}
output {
    stdout { codec => rubydebug }
}

nginx log configuration

log_format json '{"@timestamp":"$time_iso8601",'
               '"@version":"1",'
               '"host":"$server_addr",'
               '"client":"$remote_addr",'
               '"size":$body_bytes_sent,'
               '"responsetime":$request_time,'
               '"domain":"$host",'
               '"url":"$uri",'
               '"status":"$status"}';
access_log logs/access.log_json json;

Start nginx and Logstash, send a request, and Logstash prints:

{
            "path" => "/usr/local/openresty/work/logs/access.log_json",
      "@timestamp" => 2016-12-24T06:35:21.000Z,
            "size" => 31,
          "domain" => "127.0.0.1",
        "@version" => "1",
            "host" => "127.0.0.1",
          "client" => "127.0.0.1",
    "responsetime" => 0.0,
             "url" => "/",
          "status" => "200",
            "tags" => []
}

Multiline
Merging multiple lines into one event is a common need: a Java error log entry, for example, spans many lines (message plus stack trace), and we want it handled as a single event so it can be filtered and counted sensibly.

input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}
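Here pattern matches lines that start with [; negate => true flips the match, and what => "previous" appends every flipped (non-matching) line to the previous event. A small Python sketch of that merging logic (illustration only; the real codec also handles streaming and timeouts):

```python
import re

def merge_multiline(lines, pattern=r"^\[", negate=True):
    """Merge physical lines into logical events, mimicking the
    multiline codec with what => "previous": when negate is true,
    every line that does NOT match `pattern` is appended to the
    previous event; matching lines start a new event."""
    events = []
    for line in lines:
        matched = re.match(pattern, line) is not None
        belongs_to_previous = (not matched) if negate else matched
        if belongs_to_previous and events:
            events[-1] += "\n" + line   # glue onto the previous event
        else:
            events.append(line)         # start a new event
    return events

log = [
    "[2016-12-24 10:00:00] ERROR something broke",
    "java.lang.NullPointerException",
    "    at com.example.Foo.bar(Foo.java:42)",
    "[2016-12-24 10:00:01] INFO back to normal",
]
for event in merge_multiline(log):
    print(event)
    print("---")
```

The four physical lines collapse into two events: the error with its stack trace, and the INFO line.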

Filter plugins

Grok
Grok is Logstash's most important filter plugin. You can predefine named regular expressions in grok and reference them later (from grok parameters or from other patterns).
Our other post, "Collecting spring-cloud logs with Logstash", shows an example that splits a single log line into many fields.
For details, see the official documentation: Grok filter plugin
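A classic example from the grok documentation: given a line like 55.3.244.1 GET /index.html 15824 0.043, the predefined patterns pull out named fields:

```conf
filter {
    grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    }
}
```

After this filter runs, the event carries client, method, request, bytes, and duration fields alongside the original message.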

Output plugins

elasticsearch

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
        sniffing => true
        template_overwrite => true
    }
}

Stdout

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}

Zabbix

output {
    if [type] == "heartbeat" {
        file {
            path => "/data1/logstash-log/local6-5160-%{+YYYY.MM.dd}.log"
        }
        zabbix {
            zabbix_host => "zbxhost"
            zabbix_key => "zbxkey"
            zabbix_server_host => "zabbix.example.com"
            zabbix_value => "clock"
        }
    } else {
        elasticsearch { }
    }
}