
Collecting JSON Files with Filebeat and Exporting to Logstash


ELK is currently a mainstream open-source platform for big-data search and analytics. It comprises Elasticsearch (ES), Logstash, Kibana, and related components, and can be used to process, analyze, and visualize all kinds of logs. Filebeat is a newer member of the ELK ecosystem that collects log files and ships them to Logstash or directly to ES. This article documents the configuration for collecting JSON-formatted logs with Filebeat, forwarding them to Logstash for preprocessing, and finally storing them in ES.

This article follows the Filebeat Reference and the Logstash Reference; all ELK components have been upgraded to the latest version at the time of writing, 7.11.

Configuring Filebeat

After installing Filebeat, it needs to be configured. On CentOS the configuration file is a YAML file located at /etc/filebeat/filebeat.yml by default.

Configure the input:

filebeat.inputs:
- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/logAna/Desktop/cowrie.json

  # JSON file processing
  #  json.keys_under_root: true
  #  json.add_error_key: true
  • The input type is log.
  • Setting enabled to true makes this input configuration take effect.
  • paths lists the files to collect; it can name a single file or use a glob pattern to match multiple files. Here a JSON-formatted log from the Cowrie honeypot serves as the sample.
  • json.keys_under_root and json.add_error_key: if set to true, Filebeat itself parses each JSON log line into key-value pairs. Since Logstash can parse JSON just as well, the raw log lines are forwarded to Logstash here and parsed there instead.
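For reference, this is what the input would look like with JSON parsing enabled on the Filebeat side instead — a sketch of the approach this article deliberately skips, reusing the sample path from above:

```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/logAna/Desktop/cowrie.json
  # Parse each line as JSON and place the keys at the event root;
  # record parsing failures in an error field on the event.
  json.keys_under_root: true
  json.add_error_key: true
```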

Configure the output to the local Logstash instance; Logstash listens on port 5044 by default, and encryption is not enabled here:

output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"
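If transport encryption were desired, the commented SSL options above could be enabled. A sketch, using the placeholder certificate paths from the default configuration (not real files on this system):

```yaml
output.logstash:
  hosts: ["localhost:5044"]
  # CA used to verify the Logstash server certificate
  ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
  # Client certificate and key, if mutual TLS is required
  ssl.certificate: "/etc/pki/client/cert.pem"
  ssl.key: "/etc/pki/client/cert.key"
```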

Configure processors; Filebeat can also do some simple preprocessing of its own:

processors:
        #  - add_host_metadata:
        #      when.not.contains.tags: forwarded
        #  - add_cloud_metadata: ~
        #  - add_docker_metadata: ~
        #  - add_kubernetes_metadata: ~
        #  - drop_fields:
        #      fields: ["agent", "log", "tags", "ecs", "@version", "host", "input"]
  1. By default, Filebeat attaches some metadata to each collected event, such as information about the collecting host; the corresponding processors can remain commented out.

  2. If json.keys_under_root and json.add_error_key were enabled on the input, some initial preprocessing could be done here, for example removing fields with drop_fields. Since Logstash offers similar functionality, nothing is configured at this stage.
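If the metadata were to be dropped on the Filebeat side rather than in Logstash, a drop_fields processor like the commented one above could be enabled; a minimal sketch:

```yaml
processors:
  # Remove metadata fields that Filebeat adds to every event
  - drop_fields:
      fields: ["agent", "log", "tags", "ecs", "host", "input"]
```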

Filebeat is now configured; verify that the configuration is valid:

$ sudo filebeat test config -e

...
2021-03-15T05:08:02.811-0400    INFO    instance/beat.go:304    Setup Beat: filebeat; Version: 7.11.1
2021-03-15T05:08:02.811-0400    INFO    [publisher] pipeline/module.go:113  Beat name: localhost.localdomain
2021-03-15T05:08:02.814-0400    WARN    beater/filebeat.go:178  Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
Config OK

To test the Filebeat setup, the Logstash side needs a minimal configuration as well, since Filebeat's output goes to Logstash. In the Logstash configuration path /etc/logstash/, create a configuration file logstash-sample.conf with an input receiving from Filebeat on port 5044 and an output printing the log events to the console:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

output {
  stdout { codec => rubydebug }
}

Run Logstash and then Filebeat:

$ sudo /usr/share/logstash/bin/logstash -f logstash-sample.conf --config.reload.automatic
...

[INFO ] 2021-03-15 05:41:48.820 [[main]<beats] Server - Starting server on port: 5044
[INFO ] 2021-03-15 05:41:48.961 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}

$ sudo /usr/share/filebeat/bin/filebeat -e -c filebeat.yml -d "publish"
...
2021-03-15T05:43:21.374-0400    INFO    [publisher_pipeline_output] pipeline/output.go:143  Connecting to backoff(async(tcp://localhost:5044))
2021-03-15T05:43:21.374-0400    INFO    [publisher] pipeline/retry.go:219   retryer: send unwait signal to consumer
2021-03-15T05:43:21.374-0400    INFO    [publisher] pipeline/retry.go:223     done
2021-03-15T05:43:21.375-0400    INFO    [publisher_pipeline_output] pipeline/output.go:151  Connection to backoff(async(tcp://localhost:5044)) established

Each JSON record in the log file is collected by Filebeat and shipped to Logstash, with the raw JSON stored as the value of the message field. One such record looks like this:

...
{
           "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-15T09:43:20.373Z,
       "message" => "{\"eventid\":\"cowrie.session.connect\",\"src_ip\":\"x.x.x.x\",\"src_port\":33546,\"dst_ip\":\"x.x.x.x\",\"dst_port\":22,\"session\":\"08b02dcbcb19\",\"protocol\":\"ssh\",\"message\":\"New connection: x.x.x.x:33546 (x.x.x.x:2222) [session: 08b02dcbcb19]\",\"sensor\":\"cowrie\",\"timestamp\":\"2021-03-10T00:00:05.009804Z\"}",
         "agent" => {
                "name" => "localhost.localdomain",
                "type" => "filebeat",
             "version" => "7.11.1",
            "hostname" => "localhost.localdomain",
                  "id" => "27900a22-b308-48c1-b65e-b80817126f4d",
        "ephemeral_id" => "944f3af7-7113-478e-a83c-6d440f38d657"
    },
           "log" => {
          "file" => {
            "path" => "/home/logAna/Desktop/cowrie.json"
        },
        "offset" => 6645
    },
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
         "input" => {
        "type" => "log"
    },
          "host" => {
        "name" => "localhost.localdomain"
    }
}

Configuring Logstash

Logstash is configured in the same /etc/logstash/logstash-sample.conf file.

Configure the input to receive from Beats:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

Configure the filter:

filter {
  json {
    source => "message"
    # remove metadata added by Filebeat
    remove_field => ["agent", "tags", "@version", "ecs", "offset", "input", "host", "log", "@timestamp"]
  }

  date {
    match => [ "timestamp", "ISO8601"]
  }

  geoip {
    source => "src_ip"
    fields => ["city_name", "region_name", "country_name", "ip", "location"]
  }

  mutate {
    update => { "dst_port" => "22" }
    rename => ["[geoip][ip]", "[geoip][src_ip]"]
  }
}

The filter section is the key part of the Logstash configuration: it is where the log events are preprocessed. Logstash ships with many filters by default; the ones used here are json, date, geoip, and mutate.

  • json filter: parses a JSON-formatted log record into key-value pairs. The source setting is mandatory; it names the field whose content should be parsed, here the message field produced by Filebeat.
    • remove_field can delete fields after parsing; here it removes the various metadata fields added by Filebeat.
  • date filter: handles timestamps. The first value in match is the name of the field containing the timestamp, the second is the timestamp's format. On a match, Logstash writes the result to the @timestamp field unless configured otherwise.
  • geoip filter: looks up geographic information for the IP address in the source field, here src_ip; the fields setting restricts which GeoIP attributes are kept on the event.
  • mutate filter: modifies event fields, e.g. update rewrites a field's value and rename moves a field to a new name.

Test the configuration:

sudo /usr/share/logstash/bin/logstash -f logstash-cowrie.conf --config.test_and_exit
...
Configuration OK
[INFO ] 2021-03-19 05:51:02.152 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
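To actually store the preprocessed events in ES rather than printing them to the console, the stdout output in the configuration can be replaced with an elasticsearch output. A minimal sketch — the host assumes a local ES instance on the default port 9200, and the index name is a hypothetical choice for these Cowrie logs:

```conf
output {
  elasticsearch {
    # Assumed: local Elasticsearch on the default port
    hosts => ["http://localhost:9200"]
    # Hypothetical daily index name for the honeypot logs
    index => "cowrie-%{+YYYY.MM.dd}"
  }
}
```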

Filebeat and Logstash are now both configured. Once running, the logs are successfully collected by Filebeat, preprocessed by Logstash, and stored in ES.
