Logstash 101: how to use Logstash to read and parse your logs!
Logstash is an ETL (Extract, Transform, Load) tool that automates data movement: it extracts information from diverse sources, reshapes it, and loads it into a central repository for analysis and reporting.
Logstash, a powerful open-source data processing and ingestion tool, plays a pivotal role in the Elastic Stack. In this blog, we’ll dive into the world of Logstash, exploring its installation process, data parsing capabilities, and data enrichment techniques.
You can download the Logstash package from here: https://www.elastic.co/downloads/logstash
Or you can install it from a package repository; more details here: https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
A Logstash pipeline comprises three integral components: input, filter, and output. Input plugins are responsible for ingesting data from a designated source, while filter plugins facilitate data manipulation according to your specifications. Lastly, output plugins serve to channel the processed data toward a chosen destination for storage or further utilization.
Let’s start with stashing our first event:
Go to your Logstash folder:
cd logstash-8.9.1
bin/logstash -e 'input { stdin { } } output { stdout {} }'
Now type whatever you want; Logstash will print it back along with some additional metadata:
hello
{
    "host" => {
        "hostname" => "Mohameds-MacBook-Pro.local"
    },
    "@timestamp" => 2023-08-28T20:15:23.247430Z,
    "message" => "hello",
    "event" => {
        "original" => "hello"
    },
    "@version" => "1"
}
Let’s create our first pipeline and call it first_pipeline.conf. As you know, a Logstash pipeline contains three sections: input, filter, and output.
You can run one or multiple pipelines at the same time.
input {
  stdin { }
}
filter {
}
output {
  stdout {}
}
To run the pipeline, use this command:
bin/logstash -f config/first_pipeline.conf
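To run multiple pipelines at once, Logstash reads config/pipelines.yml when started without the -e or -f flags. A minimal sketch (the pipeline IDs and paths below are illustrative, not from this tutorial):

```yaml
# config/pipelines.yml — one entry per pipeline
- pipeline.id: firewall
  path.config: "/etc/logstash/conf.d/firewall.conf"
- pipeline.id: webserver
  path.config: "/etc/logstash/conf.d/webserver.conf"
```

Each pipeline then runs independently with its own input, filter, and output sections.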
Now let’s look at an example of a firewall log:
2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block
Now let’s modify our first pipeline and replace the input section with the following. The input now reads from a file; change the path to point to wherever your firewall logs are (or any other log file you want):
input {
  file {
    path => "/Users/mohamed/Documents/Training/firewall.log"
    start_position => "beginning"  # read the file from the top, not just new lines
    sincedb_path => "/dev/null"    # don't persist the read position (handy for testing)
  }
}
output {
  stdout {}
}
The result should look like this:
{
    "message" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block",
    "@version" => "1",
    "@timestamp" => 2023-08-28T20:30:52.149616Z,
    "host" => {
        "name" => "Mohameds-MacBook-Pro.local"
    },
    "event" => {
        "original" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block"
    },
    "log" => {
        "file" => {
            "path" => "/Users/mohamed/Documents/Training/firewall.log"
        }
    }
}
As you can see above, the whole log arrives in a single field: message. That works, but it’s better to parse the log and split it into separate fields such as timestamp, source.ip, destination.ip, source.port, destination.port, and so on. Those fields can then be used for search, aggregation, and detection, and they let you take advantage of Elastic’s built-in detections, dashboards, and other capabilities.
We want to parse the log like this:
- timestamp: 2023-08-18 20:10:15
- source.ip: 172.16.42.105
- destination.ip: 54.239.25.200
- transport.protocol: UDP
- source.port: 51342
- destination.port: 53
- event.action: Blocked
- rule.name: Inbound_DNS_Block
Note: when it comes to naming fields, it’s better to follow the Elastic Common Schema: https://www.elastic.co/guide/en/ecs/current/index.html
Now you can start creating and testing your parser directly in Logstash, or you can use the Grok Debugger in Kibana (under Dev Tools) to build and test it. Note that this tool only works with grok patterns.
In our example we use the grok filter, but there are other filter plugins you can use depending on the use case. Here’s the official documentation for Logstash filter plugins: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
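For instance, since this firewall log has a fixed layout, the same line could also be split with the dissect filter, which matches on delimiters instead of regular expressions and is typically faster than grok. A sketch, assuming the same log format (the field names here are my own choices, not ECS-mapped yet):

```
filter {
  dissect {
    mapping => {
      # date and time are captured separately because they are space-delimited
      "message" => "%{log_date} %{log_time} [Firewall] SRC: %{src_ip}, DST: %{dst_ip}, PROTO: %{proto}, SPORT: %{sport}, DPORT: %{dport}, ACTION: %{action}, RULE: %{rule}"
    }
  }
}
```

For this tutorial, though, we’ll stick with grok.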
Here’s the Sample Data
2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block
Here’s the Grok Pattern
%{TIMESTAMP_ISO8601:@timestamp} \[Firewall\] SRC: %{IP:source.ip}, DST: %{IP:destination.ip}, PROTO: %{WORD:network.transport}, SPORT: %{NUMBER:source.port}, DPORT: %{NUMBER:destination.port}, ACTION: %{WORD:event.action}, RULE: %{WORD:rule.name}
This is the output from the Grok Debugger:
{
    "@timestamp": "2023-08-18 20:10:15",
    "destination": {
        "port": "53",
        "ip": "54.239.25.200"
    },
    "rule": {
        "name": "Inbound_DNS_Block"
    },
    "source": {
        "port": "51342",
        "ip": "172.16.42.105"
    },
    "event": {
        "action": "Blocked"
    },
    "network": {
        "transport": "UDP"
    }
}
Once your grok pattern works, copy it into the Logstash configuration. The final pipeline will look like this:
input {
  file {
    path => "/Users/mohamed/Documents/Training/firewall.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:@timestamp} \[Firewall\] SRC: %{IP:source.ip}, DST: %{IP:destination.ip}, PROTO: %{WORD:network.transport}, SPORT: %{NUMBER:source.port}, DPORT: %{NUMBER:destination.port}, ACTION: %{WORD:event.action}, RULE: %{WORD:rule.name}"
    }
  }
}
output {
  stdout {}
}
Now when you run Logstash, you will get the following result:
{
    "source.ip" => "172.16.42.105",
    "event.action" => "Blocked",
    "log" => {
        "file" => {
            "path" => "/Users/mohamed/Documents/Training/firewall.log"
        }
    },
    "network.transport" => "UDP",
    "@timestamp" => 2023-08-28T19:54:06.822633Z,
    "event" => {
        "original" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block"
    },
    "source.port" => "51342",
    "destination.port" => "53",
    "message" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block",
    "@version" => "1",
    "destination.ip" => "54.239.25.200",
    "host" => {
        "name" => "Mohameds-MacBook-Pro.local"
    },
    "rule.name" => "Inbound_DNS_Block"
}
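Two things are worth noticing in this output: the dotted names in the grok pattern (e.g. source.ip) became literal top-level fields with dots in them rather than nested objects, and @timestamp still shows the ingest time rather than the timestamp from the log line. A sketch of one common refinement: the [source][ip] bracket syntax creates real nested fields, and a date filter sets @timestamp from the parsed log time (the temporary field name log_timestamp is my own choice):

```
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[Firewall\] SRC: %{IP:[source][ip]}, DST: %{IP:[destination][ip]}, PROTO: %{WORD:[network][transport]}, SPORT: %{NUMBER:[source][port]:int}, DPORT: %{NUMBER:[destination][port]:int}, ACTION: %{WORD:[event][action]}, RULE: %{WORD:[rule][name]}"
    }
  }
  date {
    # set @timestamp from the log line, then drop the temporary field
    match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
    remove_field => ["log_timestamp"]
  }
}
```

The :int suffix on the port fields also converts them from strings to integers, which makes range queries and aggregations behave as expected.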
Once it works, you can change the output to point to your Elasticsearch cluster.
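A minimal sketch of such an output, assuming a local cluster (the URL, index name, and credentials below are placeholders to replace with your own):

```
output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "firewall-logs"
    user => "elastic"
    password => "changeme"
  }
}
```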
In the end, I encourage you to explore further and experiment with Logstash’s advanced features!
Thanks for reading ^^