Multiple Index Routing Using Fluentd/Logstash
Posted on July 8, 2020

One common use case when sending logs to Elasticsearch is to send different lines of the log file to different indexes based on matching patterns. In this article, we will go through the process of setting this up using both Fluentd and Logstash in order to give you more flexibility and ideas on how to approach the topic.
Additionally, we’ll also make use of grok patterns and go through examples so you can be confident that after reading it you can replicate this setup to suit your needs.
How to get started with Kubernetes clusters
Firstly, you need to have access to a working Kubernetes cluster. Then, you can deploy your own on your laptop/PC by using the myriad of tools that are available right now. We will just name a few:
- Docker for Mac/Windows
- Minikube
- MicroK8s
- K3s
Tutorials on how to set up your Kubernetes cluster can be found all over the internet. An example on how you can set up your cluster using Docker for Mac can be found here.
How to prepare the Index Routing
Secondly, you need to have Kubectl and Helm v3 installed to use the resources posted on our GitHub repo dedicated to this blogpost.
Next, you can clone the repository run the following command:
$ git clone https://github.com/cloud-hero/multiple-index-routing.git
Inside, there are 2 folders:
- spitlogs (this contains the Dockerfile and resources to build the spitlogs image if you don’t intend on using the one hosted on our Docker Hub repository)
- helm-values (this contains all the necessary Helm values files to deploy our logging stack)
Eventually, in order to generate logs, we need to run the spitlogs container in its own namespace:
$ kubectl create namespace spitlogs
$ kubectl -n spitlogs run spitlogs --image=cloudhero/spitlogs
As you can see, it’s a simple bash script that outputs the same 4 Nginx logs over and over again. More specifically, 2 access and 2 error logs with the timestamp removed so as to not mess with the timestamp created by fluentd or logstash:
[error] 7359#7359: *17367 upstream timed out (110: Connection timed out) while connecting to upstream, client: 1.1.1.1, server: 0.0.0.0:443, upstream: "2.2.2.2:443", bytes from/to client:0/0, bytes from/to upstream:0/0
1.1.1.1 - - "GET / HTTP/1.1" 200 37427 "https://www.test.com/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36" "3.3.3.3,4.4.4.4" "FyqZnlcMTwjKj8LfT2JwjF3HHs" "0.187" "0.188"
[error] 7359#7359: *17367 no live upstreams while connecting to upstream, client: 5.5.5.5, server: 0.0.0.0:443, upstream: "api.fake.io", bytes from/to client:0/0, bytes from/to upstream:0/0
1.1.1.1 - - "GET /testpage2 HTTP/1.1" 200 47002 "https://www.fakepage.com/" "Mozilla/5.0 (Windows NT 6.1; rv:77.0) Gecko/20100101 Firefox/77.0" "6.6.6.6,7.7.7.7" "tEhE9doovS3TCfW2tSwf4aN6IU" "0.833" "0.836"
Then, we want to send error logs to an Elasticsearch index and the access logs to another index, while having them both correctly parsed.
Before deploying, we need to create a new namespace and add some Helm repositories:
$ kubectl create namespace logging
$ helm repo add stable https://kubernetes-charts.storage.googleapis.com
$ helm repo add elastic https://helm.elastic.co
$ helm repo add kiwigrid https://kiwigrid.github.io
Deoploying Elasticsearch
Next, let’s deploy Elasticsearch and Kibana using e values files provided in the multiple-index-routing repository.
$ cd multiple-index-routing
$ helm install elasticsearch elastic/elasticsearch --namespace logging -f helm-values/es-values.yaml
$ helm install kibana elastic/kibana --namespace logging -f helm-values/kibana-values.yaml
We advise you to check that the setup is okay. To enable it, forward the 5601 port on the Kibana container:
$ kubectl -n logging port-forward $(kubectl -n logging get pods -l app=kibana -oname) 5601
and open http://localhost:5601 in your browser.
Last but not least, we need to tail our application logs, which requires deploying an agent that can harvest them from disk and send them to the appropriate aggregator. Consequently, rest of this tutorial will be split into two parts, one focusing on fluentbit/fluentd, and the other on filebeat/logstash.
1. Fluentbit/Fluentd for Index Setup
Let’s take a look at how we can achieve the above task using the aforementioned technologies.
We start by configuring Fluentd. Custom plugins are required in this case, namely fluent-plugin-grok-parser and fluent-plugin-rewrite-tag-filter, thus we created a custom image that we pushed on our Docker Hub. As previously recommended, if you want to build the image on your own and push it to your own registry, you can do it by using this simple Dockerfile:
FROM quay.io/fluentd_elasticsearch/fluentd
RUN fluent-gem install fluent-plugin-rewrite-tag-filter fluent-plugin-grok-parser
We will proceed with installing Fluentd using the values file provided in our repository:
$ helm install fluentd kiwigrid/fluentd-elasticsearch --namespace logging -f helm-values/fluentd-values.yaml
Now let’s have a look at the configuration file:
<source>
@type forward
tag fakelogs
port 24224
bind 0.0.0.0
</source>
<filter fakelogs>
@type parser
key_name log
<parse>
@type grok
<grok>
pattern %{IPORHOST:remote_ip} - %{DATA:user_name} "%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:agent}" "(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)" "%{DATA:frontend_cookie}" "%{DATA:request_time}" "%{DATA:upstream_response_time}"
</grok>
<grok>
pattern \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: "%{GREEDYDATA:upstream}", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}
</grok>
</parse>
</filter>
<match fakelogs>
@type rewrite_tag_filter
<rule>
key severity
pattern /.*/
tag error_log
</rule>
<rule>
key response_code
pattern /.*/
tag access_log
</rule>
</match>
<match error_log>
@type elasticsearch
hosts elasticsearch-master:9200
logstash_format true
logstash_prefix fd-error
type_name log
</match>
<match access_log>
@type elasticsearch
hosts elasticsearch-master:9200
logstash_format true
logstash_prefix fd-access
type_name log
</match>
How to read the Fluentd configuration file
The first block we shall have a look at is the <source> block. It specifies that fluentd is listening on port 24224 for incoming connections and tags everything that comes there with the tag fakelogs. For this reason, tagging is important because we want to apply certain actions only to a certain subset of logs.
The <filter> block takes every log line and parses it with those two grok patterns. If the first one fails, it tries the second one, and if the second one fails, the log line remains unparsed. You can have N grok patterns and your parsing will stop at the pattern that was a successful match.
After the <filter> block, we have our first <match> block which makes use of the rewrite_tag_filter plugin. This plugin uses regex patterns to check if a field from the parsed log line matches something specific. In our case, we only check if that field exists. For example, an access log will surely not have a severity field, because it is not even mentioned in the grok pattern. Moreover, after determining the types of logs, we replace the old fakelogs tag, with a new one, either error_log or access_log. This action helps us make decisions down the line about what to do with them.
The magic happens in the last 2 <match> blocks, because depending on which tag the log line has assigned, it is either sent to the fd-access-* index, or the fd-error-* one.
Sending logs to fluentd
Now that we have our fluentd agent configured and running, we must use something to send logs to it. Coming up next, we will install the fluentbit agent:
$ helm install fluentbit stable/fluent-bit --namespace logging -f helm-values/fluentbit-values.yaml
Let’s take a quick look at the values file for fluentbit too. The only line which needs explaining is this one:
path: /var/log/containers/spitlogs*_spitlogs_spitlogs-*.log
As stated in our previous article regarding fluentbit, Kubernetes stores logs on disk using the <deployment_name>*_<namespace>_<container>-*.log
format, so we make use of that fact to only target the logs files from our spitlogs application.
After installing fluentbit, we should see some action in Kibana.
Firstly, let’s create a new Elasticsearch index pattern in Kibana:

You can see that Kibana can find the fd-access-* and fd-error-* indices in Elasticsearch:

To create the fd-access-* index pattern, write exactly that in place of index-name-* and click Next step.

In the next step, choose @timestamp as the timestamp, and finally, click Create index pattern. Repeat the same steps for the fd-error-* index pattern as well.
After this, we can go to the Discover tab and see that we have two index patterns created with parsed logs inside them.


2. Logstash/Filebeat for Index Routing
If Fluentbit/Fluentd does not suit your needs, the alternative solution for Multiple Index Routing using Logstash and Filebeat.
Let’s install Logstash using the values file provided in our repository
$ helm install logstash elastic/logstash --namespace logging -f helm-values/logstash-values.yaml
and have a look at the configuration file:
input {
beats {
port => 5044
host => "0.0.0.0"
tags => ["fakelogs"]
}
}
filter {
if "fakelogs" in [tags] {
grok {
match => { "message" => "%{IPORHOST:remote_ip} - %{DATA:user_name} \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)\" \"%{DATA:frontend_cookie}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\"" }
add_tag => ["access_log"]
}
if "_grokparsefailure" in [tags] {
grok {
match => { "message" => "\[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: \"%{GREEDYDATA:upstream}\", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}" }
add_tag => ["error_log"]
remove_tag => ["_grokparsefailure"]
}
}
}
}
filter {
if "access_log" in [tags] {
mutate { add_field => { "[@metadata][target_index]" => "ls-access-%{+yyyy.MM.dd}" } }
}
else {
mutate { add_field => { "[@metadata][target_index]" => "ls-error-%{+yyyy.MM.dd}" } }
}
}
output {
elasticsearch {
hosts => "elasticsearch-master:9200"
index => "%{[@metadata][target_index]}"
}
}
The input {} block is analogous to the <source> block in fluentd, and does the same thing here, but it listens on a different port.
How to read the Logstash configuration file
The first filter {} block first tries to parse the log line with the access log grok pattern. Therefore, if a log line is not matched with a grok pattern, logstash adds a _grokparsefailure tag in the tag array, so we can check for it and parse again if the first try was unsuccessful. Additionally, we use the same tags as in fluentd, and remove the previously assigned _grokparsefailure tag.
The second filter {} block looks in the tag list and assigns a different value to the target_index metadata field.
Lastly, the output {} block uses the target_index metadata field to select the Elasticsearch index to which to send data.
If we have a look in Kibana, we can see that we have two new indexes created in Elasticsearch.

Now we’re taking the last step towards finishing our Multiple Index Routing. Take a look at the index pattern creation—it’s the same as before. Then, after creating them, we can see that the logs go to the correct indexes and are being parsed accordingly.


You reached the end of this article hoping that you feel confident enough to adapt these tips to your use case.