Logstash 插件

input 插件

input插件用于指定数据输入源,一个pipeline可以有多个input插件,常用的input插件主要包括:stdin、file、kafka等。

stdin

最简单的输入,从标准输入读取数据,通用配置为:

  • codec: 类型为codec

  • type: 类型为string,自定义该事件的类型,可用于后续判断

  • tags: 类型为array,自定义该事件的tag,可用于后续判断

  • add_field: 类型为hash,为该事件添加字段

有如下的配置:

input {
    stdin {
        codec => "plain"
        tags => ["test"]
        type => "std"
        add_field => {"key" => "value"}
    }
}

output {
    stdout {
        codec=>"rubydebug"
    }
}

在logstash启动的时候,指定上面的配置文件:

echo "test" | bin/logstash -f test/input-stdin.conf

输入如下内容:

{
        "key" => "value",
    "@version" => "1",
        "tags" => [
        [0] "test"
    ],
        "host" => "ubuntu",
        "type" => "std",
    "@timestamp" => 2018-12-31T09:45:07.935Z,
    "message" => "test"
}

file

从文件中读取数据,如常见的日志文件。

  • 文件内容只被读取一次,每次读取的位置会记录在sincedb中,重启logstash时,会从sincedb中获取记录的文件位置

  • 会定时检查文件是否有更新,以保证即使读取到文件的新内容

  • 定时检查新文件,以发现新文件并进行读取

  • 读取文件内容时,使用的是inode,如果文件发生了归档操作,不会影响当前的内容被读取

配置

配置项 含义
path 类型为数组,指明读取的文件路径,基于glob匹配语法,如:path => [“/var/log/*/.log”, “/var/log/message”]
exclue 类型为数组,排除不想监听的文件规则,基于glob匹配语法,如:exclude => “.gz”
sincedb_path 类型为字符串,记录sincedb文件路径
start_postion 类型为字符串,beginning or end,是否从头读取文件
start_interval 类型为数值,单位为秒,定时检查文件是否有更新,默认为1秒
discover_interval 类型为数值,单位为秒,定时检查文件是否有新文件待读取,默认为15秒
ignore_older 类型为数值,单位为秒,扫描文件列表时,如果该文件从上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭
close_older 类型为数值,单位为秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认为3600秒

glob 配置语法

主要包含如下几种匹配符:

匹配符 含义
* 匹配任意字符,但不匹配以.开头的隐藏文件,匹配隐藏文件是要使用.*来匹配
** 递归匹配子目录
? 匹配单一字符
[] 匹配多个字符,比如[a-z]、[^a-z]
{} 匹配多个单词,比如{foo, bar, hello}
\ 转义符号

如:

  • /var/log/*.log:匹配/var/log目录下的.log结尾的文件

  • /var/log/**/*.log:匹配/var/log所有子目录下.log结尾的文件

  • /var/log/{app1, app2, app3}/*.log:匹配/var/log目录下的app1、app2、app3目录中以.log结尾的文件

有如下配置文件,logstash会从/var/log/access.log/var/log/err.log从读取内容:

input {
    file {
        path => ["/var/log/access.log", "/var/log/err.log"]
    }
    type => "web",
    start_position => "beginning"
}

start_position => "beginning"表示第一次读取文件的时候是从头开始读,但是如果logstash的sincedb中有记录了上次读取的文件位置,则会从sincedb记录的上次读取的文件位置中读取文件内容。

在调试的时候每次重启logstash需要从头读取文件的内容,可以使用如下的配置:

input {
    file {
        path => ["/var/log/access.log", "/var/log/err.log"]
    }
    sincedb => "/dev/null",
    start_position => "beginning"
    ignore_older => 0
    close_older => 5
    discover_interval => 1
}

kafka

kafka在logstash中使用也是比较常用的,一般使用如下:

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "apache_logs"
        consumer_threads => 16
    }
}

codec 插件

codec插件作用input和output插件,负责将数据在与logstash event之间转换,常见的codec有:

插件 作用
plain 读取原始的内容
dots 将内容简化为点进行输出
rubydebug 将logstash events按照ruby格式输出,方便调试
line 处理带有换行符的内容
json 处理json格式的内容
multiline 处理多行数据的内容

line

如下是使用codec => line 从stdin读取数据,然后再按照rubydebug进行输出:

bin/logstash -e "input { stdin { codec=>line }} output { stdout { codec=>rubydebug }}"

输入如下内容:

hello, world

输入如下rubydebug内容:

{
    "message" => "hello, world",
    "@version" => "1",
    "@timestamp" => 2018-12-31T12:28:04.647Z,
        "host" => "ubuntu"
}

dots

如果不想看到logstash的输出,只想看到输出的进度,可以将output的codec设置为dots。

如下是使用codec => line 从stdin读取数据,然后再按照dots进行输出,:

bin/logstash -e "input { stdin { codec=>line }} output { stdout { codec=>dots }}"

json

如下是使用codec => json 从stdin读取数据,然后再按照rubydebug进行输出

bin/logstash -e "input { stdin { codec=>json }} output { stdout { codec=>rubydebug }}"

输入如下的json内容:

{"name": "elk", "age": 5}

输入如下rubydebug内容:

{
        "age" => 5,
    "@version" => "1",
    "@timestamp" => 2018-12-31T12:36:14.430Z,
        "host" => "ubuntu",
        "name" => "elk"
}

multiline

当一个event的message由多行组成时,需要使用该codec,常见的情况是堆栈日志信息的处理,如:

Exception in thread "main.java.lang.NullPointerException"
    at com.example.myproject.Book.getTitle(Book.java:16)
    at com.example.myproject.Author.getBookTitles(Author.java:25)
    at com.example.myproject.Bootstrap.main(Bootstrap.java:14)

主要设置参数如下:

  • pattern:设置行匹配的正则表达式,可以使用gork

  • what pervious|next: 如果匹配成功,那么匹配行是归属上一个事件还是下一个事件

  • negate true|false:是否对pattern的结果取反

使用如下配置可以匹配上面的堆栈信息:

input {
    stdin {
        codec => multiline {
            pattern => "^\s"
            what => "previous"
        }
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

输入上面的堆栈信息,将得到如下的输出信息:

{
    "@version" => "1",
    "@timestamp" => 2018-12-31T12:52:55.639Z,
        "tags" => [
        [0] "multiline"
    ],
    "message" => "Exception in thread \"main.java.lang.NullPointerException\"\n        at com.example.myproject.Book.getTitle(Book.java:16)\n        at com.example.myproject.Author.getBookTitles(Author.java:25)\n        at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
        "host" => "ubuntu"
}

如下内容:

printf("%10.10|d \t %10.10|d \t %s\
%f", w, x, y, z);

使用下面的配置如下匹配:

input {
    stdin {
        codec => multiline {
            pattern => "\\$"
                what => "next"
        }
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

输入上面的内容,将得到如下的输出信息:

{
        "tags" => [
        [0] "multiline"
    ],
    "@timestamp" => 2018-12-31T13:07:07.496Z,
        "host" => "ubuntu",
    "@version" => "1",
    "message" => "printf(\"%10.10|d \\t %10.10|d \\t %s\\\n    %f\", w, x, y, z);"
}

filter 插件

filter是logstash功能强大的主要原因,它可以对logstash event进行丰富的处理,比如解析数据、删除字段、类型转换等,常见的有如下几个:

插件 作用
date 日期解析
grok 正则匹配解析
dissect 分隔符解析
mutate 对字段处理,比如重命名、删除、替换等
json 安装json解析字段内容到指定字段中
geoip 增加地理位置数据
ruby 利用ruby代码来动态修改logstash event

date

将日期字符串解析为日期类型,然后替换@timestamp字段或者指定的其他字段。

主要设置参数如下:

  • match: 类型为数组,用于指定日期匹配的格式,可以一次指定多种日期格式, 如: match[“logdate”, “MMM dd yyyy HH:mm:ss”, “MMM d yyyy HH:mm:ss”, “ISO8601”]

  • target: 类型为字符串,用于指定赋值的字段名,默认是@timestamp

  • timezone:类型为字符串,用于指定时区

有如下的配置:

input {
    stdin {
        codec => json
    }
}

filter {
    date {
        match => ["logdate", "MMM dd yyyy HH:mm:ss"]
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

输入如下的内容:

{"logdate": "Jan 01 2019 12:02:03"}

输出如下的内容,因为我们所在的区域在东八区,所以@timestamp输出的内容有8个小时的差异:

{
    "logdate" => "Jan 01 2019 12:02:03",
    "@timestamp" => 2019-01-01T04:02:03.000Z,
        "host" => "ubuntu",
    "@version" => "1"
}

grok

grok就是一些定义好的正则表达式。

grok语法如下:

%{SYNTAX:SEMANTIC}
  • SYNTAX为grok pattern的名称,SEMANTIC为赋值字段名称

  • %{NUMBER:duration} 可以匹配数值类型,但是grok匹配出的内容都是字符串类型,可以通过在最后指定为int或者float来强制转换类型,如:%{NUMBER:duration:float}

常见的一些pattern:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

如下数据是nginx产生的日志:

217.168.17.5 - - [17/May/2015:08:05:34 +0000] "GET /downloads/product_1 HTTP/1.1" 200 490 "-" "Debian APT-HTTP/1.3 (0.8.10.3)"

使用如下的gork配置可以匹配上面的数据:

filter {
    grok {
        match => {
        "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"
        }
    }
}

匹配后输出如下内容:

{
   "response" => 200,
"httpversion" => "1.1",
   "@version" => "1",
  "timestamp" => "17/May/2015:08:05:34 +0000",
    "message" => "217.168.17.5 - - [17/May/2015:08:05:34 +0000] \"GET /downloads/product_1 HTTP/1.1\" 200 490 \"-\" \"Debian APT-HTTP/1.3 (0.8.10.3)\"",
      "ident" => "-",
       "verb" => "GET",
    "request" => "/downloads/product_1",
       "auth" => "-",
   "clientip" => "217.168.17.5",
   "referrer" => "\"-\"",
       "host" => "192.168.1.1",
 "@timestamp" => 2019-01-01T03:41:22.911Z,
      "bytes" => 490,
      "agent" => "\"Debian APT-HTTP/1.3 (0.8.10.3)\""
}
自定义grok匹配
基于正则表达式匹配规则

如下是定义了一个(?<service_name>[0-9a-z]{10, 11}):

filter {
    grok {
        match => {
        "message" => "(?<service_name>[0-9a-z]{10,11})"
        }
    }
}

输入数据abc1234567,匹配后输出:

{
    "@version" => "1",
"service_name" => "abc1234567",
        "host" => "192.168.1.1",
     "message" => "abc1234567",
  "@timestamp" => 2019-01-01T03:49:26.182Z
}

如果输入数据abc123456,匹配将会失败,输入如下内容:

{
  "@version" => "1",
      "host" => "192.168.1.1",
   "message" => "abc123456",
"@timestamp" => 2019-01-01T03:50:55.032Z,
      "tags" => [
    [0] "_grokparsefailure"
]
}
gork pattern

gork也支持自定义pattern:

  • pattern_definitions参数,以键值对的方式定义pattern名称和内容

  • pattern_dir参数,以文件的形式被读取

如下是把上面的正则表达式[0-9a-z]{10,11}定义为一个SERVICE的 gork pattern:

filter {
    grok {
        match => {
        "message" => "%{SERVICE:service_name}"
        }

        pattern_definitions => {
            "SERVICE" => "[a-z0-9]{10,11}"
        }
    }
}
match 匹配多种样式

match也是支持匹配多种样式,如:

filter {
    grok {
        match => {
            "message" => [ "Duration: %{NUMBER: duration}", "Speed: %{NUMBER: speed}"]
        }
    }
}
overwrite

有如下的配置:

filter {
    grok {
        match => {
        "message" => "%{SERVICE:service_name} %{NUMBER:message}"
        }

        pattern_definitions => {
            "SERVICE" => "[a-z0-9]{10,11}"
        }
    }

如果输入数据abc1234567 123,输出如下内容:

{
    "@version" => "1",
     "message" => [
    [0] "abc1234567 123",
    [1] "123"
],
"service_name" => "abc1234567",
        "host" => "192.168.1.1",
  "@timestamp" => 2019-01-02T12:25:06.218Z
}

可以看到service_name的内容和message[0]重复了,如果想去掉message[0]的输出,这时就可以使用overwrite:

filter {
    grok {
        match => {
        "message" => "%{SERVICE:service_name} %{NUMBER:message}"
        }

        overwrite => ["message"]

        pattern_definitions => {
            "SERVICE" => "[a-z0-9]{10,11}"
        }
    }
}

使用上面的配置,可以看到在message字段中只输出123

{
    "@version" => "1",
     "message" => "123",
"service_name" => "abc1234567",
        "host" => "192.168.1.1",
  "@timestamp" => 2019-01-02T12:29:04.079Z
}
tag_on_failure

匹配失败后的输出,默认是_grokparsefailure

grok 调试建议

下面的两个网站可以在线调试正则表达式:

下面的两个网站可以在线调试grok:

dissect

dissect是基于分隔符来解析数据,解决grok用正则表达式解析时消耗过多cpu资源的问题。

dissect的应用有一定的局限性:主要适用于每行相似且分隔符明确简单的场景。

匹配语法

dissect语法比较简单,有一系列字段和分隔符组成:

  • %{} 字段

  • %{} 之间是分隔符

有如下数据:

hi - hello - 12

如果使用如下的dissect匹配(如果不写名称,即%{},表明忽略该值):

%{a} - %{b} - %{c}

匹配的结果为:

{
    "a": "hi",
    "b": "hello",
    "c": "12"
}
追加字段

如下配置,+代表该匹配值追加到ts字段中:

filter {
    dissect {
        mapping => {
        "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
        }
    }
}

输入如下数据:

Apr 16 12:20:02 localhost systemd[1]: Starting system activity accounting tool...

匹配输出如下内容:

{
  "@version" => "1",
   "message" => "Apr 16 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
       "src" => "localhost",
       "pid" => "1",
      "host" => "192.168.1.1",
       "msg" => "Starting system activity accounting tool...",
"@timestamp" => 2019-01-02T12:53:48.958Z,
        "ts" => "Apr 16 12:20:02",
      "prog" => "systemd"
}
改变顺序

如下配置,/后面的数字代表拼接的次序:

filter {
    dissect {
        mapping => {
        "message" => "%{+order/2} %{+order/3} %{+order/1} %{+order/4}"
        }
    }
}

输入如下内容:

one three two go

匹配后输出如下内容:

{
  "@version" => "1",
   "message" => "one three two go",
     "order" => "two one three go",
      "host" => "192.168.1.1",
"@timestamp" => 2019-01-02T13:05:52.526Z
}
匹配后赋值

如输入以下内容:

a=1&b=2

匹配后想输出如下内容:

{
    "a": "1",
    "b": "2"
}

可以使用下面的匹配:

%{?key1}=%{&key1}&%{?key2}=%{&key2}

%{?}代表忽略匹配值,但是赋予字段名,用于后续匹配。

%{&}代表将匹配值赋予key1的匹配值。

convert_datatype

dissect分割后的字段值都是字符串,可以使用convert_datatype属性进行类型转换:

filter {
    dissect {
        convert_datatype {
            age => int
        }
    }
}

mutate

可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:

  • convert: 类型转换

  • gsub:字符串替换

  • split/join/merge:字符串切割、数组合并为字符串、数组合并为数组

  • rename:字段重命名

  • update/replace:字段内容更新或替换

  • remove_field:删除字段

convert

实现字段类型的转换,类型为hash,仅支持转换为integer、float、string、和boolean

filter {
    mutate {
        convert => {"age" => "integer"}
    }
}
gsub

对字段内容进行替换,类型为数组,每3项为一个替换配置。

如下是将path字段中的/替换为_、urlparams中的\?#-替换为.

filter {
    mutate {
        gsub => [
            "path", "/", "_",
            "urlparams", "[\\?#-]", "."
        ]
    }
}
split

如下是将jobs字段以,为分隔,将字符串切割为数组:

filter {
    mutate {
        split => {"jobs" => ','}
    }
}
rename

如下是将HOSTORIP字段名重命名为clientip:

filter {
    mutate {
        rename => {"HOSTORIP" => "clientip"}
    }
}
update/replace

更新字段内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段操作:

filter {
    mutate {
        replace => {
            "message" => "%{source_host}: My new message"
        }
    }
}
remove_field

如下表示在结果中删除headers字段:

filter {
    mutate {
        remove_field => "headers"
    }
}

json

将字段内容为json格式的数据进行解析,用source指定要更新的字段名、target指定解析后的存储字段,如果不指定target默认和message同级别:

filter {
    json {
        source => "message"
        target => "msg_json"
    }  
}

geoip

根据ip地址转换对应的地域信息,比如经纬度、城市等,方便进行地理数据分析:

filter {
    geoip {
        source => "message"
    }
}

输入ip地址:119.75.217.26,输出如下内容:

{
      "host" => "192.168.1.1",
   "message" => "119.75.217.26",
  "@version" => "1",
     "geoip" => {
       "region_name" => "Beijing",
          "timezone" => "Asia/Shanghai",
      "country_name" => "China",
                "ip" => "119.75.217.26",
     "country_code2" => "CN",
    "continent_code" => "AS",
          "latitude" => 39.9289,
     "country_code3" => "CN",
       "region_code" => "11",
          "location" => {
        "lat" => 39.9289,
        "lon" => 116.3883
    },
         "longitude" => 116.3883,
         "city_name" => "Beijing"
},
"@timestamp" => 2019-01-03T12:39:41.318Z
}

output 插件

负责将logstash event输出,常见的插件如下:

  • stdout

  • file

  • elasticsearch

stdout

输出到标准输出,多用于调试:

output {
    stdout {
        codec => rubydebug
    }
}

file

输出到文件,实现将分散在多地的文件统一到一处的需求,比如将所有web机器的web日志收集到1个文件中,从而方便查阅信息,默认输出json格式的数据,通过format可以输出原始格式:

output {
    file {
        path => "/var/log/web.log"
        codec => line {
            format => "%{message}"
        }
    }
}

elasticsearch

输出到elasticsearch:

output {
    elasticsearch {
        hosts => ["192.168.1.100:9200", "192.168.1.101:9200"]
        index => "nginx-%{+YYYY.MM.dd}"
        template => "./nginx_template.json"
        template_name => "nginx_template"
        template_overwrite => true
    }
}