Multiple Index Routing Using Fluentd/Logstash

Posted on July 8, 2020

One common use case when sending logs to Elasticsearch is to send different lines of the log file to different indexes based on matching patterns. In this article, we will go through the process of setting this up using both Fluentd and Logstash in order to give you more flexibility and ideas on how to approach the topic.

Additionally, we’ll also make use of grok patterns and go through examples so you can be confident that after reading it you can replicate this setup to suit your needs.

How to get started with Kubernetes clusters

Firstly, you need to have access to a working Kubernetes cluster. Then, you can deploy your own on your laptop/PC by using the myriad of tools that are available right now. We will just name a few:

Tutorials on how to set up your Kubernetes cluster can be found all over the internet. An example on how you can set up your cluster using Docker for Mac can be found here.

How to prepare the Index Routing

Secondly, you need to have Kubectl and Helm v3 installed to use the resources posted on our GitHub repo dedicated to this blogpost.

Next, you can clone the repository run the following command:

$ git clone

Inside, there are 2 folders:

Eventually, in order to generate logs, we need to run the spitlogs container in its own namespace:

$ kubectl create namespace spitlogs
$ kubectl -n spitlogs run spitlogs --image=cloudhero/spitlogs

As you can see, it’s a simple bash script that outputs the same 4 Nginx logs over and over again. More specifically, 2 access and 2 error logs with the timestamp removed so as to not mess with the timestamp created by fluentd or logstash:

[error] 7359#7359: *17367 upstream timed out (110: Connection timed out) while connecting to upstream, client:, server:, upstream: "", bytes from/to client:0/0, bytes from/to upstream:0/0 - - "GET / HTTP/1.1" 200 37427 "" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36" "," "FyqZnlcMTwjKj8LfT2JwjF3HHs" "0.187" "0.188"

[error] 7359#7359: *17367 no live upstreams while connecting to upstream, client:, server:, upstream: "", bytes from/to client:0/0, bytes from/to upstream:0/0 - - "GET /testpage2 HTTP/1.1" 200 47002 "" "Mozilla/5.0 (Windows NT 6.1; rv:77.0) Gecko/20100101 Firefox/77.0" "," "tEhE9doovS3TCfW2tSwf4aN6IU" "0.833" "0.836"

Then, we want to send error logs to an Elasticsearch index and the access logs to another index, while having them both correctly parsed.

Before deploying, we need to create a new namespace and add some Helm repositories:

$ kubectl create namespace logging
$ helm repo add stable
$ helm repo add elastic
$ helm repo add kiwigrid

Deoploying Elasticsearch

Next, let’s deploy Elasticsearch and Kibana using e values files provided in the multiple-index-routing repository.

$ cd multiple-index-routing
$ helm install elasticsearch elastic/elasticsearch --namespace logging -f helm-values/es-values.yaml
$ helm install kibana elastic/kibana --namespace logging -f helm-values/kibana-values.yaml

We advise you to check that the setup is okay. To enable it, forward the 5601 port on the Kibana container:

$ kubectl -n logging port-forward $(kubectl -n logging get pods -l app=kibana -oname) 5601

and open http://localhost:5601 in your browser.

Last but not least, we need to tail our application logs, which requires deploying an agent that can harvest them from disk and send them to the appropriate aggregator. Consequently, rest of this tutorial will be split into two parts, one focusing on fluentbit/fluentd, and the other on filebeat/logstash.

1. Fluentbit/Fluentd for Index Setup

Let’s take a look at how we can achieve the above task using the aforementioned technologies.

We start by configuring Fluentd. Custom plugins are required in this case, namely fluent-plugin-grok-parser and fluent-plugin-rewrite-tag-filter, thus we created a custom image that we pushed on our Docker Hub. As previously recommended, if you want to build the image on your own and push it to your own registry, you can do it by using this simple Dockerfile:


RUN fluent-gem install fluent-plugin-rewrite-tag-filter fluent-plugin-grok-parser

We will proceed with installing Fluentd using the values file provided in our repository:

$ helm install fluentd kiwigrid/fluentd-elasticsearch --namespace logging -f helm-values/fluentd-values.yaml

Now let’s have a look at the configuration file:

  @type forward
  tag fakelogs
  port 24224

<filter fakelogs>
  @type parser
  key_name log
    @type grok
        pattern %{IPORHOST:remote_ip} - %{DATA:user_name} "%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:agent}" "(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)" "%{DATA:frontend_cookie}" "%{DATA:request_time}" "%{DATA:upstream_response_time}"
        pattern \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: "%{GREEDYDATA:upstream}", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}
<match fakelogs>
  @type rewrite_tag_filter
    key severity
    pattern /.*/
    tag error_log
    key response_code
    pattern /.*/
    tag access_log

<match error_log>
  @type elasticsearch
  hosts elasticsearch-master:9200
  logstash_format true
  logstash_prefix fd-error
  type_name log

<match access_log>
  @type elasticsearch
  hosts elasticsearch-master:9200
  logstash_format true
  logstash_prefix fd-access
  type_name log

How to read the Fluentd configuration file

The first block we shall have a look at is the <source> block. It specifies that fluentd is listening on port 24224 for incoming connections and tags everything that comes there with the tag fakelogs. For this reason, tagging is important because we want to apply certain actions only to a certain subset of logs.

The <filter> block takes every log line and parses it with those two grok patterns. If the first one fails, it tries the second one, and if the second one fails, the log line remains unparsed. You can have N grok patterns and your parsing will stop at the pattern that was a successful match.

After the <filter> block, we have our first <match> block which makes use of the rewrite_tag_filter plugin. This plugin uses regex patterns to check if a field from the parsed log line matches something specific. In our case, we only check if that field exists. For example, an access log will surely not have a severity field, because it is not even mentioned in the grok pattern. Moreover, after determining the types of logs, we replace the old fakelogs tag, with a new one, either error_log or access_log. This action helps us make decisions down the line about what to do with them.

The magic happens in the last 2 <match> blocks, because depending on which tag the log line has assigned, it is either sent to the fd-access-* index, or the fd-error-* one.

Sending logs to fluentd

Now that we have our fluentd agent configured and running, we must use something to send logs to it. Coming up next, we will install the fluentbit agent:

$ helm install fluentbit stable/fluent-bit --namespace logging -f helm-values/fluentbit-values.yaml

Let’s take a quick look at the values file for fluentbit too. The only line which needs explaining is this one:

path: /var/log/containers/spitlogs*_spitlogs_spitlogs-*.log

As stated in our previous article regarding fluentbit, Kubernetes stores logs on disk using the <deployment_name>*_<namespace>_<container>-*.log format, so we make use of that fact to only target the logs files from our spitlogs application.

After installing fluentbit, we should see some action in Kibana.

Firstly, let’s create a new Elasticsearch index pattern in Kibana:

You can see that Kibana can find the fd-access-* and fd-error-* indices in Elasticsearch:

To create the fd-access-* index pattern, write exactly that in place of index-name-* and click Next step.

In the next step, choose @timestamp as the timestamp, and finally, click Create index pattern. Repeat the same steps for the fd-error-* index pattern as well.

After this, we can go to the Discover tab and see that we have two index patterns created with parsed logs inside them.

2. Logstash/Filebeat for Index Routing

If Fluentbit/Fluentd does not suit your needs, the alternative solution for Multiple Index Routing using Logstash and Filebeat.

Let’s install Logstash using the values file provided in our repository

$ helm install logstash elastic/logstash --namespace logging -f helm-values/logstash-values.yaml

and have a look at the configuration file:

input {
  beats {
    port => 5044
    host => ""
    tags => ["fakelogs"]
filter {
  if "fakelogs" in [tags] {
    grok {
        match => { "message" => "%{IPORHOST:remote_ip} - %{DATA:user_name} \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)\" \"%{DATA:frontend_cookie}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\"" }
        add_tag => ["access_log"]
    if "_grokparsefailure" in [tags] {
      grok {
          match => { "message" => "\[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: \"%{GREEDYDATA:upstream}\", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}" }
          add_tag => ["error_log"]
          remove_tag => ["_grokparsefailure"]
filter {
  if "access_log" in [tags] {
    mutate { add_field => { "[@metadata][target_index]" => "ls-access-%{+yyyy.MM.dd}" } }
  else {
    mutate { add_field => { "[@metadata][target_index]" => "ls-error-%{+yyyy.MM.dd}" } }
output {
  elasticsearch {
    hosts => "elasticsearch-master:9200"
    index => "%{[@metadata][target_index]}"

The input {} block is analogous to the <source> block in fluentd, and does the same thing here, but it listens on a different port.

How to read the Logstash configuration file

The first filter {} block first tries to parse the log line with the access log grok pattern. Therefore, if a log line is not matched with a grok pattern, logstash adds a _grokparsefailure tag in the tag array, so we can check for it and parse again if the first try was unsuccessful. Additionally, we use the same tags as in fluentd, and remove the previously assigned _grokparsefailure tag.

The second filter {} block looks in the tag list and assigns a different value to the target_index metadata field.

Lastly, the output {} block uses the target_index metadata field to select the Elasticsearch index to which to send data.

If we have a look in Kibana, we can see that we have two new indexes created in Elasticsearch.

Now we’re taking the last step towards finishing our Multiple Index Routing. Take a look at the index pattern creation—it’s the same as before. Then, after creating them, we can see that the logs go to the correct indexes and are being parsed accordingly.

Pattern creation

You reached the end of this article hoping that you feel confident enough to adapt these tips to your use case.

Keep Reading

Take Advantage of the Cloud

Schedule a Call