ELK is a popular log-collection architecture: Logstash gathers the logs, Elasticsearch stores them, and Kibana visualizes them. Through its plugins and a single pipeline, Logstash handles the whole workflow of collecting, filtering, aggregating, and outputting logs.
Data types
Logstash supports a small set of value types:
bool
debug => true
string
host => "hostname"
number
port => 514
array
match => ["datetime", "UNIX", "ISO8601"]
hash
options => {
  "key1" => "value1"
  "key2" => "value2"
}
Conditionals (condition)
- == (equal), != (not equal), < (less than), > (greater than), <= (less than or equal), >= (greater than or equal)
- =~ (matches regex), !~ (does not match regex)
- in (contained in), not in (not contained in)
- and, or, nand, xor
- () (compound expression), !() (negate the result of a compound expression)
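For example, these operators can be combined inside a filter block. A minimal sketch (the [loglevel] field and the "debug" tag are illustrative, not taken from the examples above):
filter {
  if [type] == "system" and [loglevel] =~ /ERROR|WARN/ {
    mutate { add_tag => ["alert"] }   # tag ERROR/WARN events from the system log
  } else if "debug" in [tags] {
    drop { }                          # discard events that already carry the debug tag
  }
}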
Plugins
List installed plugins, install, update, and install from a local gem file:
bin/logstash-plugin list
bin/logstash-plugin install logstash-output-webhdfs
bin/logstash-plugin update logstash-input-tcp
bin/logstash-plugin install /path/to/logstash-filter-crash.gem
Input plugins
file
input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}
There are further parameters, such as how often the file is checked and when it is closed; see the full reference: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
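For instance, a sketch that sets two of them (stat_interval and close_older are my assumed option names here; confirm them against the reference above):
input {
  file {
    path => ["/var/log/*.log"]
    stat_interval => 1     # seconds between checks of watched files for new content
    close_older => 3600    # close files that have not been read from within the last hour
  }
}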
stdin
input {
  stdin {
    add_field => {"key" => "value"}
    codec => "plain"
    tags => ["add"]
    type => "std"
  }
}
Other input plugins include syslog, kafka, log4j, and more.
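For example, a minimal syslog input (514 is simply the standard syslog port):
input {
  syslog {
    port => 514    # listen for syslog messages on TCP and UDP 514
  }
}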
Codec plugins
Logstash is an input | decode | filter | encode | output data stream, somewhat like a Linux pipe chain; codecs are what decode and encode events.
JSON
input {
  file {
    path => "/usr/local/openresty/work/logs/access.log_json"
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
}
Nginx log configuration
log_format json '{"@timestamp":"$time_iso8601",'
                '"@version":"1",'
                '"host":"$server_addr",'
                '"client":"$remote_addr",'
                '"size":$body_bytes_sent,'
                '"responsetime":$request_time,'
                '"domain":"$host",'
                '"url":"$uri",'
                '"status":"$status"}';
access_log logs/access.log_json json;
Start Nginx and send it a request; Logstash outputs:
{
    "path" => "/usr/local/openresty/work/logs/access.log_json",
    "@timestamp" => 2016-12-24T06:35:21.000Z,
    "size" => 31,
    "domain" => "127.0.0.1",
    "@version" => "1",
    "host" => "127.0.0.1",
    "client" => "127.0.0.1",
    "responsetime" => 0.0,
    "url" => "/",
    "status" => "200",
    "tags" => []
}
Multiline
Merging multiple lines into one event is a very real need: a Java log entry containing an error, for example, is split across many lines, and we want it handled as a single event for easier aggregation and filtering.
input {
  stdin {
    codec => multiline {
      pattern => "^\["     # lines that start a new event begin with "["
      negate => true       # apply the rule to lines that do NOT match the pattern
      what => "previous"   # ...and append them to the previous line
    }
  }
}
Filter plugins
Grok
Grok is Logstash's most important plugin. You can predefine named regular expressions in grok and reference them later (in grok parameters or in other regular expressions). As an example, our other post on collecting spring-cloud logs with Logstash shows how a single log line can be split into many fields. For details, see the official documentation: Grok filter plugin.
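As a rough sketch, a grok filter that splits a hypothetical access-log line into named fields (client, method, request, bytes and duration are illustrative field names; the %{...} patterns are from Logstash's built-in pattern set):
filter {
  grok {
    # "55.3.244.1 GET /index.html 15824 0.043" -> client, method, request, bytes, duration
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}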
Output plugins
elasticsearch
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    document_type => "%{type}"
    flush_size => 20000
    idle_flush_time => 10
    sniffing => true
    template_overwrite => true
  }
}
Stdout
output {
  stdout {
    codec => rubydebug
    workers => 2
  }
}
Zabbix
output {
  if [type] == "heartbeat" {
    file {
      path => "/data1/logstash-log/local6-5160-%{+YYYY.MM.dd}.log"
    }
    zabbix {
      zabbix_host => "zbxhost"
      zabbix_key => "zbxkey"
      zabbix_server_host => "zabbix.example.com"
      zabbix_value => "clock"
    }
  } else {
    elasticsearch { }
  }
}