Logstash 101: how to use Logstash to read and parse your logs!

Saidani Mohamed El Amine
4 min read · Aug 28, 2023


Logstash is an ETL (Extract, Transform, Load) tool that automates data movement by extracting information from diverse sources, reshaping it, and loading it into a central repository for analysis and reporting, enhancing decision-making processes.

Logstash, a powerful open-source data processing and ingestion tool, plays a pivotal role in the Elastic Stack. In this blog, we’ll dive into the world of Logstash, exploring its installation process, data parsing capabilities, and data enrichment techniques.

You can download the Logstash package here: https://www.elastic.co/downloads/logstash

Alternatively, you can install it from a package repository; more details here: https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

A Logstash pipeline comprises three integral components: input, filter, and output. Input plugins are responsible for ingesting data from a designated source, while filter plugins facilitate data manipulation according to your specifications. Lastly, output plugins serve to channel the processed data toward a chosen destination for storage or further utilization.

Let’s start with stashing our first event:

Go to your Logstash folder:

cd logstash-8.9.1
bin/logstash -e 'input { stdin { } } output { stdout {} }'

Now type whatever you want; Logstash will print it back along with some extra metadata:

hello
{
  "host" => {
    "hostname" => "Mohameds-MacBook-Pro.local"
  },
  "@timestamp" => 2023-08-28T20:15:23.247430Z,
  "message" => "hello",
  "event" => {
    "original" => "hello"
  },
  "@version" => "1"
}

Let’s create our first pipeline and call it first_pipeline.conf. As you know, a Logstash pipeline contains three sections: input, filter, and output.

You can run one or multiple pipelines at the same time.

input {
  stdin { }
}

filter {
}

output {
  stdout {}
}

To run the pipeline, use this command:

bin/logstash -f config/first_pipeline.conf
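As mentioned above, Logstash can run several pipelines at the same time. That is configured in config/pipelines.yml, where each pipeline gets an id and a path to its config file. A minimal sketch (the ids and paths here are placeholders, not from this article):

```
- pipeline.id: firewall
  path.config: "/Users/mohamed/Documents/Training/first_pipeline.conf"
- pipeline.id: another_pipeline
  path.config: "/Users/mohamed/Documents/Training/second_pipeline.conf"
```

When you start Logstash without the -e or -f flags, it reads pipelines.yml and runs every pipeline listed there.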

Now let’s look at an example of a firewall log:

2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block

Now let’s modify our first pipeline and replace the input with the following. We’ve changed the input to read from a file; you can change the path to point to wherever your firewall logs are (or any other log file you want):

input {
  file {
    path => "/Users/mohamed/Documents/Training/firewall.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

output {
  stdout {}
}



The result should look like this:

{
  "message" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block",
  "@version" => "1",
  "@timestamp" => 2023-08-28T20:30:52.149616Z,
  "host" => {
    "name" => "Mohameds-MacBook-Pro.local"
  },
  "event" => {
    "original" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block"
  },
  "log" => {
    "file" => {
      "path" => "/Users/mohamed/Documents/Training/firewall.log"
    }
  }
}

As you can see above, the whole log arrives in a single field: message. That’s fine so far, but it’s better to parse the log and split it into separate fields like timestamp, source.ip, destination.ip, source.port, destination.port, and so on. You can then use those fields for search, aggregations, detections, and more, and take advantage of Elasticsearch’s built-in detection rules, dashboards, and other capabilities.

We want to parse the log like this:

  • timestamp: 2023-08-18 20:10:15
  • source.ip: 172.16.42.105
  • destination.ip: 54.239.25.200
  • transport.protocol: UDP
  • source.port: 51342
  • destination.port: 53
  • event.action: Blocked
  • rule.name: Inbound_DNS_Block

Note: when naming fields, it’s better to follow the Elastic Common Schema (ECS): https://www.elastic.co/guide/en/ecs/current/index.html

You can create and test your parser directly in Logstash, or you can use the Grok Debugger in Kibana (under Dev Tools). Note that this tool works only with Grok patterns.

In our example we use the grok filter, but there are other filter plugins you can use depending on the use case. Here’s the official documentation for Logstash filter plugins: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
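For instance, since this log has a fixed layout, the dissect filter could parse it without regular expressions, which is typically faster for well-structured lines. A sketch (the field names here are my own choice, not from this article):

```
filter {
  dissect {
    mapping => {
      "message" => "%{date} %{time} [Firewall] SRC: %{src_ip}, DST: %{dst_ip}, PROTO: %{protocol}, SPORT: %{src_port}, DPORT: %{dst_port}, ACTION: %{action}, RULE: %{rule}"
    }
  }
}
```

Dissect splits the line on the literal delimiters between the %{} markers, so it only suits logs whose format never varies; grok remains the better fit for messy or variable input.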

Here’s the Sample Data

2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block

Here’s the Grok Pattern

%{TIMESTAMP_ISO8601:@timestamp} \[Firewall\] SRC: %{IP:source.ip}, DST: %{IP:destination.ip}, PROTO: %{WORD:network.transport}, SPORT: %{NUMBER:source.port}, DPORT: %{NUMBER:destination.port}, ACTION: %{WORD:event.action}, RULE: %{WORD:rule.name}

This is the output from the Grok Debugger:

{
  "@timestamp": "2023-08-18 20:10:15",
  "destination": {
    "port": "53",
    "ip": "54.239.25.200"
  },
  "rule": {
    "name": "Inbound_DNS_Block"
  },
  "source": {
    "port": "51342",
    "ip": "172.16.42.105"
  },
  "event": {
    "action": "Blocked"
  },
  "network": {
    "transport": "UDP"
  }
}

Once your grok pattern works in the debugger, you can copy it into Logstash. The final Logstash pipeline will look like this:

input {
  file {
    path => "/Users/mohamed/Documents/Training/firewall.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:@timestamp} \[Firewall\] SRC: %{IP:source.ip}, DST: %{IP:destination.ip}, PROTO: %{WORD:network.transport}, SPORT: %{NUMBER:source.port}, DPORT: %{NUMBER:destination.port}, ACTION: %{WORD:event.action}, RULE: %{WORD:rule.name}"
    }
  }
}

output {
  stdout {}
}
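Before starting Logstash, it can be worth validating the configuration syntax; Logstash has a built-in check flag for that (adjust the path to wherever your pipeline file lives):

```
bin/logstash -f config/first_pipeline.conf --config.test_and_exit
```

Logstash parses the file, reports any syntax errors, and exits without processing data.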

Now when you run Logstash, you’ll get the following result:

{
  "source.ip" => "172.16.42.105",
  "event.action" => "Blocked",
  "log" => {
    "file" => {
      "path" => "/Users/mohamed/Documents/Training/firewall.log"
    }
  },
  "network.transport" => "UDP",
  "@timestamp" => 2023-08-28T19:54:06.822633Z,
  "event" => {
    "original" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block"
  },
  "source.port" => "51342",
  "destination.port" => "53",
  "message" => "2023-08-18 20:10:15 [Firewall] SRC: 172.16.42.105, DST: 54.239.25.200, PROTO: UDP, SPORT: 51342, DPORT: 53, ACTION: Blocked, RULE: Inbound_DNS_Block",
  "@version" => "1",
  "destination.ip" => "54.239.25.200",
  "host" => {
    "name" => "Mohameds-MacBook-Pro.local"
  },
  "rule.name" => "Inbound_DNS_Block"
}
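Two caveats worth noting about this output. First, @timestamp is still the time Logstash processed the event (2023-08-28), not the time in the log line; grok alone can’t set @timestamp, so the usual approach is to capture the log’s time into a temporary field and let the date filter convert it. Second, names like source.ip above are single flat fields containing a literal dot, not nested objects; grok’s bracket field-reference syntax produces real nested ECS fields. A sketch combining both fixes (the log_timestamp field name is my own):

```
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[Firewall\] SRC: %{IP:[source][ip]}, DST: %{IP:[destination][ip]}, PROTO: %{WORD:[network][transport]}, SPORT: %{NUMBER:[source][port]}, DPORT: %{NUMBER:[destination][port]}, ACTION: %{WORD:[event][action]}, RULE: %{WORD:[rule][name]}"
    }
  }
  date {
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss" ]
    remove_field => [ "log_timestamp" ]
  }
}
```

With this, @timestamp reflects when the firewall event actually happened, which is what you want for searching and dashboards.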

Once it works, you can change the output to point to your Elasticsearch cluster.
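A minimal elasticsearch output might look like the following sketch (the host, index name, and credentials are placeholders you’d replace with your own):

```
output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "firewall-logs"
    user => "elastic"
    password => "changeme"
  }
}
```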

In the end, I encourage you to explore further and experiment with Logstash’s advanced features!

Thanks for reading ^^


Saidani Mohamed El Amine

Currently working as a DevSecOps consultant with a focus on security, monitoring, Big Data, and related topics.