input 插件
input插件用于指定数据输入源,一个pipeline可以有多个input插件,常用的input插件主要包括:stdin、file、kafka等。
stdin
最简单的输入,从标准输入读取数据,通用配置为:
codec: 类型为codec
type: 类型为string,自定义该事件的类型,可用于后续判断
tags: 类型为array,自定义该事件的tag,可用于后续判断
add_field: 类型为hash,为该事件添加字段
有如下的配置:
input {
stdin {
codec => "plain"
tags => ["test"]
type => "std"
add_field => {"key" => "value"}
}
}
output {
stdout {
codec=>"rubydebug"
}
}
在logstash启动的时候,指定上面的配置文件:
echo "test" | bin/logstash -f test/input-stdin.conf
输入如下内容:
{
"key" => "value",
"@version" => "1",
"tags" => [
[0] "test"
],
"host" => "ubuntu",
"type" => "std",
"@timestamp" => 2018-12-31T09:45:07.935Z,
"message" => "test"
}
file
从文件中读取数据,如常见的日志文件。
文件内容只被读取一次,每次读取的位置会记录在sincedb中,重启logstash时,会从sincedb中获取记录的文件位置
会定时检查文件是否有更新,以保证即使读取到文件的新内容
定时检查新文件,以发现新文件并进行读取
读取文件内容时,使用的是inode,如果文件发生了归档操作,不会影响当前的内容被读取
配置
| 配置项 | 含义 |
|---|---|
| path | 类型为数组,指明读取的文件路径,基于glob匹配语法,如:path => [“/var/log/*/.log”, “/var/log/message”] |
| exclue | 类型为数组,排除不想监听的文件规则,基于glob匹配语法,如:exclude => “.gz” |
| sincedb_path | 类型为字符串,记录sincedb文件路径 |
| start_postion | 类型为字符串,beginning or end,是否从头读取文件 |
| start_interval | 类型为数值,单位为秒,定时检查文件是否有更新,默认为1秒 |
| discover_interval | 类型为数值,单位为秒,定时检查文件是否有新文件待读取,默认为15秒 |
| ignore_older | 类型为数值,单位为秒,扫描文件列表时,如果该文件从上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭 |
| close_older | 类型为数值,单位为秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认为3600秒 |
glob 配置语法
主要包含如下几种匹配符:
| 匹配符 | 含义 |
|---|---|
| * | 匹配任意字符,但不匹配以.开头的隐藏文件,匹配隐藏文件是要使用.*来匹配 |
| ** | 递归匹配子目录 |
| ? | 匹配单一字符 |
| [] | 匹配多个字符,比如[a-z]、[^a-z] |
| {} | 匹配多个单词,比如{foo, bar, hello} |
| \ | 转义符号 |
如:
/var/log/*.log:匹配/var/log目录下的.log结尾的文件/var/log/**/*.log:匹配/var/log所有子目录下.log结尾的文件/var/log/{app1, app2, app3}/*.log:匹配/var/log目录下的app1、app2、app3目录中以.log结尾的文件
有如下配置文件,logstash会从/var/log/access.log、/var/log/err.log从读取内容:
input {
file {
path => ["/var/log/access.log", "/var/log/err.log"]
}
type => "web",
start_position => "beginning"
}
start_position => "beginning"表示第一次读取文件的时候是从头开始读,但是如果logstash的sincedb中有记录了上次读取的文件位置,则会从sincedb记录的上次读取的文件位置中读取文件内容。
在调试的时候每次重启logstash需要从头读取文件的内容,可以使用如下的配置:
input {
file {
path => ["/var/log/access.log", "/var/log/err.log"]
}
sincedb => "/dev/null",
start_position => "beginning"
ignore_older => 0
close_older => 5
discover_interval => 1
}
kafka
kafka在logstash中使用也是比较常用的,一般使用如下:
input {
kafka {
zk_connect => "kafka:2181"
group_id => "logstash"
topic_id => "apache_logs"
consumer_threads => 16
}
}
codec 插件
codec插件作用input和output插件,负责将数据在与logstash event之间转换,常见的codec有:
| 插件 | 作用 |
|---|---|
| plain | 读取原始的内容 |
| dots | 将内容简化为点进行输出 |
| rubydebug | 将logstash events按照ruby格式输出,方便调试 |
| line | 处理带有换行符的内容 |
| json | 处理json格式的内容 |
| multiline | 处理多行数据的内容 |
line
如下是使用codec => line 从stdin读取数据,然后再按照rubydebug进行输出:
bin/logstash -e "input { stdin { codec=>line }} output { stdout { codec=>rubydebug }}"
输入如下内容:
hello, world
输入如下rubydebug内容:
{
"message" => "hello, world",
"@version" => "1",
"@timestamp" => 2018-12-31T12:28:04.647Z,
"host" => "ubuntu"
}
dots
如果不想看到logstash的输出,只想看到输出的进度,可以将output的codec设置为dots。
如下是使用codec => line 从stdin读取数据,然后再按照dots进行输出,:
bin/logstash -e "input { stdin { codec=>line }} output { stdout { codec=>dots }}"
json
如下是使用codec => json 从stdin读取数据,然后再按照rubydebug进行输出
bin/logstash -e "input { stdin { codec=>json }} output { stdout { codec=>rubydebug }}"
输入如下的json内容:
{"name": "elk", "age": 5}
输入如下rubydebug内容:
{
"age" => 5,
"@version" => "1",
"@timestamp" => 2018-12-31T12:36:14.430Z,
"host" => "ubuntu",
"name" => "elk"
}
multiline
当一个event的message由多行组成时,需要使用该codec,常见的情况是堆栈日志信息的处理,如:
Exception in thread "main.java.lang.NullPointerException"
at com.example.myproject.Book.getTitle(Book.java:16)
at com.example.myproject.Author.getBookTitles(Author.java:25)
at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
主要设置参数如下:
pattern:设置行匹配的正则表达式,可以使用gork
what pervious|next: 如果匹配成功,那么匹配行是归属上一个事件还是下一个事件
negate true|false:是否对pattern的结果取反
使用如下配置可以匹配上面的堆栈信息:
input {
stdin {
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
output {
stdout {
codec => rubydebug
}
}
输入上面的堆栈信息,将得到如下的输出信息:
{
"@version" => "1",
"@timestamp" => 2018-12-31T12:52:55.639Z,
"tags" => [
[0] "multiline"
],
"message" => "Exception in thread \"main.java.lang.NullPointerException\"\n at com.example.myproject.Book.getTitle(Book.java:16)\n at com.example.myproject.Author.getBookTitles(Author.java:25)\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"host" => "ubuntu"
}
如下内容:
printf("%10.10|d \t %10.10|d \t %s\
%f", w, x, y, z);
使用下面的配置如下匹配:
input {
stdin {
codec => multiline {
pattern => "\\$"
what => "next"
}
}
}
output {
stdout {
codec => rubydebug
}
}
输入上面的内容,将得到如下的输出信息:
{
"tags" => [
[0] "multiline"
],
"@timestamp" => 2018-12-31T13:07:07.496Z,
"host" => "ubuntu",
"@version" => "1",
"message" => "printf(\"%10.10|d \\t %10.10|d \\t %s\\\n %f\", w, x, y, z);"
}
filter 插件
filter是logstash功能强大的主要原因,它可以对logstash event进行丰富的处理,比如解析数据、删除字段、类型转换等,常见的有如下几个:
| 插件 | 作用 |
|---|---|
| date | 日期解析 |
| grok | 正则匹配解析 |
| dissect | 分隔符解析 |
| mutate | 对字段处理,比如重命名、删除、替换等 |
| json | 安装json解析字段内容到指定字段中 |
| geoip | 增加地理位置数据 |
| ruby | 利用ruby代码来动态修改logstash event |
date
将日期字符串解析为日期类型,然后替换@timestamp字段或者指定的其他字段。
主要设置参数如下:
match: 类型为数组,用于指定日期匹配的格式,可以一次指定多种日期格式, 如: match[“logdate”, “MMM dd yyyy HH:mm:ss”, “MMM d yyyy HH:mm:ss”, “ISO8601”]
target: 类型为字符串,用于指定赋值的字段名,默认是
@timestamptimezone:类型为字符串,用于指定时区
有如下的配置:
input {
stdin {
codec => json
}
}
filter {
date {
match => ["logdate", "MMM dd yyyy HH:mm:ss"]
}
}
output {
stdout {
codec => rubydebug
}
}
输入如下的内容:
{"logdate": "Jan 01 2019 12:02:03"}
输出如下的内容,因为我们所在的区域在东八区,所以@timestamp输出的内容有8个小时的差异:
{
"logdate" => "Jan 01 2019 12:02:03",
"@timestamp" => 2019-01-01T04:02:03.000Z,
"host" => "ubuntu",
"@version" => "1"
}
grok
grok就是一些定义好的正则表达式。
grok语法如下:
%{SYNTAX:SEMANTIC}
SYNTAX为grok pattern的名称,SEMANTIC为赋值字段名称
%{NUMBER:duration}可以匹配数值类型,但是grok匹配出的内容都是字符串类型,可以通过在最后指定为int或者float来强制转换类型,如:%{NUMBER:duration:float}
常见的一些pattern:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
如下数据是nginx产生的日志:
217.168.17.5 - - [17/May/2015:08:05:34 +0000] "GET /downloads/product_1 HTTP/1.1" 200 490 "-" "Debian APT-HTTP/1.3 (0.8.10.3)"
使用如下的gork配置可以匹配上面的数据:
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"
}
}
}
匹配后输出如下内容:
{
"response" => 200,
"httpversion" => "1.1",
"@version" => "1",
"timestamp" => "17/May/2015:08:05:34 +0000",
"message" => "217.168.17.5 - - [17/May/2015:08:05:34 +0000] \"GET /downloads/product_1 HTTP/1.1\" 200 490 \"-\" \"Debian APT-HTTP/1.3 (0.8.10.3)\"",
"ident" => "-",
"verb" => "GET",
"request" => "/downloads/product_1",
"auth" => "-",
"clientip" => "217.168.17.5",
"referrer" => "\"-\"",
"host" => "192.168.1.1",
"@timestamp" => 2019-01-01T03:41:22.911Z,
"bytes" => 490,
"agent" => "\"Debian APT-HTTP/1.3 (0.8.10.3)\""
}
自定义grok匹配
基于正则表达式匹配规则
如下是定义了一个(?<service_name>[0-9a-z]{10, 11}):
filter {
grok {
match => {
"message" => "(?<service_name>[0-9a-z]{10,11})"
}
}
}
输入数据abc1234567,匹配后输出:
{
"@version" => "1",
"service_name" => "abc1234567",
"host" => "192.168.1.1",
"message" => "abc1234567",
"@timestamp" => 2019-01-01T03:49:26.182Z
}
如果输入数据abc123456,匹配将会失败,输入如下内容:
{
"@version" => "1",
"host" => "192.168.1.1",
"message" => "abc123456",
"@timestamp" => 2019-01-01T03:50:55.032Z,
"tags" => [
[0] "_grokparsefailure"
]
}
gork pattern
gork也支持自定义pattern:
pattern_definitions参数,以键值对的方式定义pattern名称和内容
pattern_dir参数,以文件的形式被读取
如下是把上面的正则表达式[0-9a-z]{10,11}定义为一个SERVICE的 gork pattern:
filter {
grok {
match => {
"message" => "%{SERVICE:service_name}"
}
pattern_definitions => {
"SERVICE" => "[a-z0-9]{10,11}"
}
}
}
match 匹配多种样式
match也是支持匹配多种样式,如:
filter {
grok {
match => {
"message" => [ "Duration: %{NUMBER: duration}", "Speed: %{NUMBER: speed}"]
}
}
}
overwrite
有如下的配置:
filter {
grok {
match => {
"message" => "%{SERVICE:service_name} %{NUMBER:message}"
}
pattern_definitions => {
"SERVICE" => "[a-z0-9]{10,11}"
}
}
如果输入数据abc1234567 123,输出如下内容:
{
"@version" => "1",
"message" => [
[0] "abc1234567 123",
[1] "123"
],
"service_name" => "abc1234567",
"host" => "192.168.1.1",
"@timestamp" => 2019-01-02T12:25:06.218Z
}
可以看到service_name的内容和message[0]重复了,如果想去掉message[0]的输出,这时就可以使用overwrite:
filter {
grok {
match => {
"message" => "%{SERVICE:service_name} %{NUMBER:message}"
}
overwrite => ["message"]
pattern_definitions => {
"SERVICE" => "[a-z0-9]{10,11}"
}
}
}
使用上面的配置,可以看到在message字段中只输出123:
{
"@version" => "1",
"message" => "123",
"service_name" => "abc1234567",
"host" => "192.168.1.1",
"@timestamp" => 2019-01-02T12:29:04.079Z
}
tag_on_failure
匹配失败后的输出,默认是_grokparsefailure。
grok 调试建议
下面的两个网站可以在线调试正则表达式:
下面的两个网站可以在线调试grok:
dissect
dissect是基于分隔符来解析数据,解决grok用正则表达式解析时消耗过多cpu资源的问题。
dissect的应用有一定的局限性:主要适用于每行相似且分隔符明确简单的场景。
匹配语法
dissect语法比较简单,有一系列字段和分隔符组成:
%{} 字段
%{} 之间是分隔符
有如下数据:
hi - hello - 12
如果使用如下的dissect匹配(如果不写名称,即%{},表明忽略该值):
%{a} - %{b} - %{c}
匹配的结果为:
{
"a": "hi",
"b": "hello",
"c": "12"
}
追加字段
如下配置,+代表该匹配值追加到ts字段中:
filter {
dissect {
mapping => {
"message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
}
}
}
输入如下数据:
Apr 16 12:20:02 localhost systemd[1]: Starting system activity accounting tool...
匹配输出如下内容:
{
"@version" => "1",
"message" => "Apr 16 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
"src" => "localhost",
"pid" => "1",
"host" => "192.168.1.1",
"msg" => "Starting system activity accounting tool...",
"@timestamp" => 2019-01-02T12:53:48.958Z,
"ts" => "Apr 16 12:20:02",
"prog" => "systemd"
}
改变顺序
如下配置,/后面的数字代表拼接的次序:
filter {
dissect {
mapping => {
"message" => "%{+order/2} %{+order/3} %{+order/1} %{+order/4}"
}
}
}
输入如下内容:
one three two go
匹配后输出如下内容:
{
"@version" => "1",
"message" => "one three two go",
"order" => "two one three go",
"host" => "192.168.1.1",
"@timestamp" => 2019-01-02T13:05:52.526Z
}
匹配后赋值
如输入以下内容:
a=1&b=2
匹配后想输出如下内容:
{
"a": "1",
"b": "2"
}
可以使用下面的匹配:
%{?key1}=%{&key1}&%{?key2}=%{&key2}
%{?}代表忽略匹配值,但是赋予字段名,用于后续匹配。
%{&}代表将匹配值赋予key1的匹配值。
convert_datatype
dissect分割后的字段值都是字符串,可以使用convert_datatype属性进行类型转换:
filter {
dissect {
convert_datatype {
age => int
}
}
}
mutate
可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:
convert: 类型转换
gsub:字符串替换
split/join/merge:字符串切割、数组合并为字符串、数组合并为数组
rename:字段重命名
update/replace:字段内容更新或替换
remove_field:删除字段
convert
实现字段类型的转换,类型为hash,仅支持转换为integer、float、string、和boolean
filter {
mutate {
convert => {"age" => "integer"}
}
}
gsub
对字段内容进行替换,类型为数组,每3项为一个替换配置。
如下是将path字段中的/替换为_、urlparams中的\、?、#、-替换为.:
filter {
mutate {
gsub => [
"path", "/", "_",
"urlparams", "[\\?#-]", "."
]
}
}
split
如下是将jobs字段以,为分隔,将字符串切割为数组:
filter {
mutate {
split => {"jobs" => ','}
}
}
rename
如下是将HOSTORIP字段名重命名为clientip:
filter {
mutate {
rename => {"HOSTORIP" => "clientip"}
}
}
update/replace
更新字段内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段操作:
filter {
mutate {
replace => {
"message" => "%{source_host}: My new message"
}
}
}
remove_field
如下表示在结果中删除headers字段:
filter {
mutate {
remove_field => "headers"
}
}
json
将字段内容为json格式的数据进行解析,用source指定要更新的字段名、target指定解析后的存储字段,如果不指定target默认和message同级别:
filter {
json {
source => "message"
target => "msg_json"
}
}
geoip
根据ip地址转换对应的地域信息,比如经纬度、城市等,方便进行地理数据分析:
filter {
geoip {
source => "message"
}
}
输入ip地址:119.75.217.26,输出如下内容:
{
"host" => "192.168.1.1",
"message" => "119.75.217.26",
"@version" => "1",
"geoip" => {
"region_name" => "Beijing",
"timezone" => "Asia/Shanghai",
"country_name" => "China",
"ip" => "119.75.217.26",
"country_code2" => "CN",
"continent_code" => "AS",
"latitude" => 39.9289,
"country_code3" => "CN",
"region_code" => "11",
"location" => {
"lat" => 39.9289,
"lon" => 116.3883
},
"longitude" => 116.3883,
"city_name" => "Beijing"
},
"@timestamp" => 2019-01-03T12:39:41.318Z
}
output 插件
负责将logstash event输出,常见的插件如下:
stdout
file
elasticsearch
stdout
输出到标准输出,多用于调试:
output {
stdout {
codec => rubydebug
}
}
file
输出到文件,实现将分散在多地的文件统一到一处的需求,比如将所有web机器的web日志收集到1个文件中,从而方便查阅信息,默认输出json格式的数据,通过format可以输出原始格式:
output {
file {
path => "/var/log/web.log"
codec => line {
format => "%{message}"
}
}
}
elasticsearch
输出到elasticsearch:
output {
elasticsearch {
hosts => ["192.168.1.100:9200", "192.168.1.101:9200"]
index => "nginx-%{+YYYY.MM.dd}"
template => "./nginx_template.json"
template_name => "nginx_template"
template_overwrite => true
}
}