ELK is a mainstream open-source platform for big-data search and analytics. It comprises Elasticsearch (ES), Logstash, Kibana, and related components, and can be used to process, analyze, and visualize logs of all kinds. Filebeat is a newer member of the ELK ecosystem that collects log files and ships them to Logstash or directly to ES. This post documents the configuration for collecting JSON-formatted logs with Filebeat, preprocessing them with Logstash, and finally storing them in ES.
This post follows the Filebeat Reference and the Logstash Reference; all ELK components have been upgraded to 7.11, the latest version at the time of writing.
Configuring Filebeat
After installing Filebeat, it needs to be configured. On CentOS the configuration file is a YAML file located by default at `/etc/filebeat/filebeat.yml`.
Configure the input:
filebeat.inputs:
- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/logAna/Desktop/cowrie.json

  # JSON file processing
  # json.keys_under_root: true
  # json.add_error_key: true
- The input `type` is `log`, i.e. Filebeat tails log files.
- Set `enabled: true` so that this input configuration takes effect.
- `paths` lists the file paths to collect; it can be a single file or a glob matching multiple files. Here a JSON-format log from the Cowrie honeypot serves as the sample (one JSON record per line; a sample line is shown after this list).
- `json.keys_under_root` and `json.add_error_key`: if set to true, Filebeat itself parses each JSON log line into key-value pairs. Since Logstash can parse JSON just as well, the raw lines are forwarded to Logstash and parsed there instead.
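For reference, each line of `cowrie.json` is one self-contained JSON record. A connection event looks like this (taken from the `message` field of the sample output captured later in this post; IPs masked):

{"eventid":"cowrie.session.connect","src_ip":"x.x.x.x","src_port":33546,"dst_ip":"x.x.x.x","dst_port":22,"session":"08b02dcbcb19","protocol":"ssh","message":"New connection: x.x.x.x:33546 (x.x.x.x:2222) [session: 08b02dcbcb19]","sensor":"cowrie","timestamp":"2021-03-10T00:00:05.009804Z"}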
Configure the output to the local Logstash instance. Logstash listens on port 5044 by default, and no TLS is set up here:
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"
Configure processors; Filebeat can also do some simple preprocessing of its own:
processors:
  # - add_host_metadata:
  #     when.not.contains.tags: forwarded
  # - add_cloud_metadata: ~
  # - add_docker_metadata: ~
  # - add_kubernetes_metadata: ~
  # - drop_fields:
  #     fields: ["agent", "log", "tags", "ecs", "@version", "host", "input"]
- Filebeat attaches some metadata to every event it collects by default, such as information about the collecting host; those processors can be commented out.
- If `json.keys_under_root` and `json.add_error_key` are enabled on the input, the parsed fields can get some initial preprocessing here, for example removing fields with `drop_fields` (a sketch of this alternative follows the list). Since Logstash offers similar functionality, nothing is configured here.
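As an illustration of that alternative, here is a minimal sketch of parsing the JSON inside Filebeat and dropping the metadata there; the field list mirrors the commented-out `drop_fields` above, and this is not the configuration used in the rest of this post:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/logAna/Desktop/cowrie.json
  # Lift the parsed JSON keys to the top level of the event
  json.keys_under_root: true
  # Attach an error key when a line cannot be parsed as JSON
  json.add_error_key: true

processors:
  # Drop the metadata fields Filebeat attaches by default
  - drop_fields:
      fields: ["agent", "log", "tags", "ecs", "@version", "host", "input"]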
Filebeat is now fully configured. Check that the configuration is valid:
$ sudo filebeat test config -e
...
2021-03-15T05:08:02.811-0400 INFO instance/beat.go:304 Setup Beat: filebeat; Version: 7.11.1
2021-03-15T05:08:02.811-0400 INFO [publisher] pipeline/module.go:113 Beat name: localhost.localdomain
2021-03-15T05:08:02.814-0400 WARN beater/filebeat.go:178 Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
Config OK
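Once Logstash is up and listening (configured next), connectivity to the configured output can be checked the same way; the command is shown here without its output:

# Check the connection to the configured Logstash output
$ sudo filebeat test output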
Because Filebeat's output is Logstash, Logstash also needs a minimal configuration before the setup can be tested end to end. In the Logstash configuration directory `/etc/logstash/`, create a configuration file `logstash-sample.conf` that takes input from Beats on port 5044 and prints each event to stdout:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

output {
  stdout { codec => rubydebug }
}
Run Logstash and then Filebeat:
$ sudo /usr/share/logstash/bin/logstash -f logstash-sample.conf --config.reload.automatic
...
[INFO ] 2021-03-15 05:41:48.820 [[main]<beats] Server - Starting server on port: 5044
[INFO ] 2021-03-15 05:41:48.961 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
$ sudo /usr/share/filebeat/bin/filebeat -e -c filebeat.yml -d "publish"
...
2021-03-15T05:43:21.374-0400 INFO [publisher_pipeline_output] pipeline/output.go:143 Connecting to backoff(async(tcp://localhost:5044))
2021-03-15T05:43:21.374-0400 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
2021-03-15T05:43:21.374-0400 INFO [publisher] pipeline/retry.go:223 done
2021-03-15T05:43:21.375-0400 INFO [publisher_pipeline_output] pipeline/output.go:151 Connection to backoff(async(tcp://localhost:5044)) established
Each JSON record in the log file is now collected by Filebeat and shipped to Logstash, with the raw JSON content stored as the value of the `message` field. One such record:
...
{
    "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-15T09:43:20.373Z,
    "message" => "{\"eventid\":\"cowrie.session.connect\",\"src_ip\":\"x.x.x.x\",\"src_port\":33546,\"dst_ip\":\"x.x.x.x\",\"dst_port\":22,\"session\":\"08b02dcbcb19\",\"protocol\":\"ssh\",\"message\":\"New connection: x.x.x.x:33546 (x.x.x.x:2222) [session: 08b02dcbcb19]\",\"sensor\":\"cowrie\",\"timestamp\":\"2021-03-10T00:00:05.009804Z\"}",
    "agent" => {
        "name" => "localhost.localdomain",
        "type" => "filebeat",
        "version" => "7.11.1",
        "hostname" => "localhost.localdomain",
        "id" => "27900a22-b308-48c1-b65e-b80817126f4d",
        "ephemeral_id" => "944f3af7-7113-478e-a83c-6d440f38d657"
    },
    "log" => {
        "file" => {
            "path" => "/home/logAna/Desktop/cowrie.json"
        },
        "offset" => 6645
    },
    "@version" => "1",
    "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "input" => {
        "type" => "log"
    },
    "host" => {
        "name" => "localhost.localdomain"
    }
}
Configuring Logstash
The full Logstash configuration likewise goes in `/etc/logstash/logstash-sample.conf`.
Configure the input to come from Beats:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}
Configure the filter:
filter {
  json {
    source => "message"
    # remove metadata added by Filebeat
    remove_field => ["agent", "tags", "@version", "ecs", "offset", "input", "host", "log", "@timestamp"]
  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  geoip {
    source => "src_ip"
    fields => ["city_name", "region_name", "country_name", "ip", "location"]
  }
  mutate {
    update => { "dst_port" => "22" }
    rename => ["[geoip][ip]", "[geoip][src_ip]"]
  }
}
The filter block is the heart of the Logstash configuration; it is where the log records are preprocessed. Logstash ships with many filters; the ones used here are `json`, `date`, `geoip`, and `mutate`.

- json filter: parses a JSON-formatted log record into key-value pairs. The `source` setting is required and names the field holding the content to parse, here the `message` field output by Filebeat. `remove_field` can drop fields after parsing; here it removes the metadata fields added by Filebeat.
- date filter: handles timestamps. `match` takes the name of the field containing the timestamp followed by the format of that timestamp. On a successful match, unless configured otherwise, Logstash writes the parsed time into the `@timestamp` field.
- geoip filter: looks up geographic information for the IP address in the `source` field (here `src_ip`); `fields` limits which lookup results are kept.
- mutate filter: modifies fields of the record, for example updating the value of a key or renaming a field.
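As a rough illustration (not captured output), after this filter the sample record from earlier would come out looking something like the following: the JSON keys are lifted to the top level, the Filebeat metadata is gone, `@timestamp` is rebuilt from the event's own `timestamp` field, and a `geoip` object is attached (geographic values are placeholders):

{
    "eventid" => "cowrie.session.connect",
    "src_ip" => "x.x.x.x",
    "src_port" => 33546,
    "dst_ip" => "x.x.x.x",
    "dst_port" => "22",
    "session" => "08b02dcbcb19",
    "protocol" => "ssh",
    "message" => "New connection: x.x.x.x:33546 (x.x.x.x:2222) [session: 08b02dcbcb19]",
    "sensor" => "cowrie",
    "timestamp" => "2021-03-10T00:00:05.009804Z",
    "@timestamp" => 2021-03-10T00:00:05.009Z,
    "geoip" => {
        "city_name" => "...",
        "region_name" => "...",
        "country_name" => "...",
        "src_ip" => "x.x.x.x",
        "location" => {
            "lat" => 0.0,
            "lon" => 0.0
        }
    }
}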
Test the configuration:
$ sudo /usr/share/logstash/bin/logstash -f logstash-sample.conf --config.test_and_exit
...
Configuration OK
[INFO ] 2021-03-19 05:51:02.152 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
With both Filebeat and Logstash configured, running them collects the logs with Filebeat, preprocesses them with Logstash, and finally stores them in ES.
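The `stdout` output above was only for verifying the pipeline. To actually index the events into ES, the `output` section of `logstash-sample.conf` would be switched to the Elasticsearch output. A minimal sketch, assuming a local unsecured ES instance on the default port and a hypothetical daily index name:

output {
  elasticsearch {
    # Local ES instance, default port, no authentication (assumption)
    hosts => ["http://localhost:9200"]
    # Hypothetical daily index pattern for the cowrie events
    index => "cowrie-%{+YYYY.MM.dd}"
  }
}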