Monitoring Dynafed with ELK
The Dynamic Federation project (DynaFed) being developed at CERN is intended to federate any number of different types of storage endpoints allowing users to read or write data transparently and efficiently. How it works in a nutshell:
One thing to notice is that both the error log and access log (the first "CustomLog" line) are sent to the same file "dynafed_log". This is in order to avoid a race conditions while shipping these logs as will be explained a bit. A critical field requested here is the "LogID". This correlates any access log with its error log(s) which is needed to reconstruct the entire request event as the error log is where the provided redirect link to the client and credentials used are found.
Filebeat is capable of shipping logs from two different files (the access log and the error log for example) and ship them to Logstash and the whole process works the same in ideal circumstances. However it is possible that after a disruption of service with the connection link between Filebeat and Logstash that a large set of events might have been logged. Filebeat will then spend more time working on shipping the contents of just one file. If the time taken before working on the second file is greater than the aggregation time limit in Logstash, then access and error logs will fail to aggregate. So to avoid this race condition we log everything to a single file.
We configure Filebeat to add the tags "dynafed", "dynafed_httpd" and "dynafed_httpd_log" which will be used by Logstash and Elasticsearch later, as well as adding time zone information. A sample configuration looks like this:
Now we need to configure Logstash to process the data and create an elasticsearch document for each event of interest, as well as discarding the ones we don't care about.
The first thing is to create "GROK" patterns so that it can parse and extract the fields from the Apache logs, which we save under /etc/logstash/conf.d/patterns/ugr.grok:
Using the "UGR_DYNAFED" pattern in our grok filter, Logstash will automatically match the log and extract the fields accordingly. It will look like this:
This will generate one elasticsearch JSON document per log line. It will also add failure tags if there are errors parsing the fields, which we use to catch and send to a different elasticsearch index for later debugging.
Thus the next step is to aggregate all the events relating to the same event into one document. To do so, we use the "aggregate" filter on the "logid" field, as this will be unique amongst the related documents. Then we have to do some ruby scripting to create and organise the new document. we also sort the timestamps to make sure we get the first event's as the main one for the document. We also create a nested object field called "events" which contain the information for each original event. Note that we also handle the cases in which there is no "logid" in case these happen by creating an "event_id" for them. All this with filter file 22-dynafed-agg-1.conf:
Next this filter figures out for each aggregated event what kind of transaction was requested. Since events can sometimes be composed of several requests depending on the client (a PROFPIND followed by GET for instance). We need to extract ultimately what was the intention of the event. For each request we check what HTTP status code was replied by the server and using a simple logical method of counting successes and failures, we figure out if it was a COPY, GET, PUT, DELETE (which we call transaction events) or HEAD, PROPFIND, etc, and mark them as such. We also whether the serving the redirection link was successful or not.
The code also aggregates all of the different requests' logs under the the "events" field, which in ES terminology is a nested field array of objects so that we can see all information of what transpired for each event.
This table shows how this calculations work:
And this is the code that figures this all out, using the "ruby" filter as demonstrated in 26-dynafed-events.conf:
Then we enrich the data by resolving DNS, adding GeoIP, set the event's timestamp as the main, parse and organize the data into four groups: Client, (client), Source (src), Destination (dst), Dynafed (dynafed) with filter file 28-dynafed-format.conf:
The script we use to obtain the connection and storage stats for each endpoint is called getugrpluginstats.py. It looks for both the connection and storage stats in memcache and creates and outputs a single string with all the information for each of the storage endpoints.
The script is placed in "/etc/execbeat/commands/getugrpluginstats.py". We configure Execbeat with using these general options. Note the tags again, which will be used by Logstash later in processing:
Now in Logstash we add the following GROK to our ugr.grok file to deal with the string in the "exec.stdout" field:
We use the following filter to Extract the information:
After this, Logstash break the string into the individual metrics and adds them into their own fields, adds the dynafed.hostname fields, adds GeoIP and translates some status codes into keywords the filter file 30-dynafed-ugr-sestatus.conf:
This allows us to access these stats from http://localhost:8088/server-status and therefore configure Metricbeat to gather these metrics with the following general options fort the Apache module. Again note the tags added which will be used once more by Logstash to process this accordingly:
In Logstash we add the dynafed.hostname field, GeoIP information and calculate the ratio of used/available workers into it's own float field with the filter file 40-dynafed-metrics-httpd.conf:
A few things to note here, one is the "_doc" line, this is added to for future compatibility with ES 7.x in case we upgrade later, and the different indices Logstash will create:
First we start with the "base" template for all Dynafed related indices which defines the "dynafed" group and sets all string fields to be of the "keyword" data-type by default with template 10-dynafed-base-template.json
Then we define the field mappings for each of the indices. For dynafed-logs-httpd we use 20-dynafed-logs-httpd-mappings-template.json:
And 30-dynafed-logs-httpd-aliases-template.json:
For dynafed-metrics-httpd-* we use 20-dynafed-metrics-httpd-mappings-template.json:
And 30-dynafed-metrics-httpd-aliases-template.json:
For dynafed-metrics-ugr-* we use 20-dynafed-metrics-ugr-mappings-template.json:
And 30-dynafed-metrics-ugr-aliases-template.json:
Of course the alias templates could be added instead in the other templates, but we like to keep all settings separate. We use others as well for general index settings like replicas and shards as you can see here (only the ones with "dynafed" in the name are related to this project).
- A client sends a COPY, GET, PUT, DELETE HTTP request to the DynaFed instance's URL.
- DynaFed decides which amongst the storage endpoints it federates is the "best" one to handle the request, sorted according to geographic location and available free space.
- The client receives a 302 redirect link pointing to this storage endpoint with the protocol and authentication tokens necessary for the transaction.
- The client re-sends the HTTP request directly to this storage endpoint.
- Apache Status.
- Client authentication credentials.
- Most popular files.
- Most popular storage endpoints.
- Most popular clients.
- Storage endpoint accessibility.
- Storage endpoint capacity.
- Storage endpoint link health.
- Success/Failure in serving the redirect links (400's and 500's status codes)
- Filebeat: periodically parses Apache's log files and ships them to Logstash.
- Metricbeat: periodically reads Apache "/server-status" page and sends the metrics to Logstash.
- Execbeat: periodically reads entries in memcache with storage entpoint status information and sends them to Logstash.
- Logstash: the bulk of the processing happens here. It aggregates together all the events that share the same LogID within a specified time limit. Extracts the data fields, enriches them (such as DNS resolution and GeoIP location) and creates a JSON file with the entire event which is then shipped Elasticsearch
- Elasticsearch: the database that holds the information for later accounting, analysis, monitoring and visualizing.
Apache Logs
Apache has two default log files access.log and error.log in which the details of each request are written into. Unfortunately the default settings do not give us all the information we want, so these need to be customised in our httpd.conf (or vhost file where the DynaFed virtual server is configured) as such:LogLevel    warn
LogLevel    lcgdm_ns:info
ErrorLogFormat "[%-{cu}t] [LogID \"%-L\"] [thread \"%-T\"] [client \"%-a\"] [agent \"%-{User-Agent}i\"] [%-M]"
ErrorLog    logs/dynafed_log
CustomLog   logs/dynafed_log "[%{%Y-%m-%d %H:%M:%S}t.%{begin:usec_frac}t] [LogID \"%L\"] [thread %{tid}P] [client %h:%{remote}p] [request \"%r\"] [method %m] [content-length %{Content-Length}i] [query \"%q\"] [urlpath \"%U\"] [status %>s] [agent \"%{User-Agent}i\"]" env=!dontlog
CustomLog   logs/dynafed_ssl_request_log "%t %h %{SSL_PROTOCOL}x %{SSL_CIPHER}x \"%r\" %b"
One thing to notice is that both the error log and access log (the first "CustomLog" line) are sent to the same file "dynafed_log". This is in order to avoid a race conditions while shipping these logs as will be explained a bit. A critical field requested here is the "LogID". This correlates any access log with its error log(s) which is needed to reconstruct the entire request event as the error log is where the provided redirect link to the client and credentials used are found.
Filebeat is capable of shipping logs from two different files (the access log and the error log for example) and ship them to Logstash and the whole process works the same in ideal circumstances. However it is possible that after a disruption of service with the connection link between Filebeat and Logstash that a large set of events might have been logged. Filebeat will then spend more time working on shipping the contents of just one file. If the time taken before working on the second file is greater than the aggregation time limit in Logstash, then access and error logs will fail to aggregate. So to avoid this race condition we log everything to a single file.
We configure Filebeat to add the tags "dynafed", "dynafed_httpd" and "dynafed_httpd_log" which will be used by Logstash and Elasticsearch later, as well as adding time zone information. A sample configuration looks like this:
## General Options ###
name: "{hostname}"
tags: "dynafed"
filebeat:
  inputs:
    - type: "log"
      tags: ["dynafed_httpd_log", "dynafed_httpd"]
      paths:
        - "/var/log/httpd/dynafed_log"
##### Add Time Zone Information #####
processors:
  - add_locale: ~
### Outputs ###
output:
  logstash:
    hosts:
      - "logstash-server:5044"
Now we need to configure Logstash to process the data and create an elasticsearch document for each event of interest, as well as discarding the ones we don't care about.
The first thing is to create "GROK" patterns so that it can parse and extract the fields from the Apache logs, which we save under /etc/logstash/conf.d/patterns/ugr.grok:
UGR_HTTPD_ACCESSLOG \[%{TIMESTAMP_ISO8601:timestamp}] \[LogID \"%{DATA:logid}\"] \[thread %{POSINT:thread}] \[client %{IPORHOST:clienthostname}:%{POSINT:clientport}] \[request \"%{DATA:request}\"] \[method (-|%{WORD:method})] \[content-length (-|%{POSINT:filesize})] \[query \"%{DATA:query}\"] \[urlpath \"%{DATA:df_lfn}\"] \[status %{NUMBER:statuscode}] \[agent \"%{DATA:agent}\"]
UGR_HTTPD_ERRORLOG_MESSAGE (=.*?; %{URI:storage_endpoint}%{SPACE}\(\)%{SPACE}\[%{INT:statuscode},%{SPACE}\#%{INT}]|Using DN: %{GREEDYDATA:clientdn}|Using FQAN: %{GREEDYDATA:clientfqan}|%{GREEDYDATA:df_message})
UGR_HTTPD_ERRORLOG \[%{TIMESTAMP_ISO8601:timestamp}]%{SPACE}\[LogID \"%{DATA:logid}\"]%{SPACE}\[thread \"%{POSINT:thread}\"]%{SPACE}\[client \"%{IPORHOST:clienthostname}:%{POSINT:clientport}\"]%{SPACE}\[agent \"%{DATA:agent}\"]%{SPACE}\[%{UGR_HTTPD_ERRORLOG_MESSAGE}]
UGR_FILENAME (?:[^/]*$)
UGR_DYNAFED %{UGR_HTTPD_ACCESSLOG}|%{UGR_HTTPD_ERRORLOG}
UGR_HTTPD_COPY_PULL Trying remote fetch for %{URIPATH}, The source is %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?%{IPORHOST:src_hostname}(?::%{POSINT})?(?:%{URIPATH:src_lfn}(?:%{URIPARAM:src_param})?)?
UGR_HTTPD_COPY_PUSH Trying remote copy for %{URIPATH}, The destination is %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?%{IPORHOST:dst_hostname}(?::%{POSINT})?(?:%{URIPATH:dst_lfn}(?:%{URIPARAM:dst_param})?)?
UGR_HTTPD_COPY %{UGR_HTTPD_COPY_PULL}|%{UGR_HTTPD_COPY_PUSH}
Using the "UGR_DYNAFED" pattern in our grok filter, Logstash will automatically match the log and extract the fields accordingly. It will look like this:
filter {
  if "dynafed_httpd_log" in [tags] {
    grok {
      patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
      match          =>  { "message" => "%{UGR_DYNAFED}" }
      remove_field   =>  [ "message" ]
      tag_on_failure =>  [ "_0-httpdlog_grokparsefailure", "failure" ]
    }
}
This will generate one elasticsearch JSON document per log line. It will also add failure tags if there are errors parsing the fields, which we use to catch and send to a different elasticsearch index for later debugging.
Thus the next step is to aggregate all the events relating to the same event into one document. To do so, we use the "aggregate" filter on the "logid" field, as this will be unique amongst the related documents. Then we have to do some ruby scripting to create and organise the new document. we also sort the timestamps to make sure we get the first event's as the main one for the document. We also create a nested object field called "events" which contain the information for each original event. Note that we also handle the cases in which there is no "logid" in case these happen by creating an "event_id" for them. All this with filter file 22-dynafed-agg-1.conf:
filter {
  if "dynafed_httpd" in [tags] {
    # The lack of logid means there is no error_log counterpart to be aggregated
    # so we use the event_id to aggregate any access_log lines belonging to the
    # same event. We don't call it logid here to avoid conflict with the next
    # aggregate filter.
    if [logid] == "-" {
      # Create a unique ID to identify the event.
      fingerprint {
        concatenate_sources => true
        source  =>  ['thread', 'clienthostname', 'clientport', 'agent', 'df_lfn']
        method  =>  "MURMUR3"
        target  =>  "event_id"
      }
      aggregate {
        aggregate_maps_path  =>  "/var/log/logstash/aggregate_maps"
        task_id  =>  "%{event_id}"
        code     =>  "
          # Keep the tags
          map['tags'] ||= []
          map['tags']   = event.get('tags')
          map['beat'] ||= []
          map['beat'] = event.get('beat')
          # Keep the original timestamp and add the timezone since Apache does not
          # provide it.
          map['temptime']   ||=  event.get('timestamp') + event.get('[beat][timezone]')
          map['timestamp']  ||= []
          # Bubble-sort to obtain the earliest timestamp
          if map['timestamp'].empty? || map['timestamp'] < map['temptime']
            map['timestamp'] = map['temptime']
          end
          # Generated the nested a dict with the log line information and add it
          # as a nested object to events list field.
          map['events'] ||= []
          hash = { :agent            => event.get('agent'),
                   :clienthostname   => event.get('clienthostname'),
                   :clientport       => event.get('clientport'),
                   :method           => event.get('method'),
                   :request          => event.get('request'),
                   :statuscode       => event.get('statuscode'),
                   :timestamp        => event.get('timestamp') + event.get('[beat][timezone]'),
                   :thread           => event.get('thread')
                 }
          map['events'] << hash
          # Define the fields that will be used for accounting.
          stringfields = [
            'clienthostname',
            'df_lfn',
            'filesize',
            'storage_endpoint'
          ]
          map['host']  ||=  ''
          if event.get('[host][name]') && !map['host'].include?(event.get('[host][name]'))
            map['host']  +=  ', ' unless map['host'].empty?
            map['host']  += event.get('[host][name]')
          end
          # Obtain the data from each field defined above. Since they should be
          # the same across the log lines, each should end up with a single value
          # however if this is not the case, different values are appended after ','
          # this does not turn the field into a JSON list.
          for i in stringfields
            map[i]  ||=  '' if event.get(i)
            if event.get(i) && !map[i].include?(event.get(i))
              map[i]  +=  ', ' unless map[i].empty?
              map[i]  += event.get(i)
            end
          end
        "
        # How long to wait for new log lines form the same event in seconds.
        # Beats and Logstash services
        timeout       =>  60
        map_action    =>  "create_or_update"
        push_map_as_event_on_timeout  =>  true
        # Keep event_id as logid for consistency.
        timeout_task_id_field => "logid"
        timeout_code  =>  "
          # Add meta tag to mark event as aggregated
          event.set('[@metadata][step]', 'aggregate-last')
          "
      }
    } else {
      # Here Accesslogs with an Errorlog counterpart will be aggregated using logid.
      aggregate {
        task_id  =>  "%{logid}"
        code     =>  "
          # Keep the tags
          map['tags'] ||= []
          map['tags']   = event.get('tags')
          map['beat'] ||= []
          map['beat'] = event.get('beat')
          # Keep the original timestamp and add the timezone since Apache does not
          # provide it.
          map['temptime']   ||=  event.get('timestamp') + event.get('[beat][timezone]')
          map['timestamp']  ||= []
          # Bubble-sort to obtain the earliest timestamp
          if map['timestamp'].empty? || map['timestamp'] < map['temptime']
            map['timestamp'] = map['temptime']
          end
          # Generated the nested a dict with the log line information and add it
          # as a nested object to events list field.
          # No data from the dynafed_error_log is needed except the sotrageendpoint,
          # which is added at the end (see timeout code below).
          unless map['tags'].include?('dynafed_httpd_error')
            map['events'] ||= []
            hash = { :agent           => event.get('agent'),
                     :clientdn        => event.get('clientdn'),
                     :clientfqan      => event.get('clientfqan'),
                     :clienthostname  => event.get('clienthostname'),
                     :clientport      => event.get('clientport'),
                     :df_message      => event.get('df_message'),
                     :logid           => event.get('logid'),
                     :method          => event.get('method'),
                     :request         => event.get('request'),
                     :statuscode      => event.get('statuscode'),
                     :timestamp       => event.get('timestamp') + event.get('[beat][timezone]'),
                     :thread          => event.get('thread')
                   }
            map['events'] << hash
          end
          # Create the clienthostname field.
          map['clienthostname']  ||=  ''
          map['clienthostname']  = event.get('clienthostname')
          # Define the fields that will be used for accounting.
          stringfields = [
            'clientdn',
            'clientfqan',
            'df_message',
            'df_lfn',
            'filesize',
            'storage_endpoint'
          ]
          map['host']  ||=  ''
          if event.get('[host][name]') && !map['host'].include?(event.get('[host][name]'))
            map['host']  +=  ', ' unless map['host'].empty?
            map['host']  += event.get('[host][name]')
          end
          # Obtain the data from each field defined above. Since they should be
          # the same across the log lines, each should end up with a single value
          # however if this is not the case, different values are appended after ','
          # this does not turn the field into a JSON list.
          for i in stringfields
            map[i]  ||=  '' if event.get(i)
            if event.get(i) && !map[i].include?(event.get(i))
              map[i]  +=  ', ' unless map[i].empty?
              map[i]  += event.get(i)
            end
          end
        "
        # How long to wait for new log lines form the same event in seconds.
        # 60 is a sane number unless there is a lot of network lag between
        # Beats and Logstash services
        timeout       =>  60
        map_action    =>  "create_or_update"
        push_map_as_event_on_timeout  =>  true
        timeout_task_id_field => "logid"
        timeout_code  =>  "
          # Add meta tag to mark event as aggregated
          event.set('[@metadata][step]', 'aggregate-last')
          # Add the storage_endpoint data into each of the nested events.
          if event.get('events')
            ev = event.get('events')
            if ev[0]
              ev[0][:storage_endpoint] = event.get('storage_endpoint')
              event.set('events', ev)
            else
              event.set('[error]', 'ev0-notfound')
            end
          end
          "
      }
    }
  }
}
Next this filter figures out for each aggregated event what kind of transaction was requested. Since events can sometimes be composed of several requests depending on the client (a PROFPIND followed by GET for instance). We need to extract ultimately what was the intention of the event. For each request we check what HTTP status code was replied by the server and using a simple logical method of counting successes and failures, we figure out if it was a COPY, GET, PUT, DELETE (which we call transaction events) or HEAD, PROPFIND, etc, and mark them as such. We also whether the serving the redirection link was successful or not.
The code also aggregates all of the different requests' logs under the the "events" field, which in ES terminology is a nested field array of objects so that we can see all information of what transpired for each event.
This table shows how this calculations work:
                        |       HTTP Status Code      |
|---------------|------ |---------------------------- |
| Method        | Event | 100 | 200 | 300 | 400 | 500 |
|---------------|------ |-----|-----|-----|-----|-----|
| GET           |   +1  | +1  | +1  | +1  |  0  |  0  |
| PUT           |   +1  | +1  | +1  | +1  |  0  |  0  |
| DELETE        |   +1  | +1  | +1  | +1  |  0  |  0  |
| COPY          |   +1  | +1  | +1  | +1  |  0  |  0  |
| HEAD/PROPFIND |   0   | *1  | *1  |  ?  | *0  | *0  |
And this is how the total tally is interpreted:
                            |  Total Status   |
|-------------|-------------|-------|---------|
| Transaction | Total Event |  Fail | Success |
|-------------|-------------|-------|---------|
|    Read     |     > 0     |   0   |   > 0   |
|    Write    |     > 0     |   0   |   > 0   |
|    Copy     |     > 0     |   0   |   > 0   |
|    Delete   |     > 0     |   0   |   > 0   |
And this is the code that figures this all out, using the "ruby" filter as demonstrated in 26-dynafed-events.conf:
filter {
  if "dynafed_httpd" in [tags] {
    if "aggregate-last" in [@metadata][step] and [events] {
      ruby {
        code  =>  "
          # Variables used to count the types of methods and error codes in
          # order to find out what kind of transaction was ultimately requested
          # and whether Dynafed succeeded in sending a redirection link to the user.
          $transaction_event = 0
          $transaction_status = 0
          $copy_attempts = 0
          $delete_attempts = 0
          $get_attempts = 0
          $put_attempts = 0
          $se  =  ''
          # Sort the events by time and count how many events for the transaction
          # which is the number of elements in the 'events' field
          array = event.get('events').sort_by { |hsh| hsh['timestamp'] }
          $elements = array.count
          $i = 0
          while $i < $elements do
            case array[$i]['method']
            when /GET/
              $get_attempts += 1
              $se = array[$i]['storage_endpoint']
              $transaction_event += 1
              event.set('[transaction][statuscode]', array[$i]['statuscode'])
              # Check for HTTP status (200, 404, etc)
              case array[$i]['statuscode']
              when /1../
                $transaction_status += 1
              when /2../
                $transaction_status += 1
              when /3../
                $transaction_status += 1
              when /4../
                $transaction_status += 0
              when /5../
                $transaction_status += 0
              end
            when /PUT/
              $put_attempts += 1
              $se = array[$i]['storage_endpoint']
              $transaction_event +=  1
              event.set('[transaction][statuscode]', array[$i]['statuscode'])
              # Check for HTTP status (200, 404, etc)
              case array[$i]['statuscode']
              when /1../
                $transaction_status += 1
              when /2../
                $transaction_status += 1
              when /3../
                $transaction_status += 1
              when /4../
                $transaction_status += 0
              when /5../
                $transaction_status += 0
              end
            when /DELETE/
              $delete_attempts += 1
              $se = array[$i]['storage_endpoint']
              $transaction_event += 1
              event.set('[transaction][statuscode]', array[$i]['statuscode'])
              # Check for HTTP status (200, 404, etc)
              case array[$i]['statuscode']
              when /1../
                $transaction_status += 1
              when /2../
                $transaction_status += 1
              when /3../
                $transaction_status += 1
              when /4../
                $transaction_status += 0
              when /5../
                $transaction_status += 0
              end
            when /COPY/
              $copy_attempts += 1
              $se = array[$i]['storage_endpoint']
              $transaction_event += 1
              event.set('[transaction][statuscode]', array[$i]['statuscode'])
              # Check for HTTP status (200, 404, etc)
              case array[$i]['statuscode']
              when /1../
                $transaction_status += 1
              when /2../
                $transaction_status += 1
              when /3../
                $transaction_status += 1
              when /4../
                $transaction_status += 0
              when /5../
                $transaction_status += 0
              end
 
            end
          $i += 1
          end
          # Create new event with the infomation gathered.
          unless $se.to_s.strip.empty?
            event.set('storage_endpoint', $se )
          end
          event.set('events', array)
          event.set('[transaction][type]', $transaction_type)
          event.set('[transaction][status]', $transaction_status)
          event.set('[@metadata][transaction_event]', $transaction_event)
          event.set('[@metadata][get_attempts]', $get_attempts)
          event.set('[@metadata][put_attempts]', $put_attempts)
          event.set('[@metadata][delete_attempts]', $delete_attempts)
          event.set('[@metadata][copy_attempts]', $copy_attempts)
          # Flag if event has multiple transactional events
          # $Results = [$copy_attempts,$delete_attempts,$get_attempts,$put_attempts]
          # if $Results.sum > $Results.max
          #   event.set('[@metadata][multievent_error]', 1)
          # else
          #   event.set('[@metadata][multievent_error]', 0)
        "
      }
      if [@metadata][transaction_event] != 0 {
        if [@metadata][put_attempts] > 0 {
          mutate {
            replace => { "[transaction][type]" => "Write"}
            add_tag       =>  ['transaction_event']
            add_field    =>  { "[transaction][attempts]" => "%{[@metadata][put_attempts]}" }
          }
        } else if [@metadata][get_attempts] > 0 {
          mutate {
            replace => { "[transaction][type]" => "Read"}
            add_tag       =>  ['transaction_event']
            add_field    =>  { "[transaction][attempts]" => "%{[@metadata][get_attempts]}" }
          }
        } else if [@metadata][delete_attempts] > 0 {
          mutate {
            replace => { "[transaction][type]" => "Delete"}
            add_tag       =>  ['transaction_event']
            add_field    =>  { "[transaction][attempts]" => "%{[@metadata][delete_attempts]}" }
          }
        } else if [@metadata][copy_attempts] > 0 {
          mutate {
            replace => { "[transaction][type]" => "Copy"}
            add_tag       =>  ['transaction_event']
            add_field    =>  { "[transaction][attempts]" => "%{[@metadata][copy_attempts]}" }
          }
        }
        # Flag Event as success or Failure according to final sum of the
        # transaction status field.
        translate {
          exact       =>  true
          regex       =>  true
          field       =>  "[transaction][status]"
          dictionary  =>  [
                            "^(0|-[1-9][0-9]*$)", "Failure",
                            "^[1-9][0-9]*$", "Success"
                          ]
          destination  =>  "[transaction][status]"
          override     =>  true
        }
      } else {
        # We drop any event that is not a transaction event to avoid this stuff
        # cluttering storage and also avoiding the DNS and GeoIP filters ahead.
        # Change this to something suitable if these events are to be processed
        # and/or stored.
        drop { }
      }
    }
  }
}
Then we enrich the data by resolving DNS, adding GeoIP, set the event's timestamp as the main, parse and organize the data into four groups: Client, (client), Source (src), Destination (dst), Dynafed (dynafed) with filter file 28-dynafed-format.conf:
filter{
  if "dynafed_httpd" in [tags] {
    if "aggregate-last" in [@metadata][step] {
      # Resolve IP addresses to their DNS hostname.
      if [clienthostname] =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/ {
        dns {
          hit_cache_size  =>  1000000
          hit_cache_ttl   =>  3600
          nameserver      =>  [ "142.90.100.19", "142.90.113.69", "142.90.100.68", "208.67.222.222" ]
          hostsfile       =>  [ "/etc/logstash/dns/hosts" ]
          reverse         =>  [ "[clienthostname]" ]
          action          =>  "replace"
        }
      }
      if [clienthostname] =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/ {
        mutate {
          add_tag       =>  [ "dns_clienthostname_warning", "warning" ]
        }
      }
      # Parse and extract the storage endpoint given to the client in the
      # redirection link provided by Dynafed.
      if [storage_endpoint] {
        grok {
          patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
          match          =>  { "storage_endpoint" => "%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?%{IPORHOST:storage_endpoint}(?::%{POSINT})?(?:%{URIPATH:se_lfn}(?:%{URIPARAM})?)?" }
          overwrite      =>  [ "storage_endpoint" ]
          tag_on_failure =>  [ "_storage_endpoint_grokparsefailure", "failure" ]
        }
        grok {
          patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
          match          =>  { "se_lfn" => "%{URIPATH:se_path}/%{UGR_FILENAME:se_filename}" }
          tag_on_failure =>  [ "_event_copy_selfn_grokparsefailure", "failure" ]
        }
      # If the field does not exist, it means Dynafed is acting as the host
      # usually someone browsing or listing contents.
      } else {
        mutate {
          copy  =>  {
            "[host]"  =>  "[storage_endpoint]"
          }
        }
      }
      # Resolve IP addresses to their DNS hostname.
      if [storage_endpoint] =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/ {
        dns {
          hit_cache_size  =>  1000000
          hit_cache_ttl   =>  3600
          timeout    =>  6
          hostsfile  =>  [ "/etc/logstash/dns/hosts" ]
          reverse    =>  [ "[storage_endpoint]" ]
          action     =>  "replace"
        }
      }
      if [storage_endpoint] =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/ {
        mutate {
          add_tag       =>  [ "dns_storage_endpoint_warning", "warning" ]
        }
      }
      # Extract the dynafed path and the filename
      if [df_lfn]{
        grok {
          patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
          match          =>  { "df_lfn" => "%{URIPATH:df_path}/%{UGR_FILENAME:df_filename}" }
          tag_on_failure =>  [ "_se_lfn_grokparsefailure", "failure" ]
        }
      }
      # Copy needs it's own logic to process the dynafed messages and extract
      # the fields. And since it can be a PUSH or PULL Copy, each needs their
      # own block.
      if "Copy" in [transaction][type]  {
        grok {
          patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
          match          =>  { "df_message" => "%{UGR_HTTPD_COPY}" }
          tag_on_failure =>  [ "_event_copy_grokparsefailure", "failure" ]
        }
        # For third party PUSH
        if "destination is" in [df_message] {
          grok {
            patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
            match          =>  { "dst_lfn" => "%{URIPATH:dst_path}/%{UGR_FILENAME:dst_filename}" }
            tag_on_failure =>  [ "_event_copy_dst_grokparsefailure", "failure" ]
          }
          mutate {
            rename => {
              "dst_filename"     =>  "[dst][filename]"
              "dst_hostname"     =>  "[dst][hostname]"
              "dst_lfn"          =>  "[dst][lfn]"
              "dst_param"        =>  "[dst][param]"
              "dst_path"         =>  "[dst][path]"
              "se_lfn"           =>  "[src][lfn]"
              "se_path"          =>  "[src][path]"
              "se_filename"      =>  "[src][filename]"
              "separam"          =>  "[src][param]"
              "storage_endpoint"  =>  "[src][hostname]"
            }
          }
        # For third party PULL
        } else if "source is" in [df_message] {
          grok {
            patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
            match          =>  { "src_lfn" => "%{URIPATH:src_path}/%{UGR_FILENAME:src_filename}" }
            tag_on_failure =>  [ "_event_copy_src_grokparsefailure", "failure" ]
          }
          mutate {
            rename => {
              "src_filename"     =>  "[src][filename]"
              "src_hostname"     =>  "[src][hostname]"
              "src_lfn"          =>  "[src][lfn]"
              "src_param"        =>  "[src][param]"
              "src_path"         =>  "[src][path]"
              "se_lfn"           =>  "[dst][lfn]"
              "se_path"          =>  "[dst][path]"
              "se_filename"      =>  "[dst][filename]"
              "separam"          =>  "[dst][param]"
              "storage_endpoint"  =>  "[dst][hostname]"
            }
          }
        }
      # In a read transaction data flows the opposite way than Write or Delete
      # so this if statement takes that in to account.
      } else if "Read" in [transaction][type]  {
        mutate {
          rename  =>  {
            "clienthostname"   =>  "[dst][hostname]"
            "clientport"       =>  "[dst][port]"
            "se_lfn"           =>  "[src][lfn]"
            "se_path"          =>  "[src][path]"
            "se_filename"      =>  "[src][filename]"
            "separam"          =>  "[src][param]"
            "storage_endpoint"  =>  "[src][hostname]"
          }
        }
      # For Delete, Write events.
      } else {
        mutate {
          rename  =>  {
            "clienthostname"   =>  "[src][hostname]"
            "clientport"       =>  "[src][port]"
            "se_lfn"           =>  "[dst][lfn]"
            "se_path"          =>  "[dst][path]"
            "se_filename"      =>  "[dst][filename]"
            "separam"          =>  "[dst][param]"
            "storage_endpoint"  =>  "[dst][hostname]"
          }
        }
      }
      # Just formatting fields to make them more meaningful and removing
      # unneeded ones.
      mutate {
        rename  =>  {
          "clientdn"        =>  "[client][dn]"
          "clientfqan"      =>  "[client][fqan]"
          "clienthostname"  =>  "[client][hostname]"
          "clientport"      =>  "[client][port]"
          "df_filename"     =>  "[dynafed][filename]"
          "df_lfn"          =>  "[dynafed][lfn]"
          "df_path"         =>  "[dynafed][path]"
          "df_message"      =>  "[dynafed][message]"
          "filesize"        =>  "[src][filesize]"
          "host"            =>  "[dynafed][hostname]"
        }
        remove_field  =>  [
                            "[temptime]"
                          ]
        convert  =>  {
          "[src][filesize]"  =>  "integer"
        }
        add_tag  =>  [ "dynafed" ]
      }
      geoip {
        cache_size      =>  1000000
        fields          =>  [
                              "city_name",
                              "country_code2",
                              "country_name",
                              "ip",
                              "location",
                              "region_name"
                            ]
        source          =>  "[src][hostname]"
        target          =>  "[src][geoip]"
        tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
      }
      geoip {
        cache_size      =>  1000000
        fields          =>  [
                              "city_name",
                              "country_code2",
                              "country_name",
                              "ip",
                              "location",
                              "region_name"
                            ]
        source          =>  "[dst][hostname]"
        target          =>  "[dst][geoip]"
        tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
      }
      geoip {
        cache_size      =>  1000000
        fields          =>  [
                              "city_name",
                              "country_code2",
                              "country_name",
                              "ip",
                              "location",
                              "region_name"
                            ]
        source          =>  "[dynafed][hostname]"
        target          =>  "[dynafed][geoip]"
        tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
      }
      # Finalised event with all information is tagged as ops which is utilised
      # in the output filter.
      if "aggregate-last" in [@metadata][step] {
        mutate {
          add_tag       =>  [ "dynafed_log_httpd" ]
        }
      }
    }
  }
}
Connection and Storage Space Stats
Dynafed periodically tests each of the endpoints to check whether they are online, that the configured credentials are correct and the latency to each of them. This information is then stored in Memcache. 
Unfortunately it does not have the functionality to do the same for storage space information. Therefore we have developed a Python package to take care of this called "dynafed-storagestats" available through PiPy.
For information about how to set it up and how to use it, please follow the link and check out the README. It is in development and some things are subject to change. For our purposes of this post, this is a brief explanation of how it works.
The package contains specific functions to query the storage space stats for each of the different supported protocols: DAV, S3, and Azure. And for each of these, there might be more than one different method, as detailed in the README. For instance, for S3 we have three we can choose depending on the implementation and it's configuration:
- list-objects: As the name implies, it uses the list-objects API call and gathers a list of all files in the specified bucket and sums their file sizes. This should work on all S3 implementations, but it is slow.
- ceph-admin: When using CEPH, if the bucket user is given "admin caps" for the buckets (ideally read-only") then it can query the ceph admin API to obtain all the stats and is almost instant.
- cloud-watch: When using AWS, if one configure the cloudwatch metrics BucketSizeBytes and NumberOfObjects, then they can be queried to obtain the stats. It works almost instantly but has the downsize of this stats being updated only every 24 hours.
The other protocols have similar choices, most notably for DAV using the RFC4331 for storage that support this. Also to note is some of these API's provide the information about the quotas configured on the different endpoints and some do not, so for the latter, the quota need to be specified manually on the endpoints configuration file to obtain the correct available space (again, see the README for more info).
In any case, once the package is installed and the settings configured, we setup a cronjob that probes the endpoints periodically, obtains the available stats and uploads them to memcache. From here Dynafed can use them to make decisions about whether an endpoint is full or not when selecting which one to provide to a client.
With both the connection and storage space stats in memcache, we use a community created "Beat" called Execbeat. Unfortunately it is no longer maintained and thus will probably not work in the future, but for now it works well, at least with ELK stack up to 6.x. It works by periodically executing any script we specify in its configuration, writes its output in a field called "exec.stdout"  in a JSON document and sends it to Logstash.
#!/bin/python3
"""
The intention of this script is to obtain for each UGR/Dynafed endpoint the
connection stats (Ugrpluginstats) and the storage stats (Ugrstoragestats)
that UGR's plugins and the dynafed_storagestats.py module upload to memcache.
Since both these stats are on separate indices, this script puts them all
together in one string which is printed out. This with the goal to be exported
via execbeat to a Logstash instance where it will be further processed and
before being stored in an elasticsearch database.
"""
from __future__ import print_function
import time
from optparse import OptionParser#, OptionGroup
import memcache
endpoints = []
endpoint_stats = []
################
## Help/Usage ##
################
usage = "usage: %prog [options]"
parser = OptionParser(usage)
parser.add_option('--memhost',
                  dest='memcached_ip', action='store', default='127.0.0.1',
                  help='IP or hostname of memcached instance. Default: 127.0.0.1'
                 )
parser.add_option('--memport',
                  dest='memcached_port', action='store', default='11211',
                  help='Port where memcached instances listens on. Default: 11211'
                 )
options, args = parser.parse_args()
##########
## Main ##
##########
# Setup connection to a memcache instance
memcached_srv = options.memcached_ip + ':' + options.memcached_port
mc = memcache.Client([memcached_srv])
# Obtain the latest index number used by UGR and transform to str if needed.
# Different versions of memcache module return bytes.
idx = mc.get('Ugrpluginstats_idx')
if isinstance(idx, bytes):
    idx = str(idx, 'utf-8')
# Obtain the latest status uploaded by UGR and transform to str if needed.
# Different versions of memcache module return bytes.
connection_stats = mc.get('Ugrpluginstats_' + idx)
if isinstance(connection_stats, bytes):
    connection_stats = str(connection_stats, 'utf-8')
# Remove the \x00 character.
connection_stats = connection_stats.rsplit('\x00', 1)[0]
# Split the stats per '&&' i.e. per storage endpoint.
connection_stats = connection_stats.rsplit('&&')
# For each endpoint then, we are going to obtain the individual stats from UGR
# and use the endpoint's ID to also obtain from memcache the storage stats (if any)
# and concatenate them (separated by '%%') together. Once this has been done for
# each endpoint, we concatenate all endpoints together onto one string
# (separated by '&&').
for element in connection_stats:
    endpoint, stats = element.split("%%", 1)
    #When the connection status is OK the last element is empty. So we add an 'OK'
    if stats.split("%%")[-1] == '':
        stats = stats + 'OK%%'
    else:
        stats = stats + '%%'
    # Creat the index using the endpoint's ID
    storage_stats_index = "Ugrstoragestats_" + endpoint
    storage_stats = mc.get(storage_stats_index)
    if storage_stats is None:
        # If not storage stats exist, just add the fields representing error.
        timestamp = int(time.time())
        storage_stats = '%%'.join([
            'Unknown',
            str(timestamp),
            '-1',
            '-1',
            '-1',
            '[ERROR][NoStorageStats][404] Unable to retreive storage stats from memcached.'
        ])
    else:
        # If sotarge stats exist, make sure they are of string type.
        # Different versions of memcache module return bytes.
        if isinstance(storage_stats, bytes):
            storage_stats = str(storage_stats, 'utf-8')
        storage_stats = storage_stats.split("%%", 1)[-1]
    # Concatenate all stats
    stats = stats + storage_stats
    endpoint_stats.append(stats)
# Concatenate all endpoints
endpoint_stats = '&&'.join(endpoint_stats)
print(endpoint_stats, end="")
The script is placed in "/etc/execbeat/commands/getugrpluginstats.py". We configure Execbeat with using these general options. Note the tags again, which will be used by Logstash later in processing:
## General Options ###
name: "{hostname}"
tags: ["dynafed", "dynafed_ugr_sestatus", "execbeat"]
execbeat:
  commands:
    - command: "/etc/execbeat/commands/getugrpluginstats.py"
      schedule: ""
      document_type: "doc"
output:
  logstash:
      enabled: true
      hosts:
        - "logstash-server:5044"
Now in Logstash we add the following GROK to our ugr.grok file to deal with the string in the "exec.stdout" field:
# Parse UGR Storage Stats
UGR_STORAGESTATS \[%{WORD:ugr.storagestats.status}]\[%{WORD:ugr.storagestats.statuskey}]\[%{NUMBER:ugr.storagestats.statuscode}]
We use the following filter to Extract the information:
filter {
  if "dynafed_ugr_sestatus" in [tags] {
    if "execbeat" in [tags] {
      grok {
        match          =>  { "[exec][stdout]"  =>  "%{GREEDYDATA:message}" }
        tag_on_failure =>  [ "_0-ugrsestatus_grokparsefailure", "failure" ]
      }
      # Remove \u0000 character from message field.
      mutate {
        gsub  =>  [ "message", "\u0000$", "" ]
      }
    }
  }
}
After this, Logstash break the string into the individual metrics and adds them into their own fields, adds the dynafed.hostname fields, adds GeoIP and translates some status codes into keywords the filter file 30-dynafed-ugr-sestatus.conf:
filter {
  if "dynafed_ugr_sestatus" in [tags] {
    if "getugrpluginstats.py" in [exec][command] {
      # Use split to separate each endpoint information into their own event.
      # Delimited by '&&'
      split {
        field       =>  "message"
        terminator  =>  "&&"
      }
      # Use dissect to match and extract data of each endpoint event.
      dissect {
        mapping  =>  {
          "message"  =>  "%{ugr.endpoint}%%%{ugr.connectionstats.time}%%%{ugr.connectionstats.status}%%%{ugr.connectionstats.latency}%%%{ugr.connectionstats.statuscode}%%%{ugr.connectionstats.error}%%%{ugr.storagestats.protocol}%%%{ugr.storagestats.time}%%%{ugr.storagestats.quota}%%%{ugr.storagestats.bytesused}%%%{ugr.storagestats.bytesfree}%%%{ugr.storagestats.message}"
        }
        tag_on_failure  =>  [ "_dissectfailure", "failure" ]
        remove_field    =>  [ "exec", "fields", "message" ]
      }
      # Use split to separate each status message into their own event.
      # Delimited by ','
      split {
        field       =>  "ugr.storagestats.message"
        terminator  =>  ","
      }
      grok{
        patterns_dir   =>  [ "/etc/logstash/conf.d/patterns" ]
        match          =>  { "ugr.storagestats.message"  =>  "%{UGR_STORAGESTATS}" }
        overwrite      =>  [ "ugr.storagestats.message" ]
        tag_on_failure =>  [ "_0-ugrsestatus_grokparsefailure", "failure" ]
      }
      mutate {
        copy  =>  {
          "[host][name]"  =>  "[dynafed][hostname]"
        }
        convert  =>  {
          "[ugr][connectionstats][latency]"  =>  "integer"
          "[ugr][storagestats][bytesfree]"  =>  "integer"
          "[ugr][storagestats][bytesused]"  =>  "integer"
          "[ugr][storagestats][quota]"  =>  "integer"
        }
      }
      translate {
        exact       =>  true
        field       => "ugr.connectionstats.status"
        dictionary  => {
                          "1" => "Online"
                          "2" => "Offline"
                        }
        destination  =>  "ugr.connectionstats.status"
        override     =>  true
      }
      geoip {
        cache_size      =>  1000000
        fields          =>  [
                              "city_name",
                              "country_code2",
                              "country_name",
                              "ip",
                              "location",
                              "region_name"
                            ]
        source          =>  "[dynafed][hostname]"
        target          =>  "[dynafed][geoip]"
        tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
      }
    } else if "get-ugrpluginstats.py" in [exec][command] {
      # Use split to separate each endpoint information into their own event.
      # Delimited by '&&'
      split {
        field       =>  "message"
        terminator  =>  "&&"
      }
      # Use dissect to match and extract data of each endpoint event.
      # ugr.ep is dropped as it is duplicate information of ugr.endpoint
      dissect {
        mapping  =>  {
          "message"  =>  "%{ugr.ep}%%%{ugr.endpoint}%%%{ugr.connectionstats.time}%%%{ugr.connectionstats.status}%%%{ugr.connectionstats.latency}%%%{ugr.connectionstats.statuscode}%%%{ugr.connectionstats.error}"
        }
        tag_on_failure  =>  [ "_dissectfailure", "failure" ]
        remove_field    =>  [ "ugr.ep", "exec","fields", "message" ]
      }
      mutate {
        copy  =>  {
          "[host][name]"  =>  "[dynafed][hostname]"
        }
      }
      translate {
        exact       =>  true
        field       => "ugr.connectionstats.status"
        dictionary  => {
                          "1" => "Online"
                          "2" => "Offline"
                        }
        destination  =>  "ugr.connectionstats.status"
        override     =>  true
      }
      geoip {
        cache_size      =>  1000000
        fields          =>  [
                              "city_name",
                              "country_code2",
                              "country_name",
                              "ip",
                              "location",
                              "region_name"
                            ]
        source          =>  "[dynafed][hostname]"
        target          =>  "[dynafed][geoip]"
        tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
      }
    }
  }
}
Apache Stats
As mentioned above, we use Metricbeat to obtain the Apache stats from the /server-status page. In order to do this, we set up the "mod_status" module in a vhost. An important thing to note here is that when configuring Dynafed's base URL where storage endpoints are mounted, it cannot be done at the root, otherwise the DAV module blocks the /server-status page from working. So Dynafed mount them in something like "/dynafed/*". Here is the vhost configuration we use:Listen "localhost:8088"
<VirtualHost localhost:8088>
  <IfModule mod_status.c>
    # This directive works with the CustomLog format to avoid queries to the status page from being logged
    SetEnvIf Request_URI "/server-status" dontlog
    CustomLog "logs/access_log" combined env=!dontlog
    <Location "/server-status">
      SetHandler server-status
      Require host localhost
    </Location>
  </IfModule>
</VirtualHost>
This allows us to access these stats from http://localhost:8088/server-status and therefore configure Metricbeat to gather these metrics with the following general options fort the Apache module. Again note the tags added which will be used once more by Logstash to process this accordingly:
### General Options ###
name: "{hostname}"
tags: "metricbeat"
### Modules Options ###
metricbeat.modules:
- module: apache
  metricsets: ["status"]
  enabled: true
  period: 10s
  hosts: ["http://localhost:8088/server-status"]
  tags: ["httpd", "dynafed", "dynafed_metrics_httpd"]
### Outputs ###
output:
  logstash:
    enabled: true
    hosts:
      - "logstsh-host:5044"
In Logstash we add the dynafed.hostname field, GeoIP information and calculate the ratio of used/available workers into it's own float field with the filter file 40-dynafed-metrics-httpd.conf:
filter {
  if "dynafed_metrics_httpd" in [tags] {
    # Obtaining the workers ratio
    if [apache][status][workers][busy] {
      ruby {
        code => "
                  busy  = event.get('[apache][status][workers][busy]').to_f
                  idle  = event.get('[apache][status][workers][idle]').to_f
                  ratio = busy / (busy + idle)
                  event.set('[apache][status][workers][ratio]', ratio)
                "
      }
      mutate {
        # Typecast to avoid issues with Elasticsearch.
        convert  =>  {
          "[apache][status][workers][ratio]"  =>  "float_eu"
        }
      }
    }
    mutate {
      # Add dynafed fields for consistency with the other dynafed logs.
      copy  =>  {
        "[host][name]"  =>  "[dynafed][hostname]"
      }
    }
    geoip {
      cache_size      =>  1000000
      fields          =>  [
                            "city_name",
                            "country_code2",
                            "country_name",
                            "ip",
                            "location",
                            "region_name"
                          ]
      source          =>  "[dynafed][hostname]"
      target          =>  "[dynafed][geoip]"
      tag_on_failure  =>  [ "_geoip_lookup_failed", "warning" ]
    }
  }
}
Elasticsearch
Finally these events are sent to an elasticsearch DB. To keep things organised, the indicex names will include the hostname, filebeat's version (to keep things from breaking with updates from elastic), and timestamped year + month to create a new index every month and keep their sizes from getting out of hand. Later it would be useful to setup elasticsearch-curator or similar to monitor for number of events to keep them under the limit of 2 billion documents per index, retention times, etc.output {
  if "dynafed" in [tags] {
    if "failure" in [tags] {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-failures-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    } else if "dynafed_acct_log" in [tags] {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-httpd-acct-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    } else if "dynafed_log_httpd" in [tags] {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-logs-httpd-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    } else if "dynafed_ugr_sestatus" in [tags] {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-metrics-ugr-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    } else if "dynafed_metrics_httpd" in [tags] {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-metrics-httpd-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    } else {
      elasticsearch {
        hosts  =>  ["http://elastic-server:9200"]
        index  =>  "dynafed-debug-%{[beat][hostname]}-%{[beat][version]}-%{+yyyy.MM}"
        document_type => "_doc"
      }
    }
  }
}
A few things to note here, one is the "_doc" line, this is added to for future compatibility with ES 7.x in case we upgrade later, and the different indices Logstash will create:
- dynafed-logs-httpd-*: Main index with the processed data and what we use for analysis and visualizatoin.
- dynafed-failures-*: When any of the filters add a "failure" tag when there is an issue processing, these events will go here for further analysis.
- dynafed-debug-*: Here we capture all the events as they arrive from Filebeat without any Logstash intervention. This is useful to see every event as it arrive, and even to reprocess the data in case we find failures. It grows fast! So be careful!
- dynafed-metrics-ugr: The connection and storage stats information
- dynafed-metrics-httpd-*: The Apache metrics information.
We use custom index templates in order to correctly store all the information in their proper "data types", customised dynamic_template settings and other general index settings like aliases, replicas etc. Needless to say, these need to be setup prior to data being shipped into elasticsearch.
{
  "index_patterns": "dynafed-*",
  "order": 10,
  "mappings": {
    "dynamic_templates": [
      {
        "strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword",
            "ignore_above": 1024
          }
        }
      }
    ],
    "properties": {
      "dynafed": {
        "type": "object",
        "properties": {
          "geoip": {
            "type": "object",
            "properties": {
              "ip": {
                "type": "ip"
              },
              "latitude": {
                "type": "half_float"
              },
              "location": {
                "type": "geo_point"
              },
              "longitude": {
                "type": "half_float"
              }
            }
          }
        }
      }
    }
  }
}
Then we define the field mappings for each of the indices. For dynafed-logs-httpd we use 20-dynafed-logs-httpd-mappings-template.json:
{
  "index_patterns": "dynafed-logs-httpd-*",
  "order": 20,
  "mappings": {
    "dynamic_templates": [
      {
        "strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword",
            "ignore_above": 1024
          }
        }
      }
    ],
    "properties": {
      "event_id": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "dst": {
        "type": "object",
        "properties": {
          "geoip": {
            "type": "object",
            "properties": {
              "ip": {
                "type": "ip"
              },
              "latitude": {
                "type": "half_float"
              },
              "location": {
                "type": "geo_point"
              },
              "longitude": {
                "type": "half_float"
              }
            }
          }
        }
      },
      "dynafed": {
        "type": "object",
        "properties": {
          "message": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          }
        }
      },
      "events": {
        "type": "nested",
        "properties": {
          "clientip": {
            "type": "ip"
          },
          "df_message": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "timestamp": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss.SSSSSSZZ"
          }
        }
      },
      "src": {
        "type": "object",
        "properties": {
          "filesize": {
            "type": "long"
          },
          "geoip": {
            "type": "object",
            "properties": {
              "ip": {
                "type": "ip"
              },
              "latitude": {
                "type": "half_float"
              },
              "location": {
                "type": "geo_point"
              },
              "longitude": {
                "type": "half_float"
              }
            }
          }
        }
      },
      "transaction": {
        "properties": {
          "attempts": {
            "type": "short"
          }
        }
      },
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSSSSSZZ||strict_date_optional_time"
      }
    }
  }
}
And 30-dynafed-logs-httpd-aliases-template.json:
{
  "index_patterns": "dynafed-logs-httpd-*",
  "order": 30,
  "aliases": {
    "dynafed-logs-httpd-search": {}
  }
}
For dynafed-metrics-httpd-* we use 20-dynafed-metrics-httpd-mappings-template.json:
{
  "index_patterns": "dynafed-metrics-httpd-*",
  "order": 20,
  "mappings": {
    "properties": {
      "apache": {
        "properties": {
          "status": {
            "properties": {
              "bytes_per_request": {
                "type": "float"
              },
              "bytes_per_sec": {
                "type": "float"
              },
              "connections": {
                "properties": {
                  "async": {
                    "properties": {
                      "closing": {
                        "type": "long"
                      },
                      "keep_alive": {
                        "type": "long"
                      },
                      "writing": {
                        "type": "long"
                      }
                    }
                  },
                  "total": {
                    "type": "long"
                  }
                }
              },
              "cpu": {
                "properties": {
                  "load": {
                    "type": "float"
                  }
                }
              },
              "load": {
                "type": "object"
              },
              "requests_per_sec": {
                "type": "float"
              },
              "scoreboard": {
                "properties": {
                  "closing_connection": {
                    "type": "long"
                  },
                  "dns_lookup": {
                    "type": "long"
                  },
                  "gracefully_finishing": {
                    "type": "long"
                  },
                  "idle_cleanup": {
                    "type": "long"
                  },
                  "keepalive": {
                    "type": "long"
                  },
                  "logging": {
                    "type": "long"
                  },
                  "open_slot": {
                    "type": "long"
                  },
                  "reading_request": {
                    "type": "long"
                  },
                  "sending_reply": {
                    "type": "long"
                  },
                  "starting_up": {
                    "type": "long"
                  },
                  "total": {
                    "type": "long"
                  },
                  "waiting_for_connection": {
                    "type": "long"
                  }
                }
              },
              "total_accesses": {
                "type": "long"
              },
              "total_kbytes": {
                "type": "long"
              },
              "uptime": {
                "properties": {
                  "uptime": {
                    "type": "long"
                  }
                }
              },
              "workers": {
                "properties": {
                  "busy": {
                    "type": "long"
                  },
                  "idle": {
                    "type": "long"
                  },
                  "ratio": {
                    "type": "float"
                  }
                }
              }
            }
          }
        }
      },
      "metricset": {
        "properties": {
          "rtt": {
            "type": "long"
          }
        }
      }
    }
  }
}
And 30-dynafed-metrics-httpd-aliases-template.json:
{
  "index_patterns": "dynafed-metrics-httpd-*",
  "order": 30,
  "aliases": {
    "dynafed-metrics-httpd-search": {}
  }
}
For dynafed-metrics-ugr-* we use 20-dynafed-metrics-ugr-mappings-template.json:
{
  "index_patterns": "dynafed-metrics-ugr-*",
  "order": 20,
  "mappings": {
    "properties": {
      "ugr": {
        "type": "object",
        "properties": {
          "connectionstats": {
            "type": "object",
            "properties": {
              "error": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword"
                  }
                }
              },
              "latency": {
                "type": "long"
              },
              "time": {
                "type": "date",
                "format": "epoch_second"
              }
            }
          },
          "storagestats": {
            "type": "object",
            "properties": {
              "bytesfree": {
                "type": "long"
              },
              "bytesused": {
                "type": "long"
              },
              "message": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword"
                  }
                }
              },
              "quota": {
                "type": "long"
              },
              "time": {
                "type": "date",
                "format": "epoch_second"
              }
            }
          }
        }
      }
    }
  }
}
And 30-dynafed-metrics-ugr-aliases-template.json:
{
  "index_patterns": "dynafed-metrics-ugr-*",
  "order": 30,
  "aliases": {
    "dynafed-metrics-ugr-search": {}
  }
}
Of course the alias templates could be added instead in the other templates, but we like to keep all settings separate. We use others as well for general index settings like replicas and shards as you can see here (only the ones with "dynafed" in the name are related to this project).
Grafana
With all this setup, we can now install Grafana on a server, point to our Elasticsearch database so that we can produce graphs like this dashboards like this:
Something worthy of mention but that is out of the scope here is how we secure the ELK stack communication and the database. For this we have chosen Amazon's Open Distro for Elasticsearch plugin with the OSS version of the elastic software. This allows us to setup role based authentication and access to the different indices and other projects we are running in the stack. We chose it because of money considerations, since it's free, and has all the features that are useful to us. One of them, which is not included in the new free X-Pack security features is "field-based" permissions. This is a nice feature that allows us to anonymize certain fields, which include user authentication credentials, for those users who don't need to see this information.
Another this to mention is that many of the filters and configurations here are cleaned up and sanitised versions of what we use. That's why there might be discrepancies between what is posted here and what is found in the linked git repository but they should work. (Also it's in development so things are likely to change as time goes by).
And speaking of the git repository, it is actually a larger Ansible tree of roles, playbooks and settings that we use to setup and manage the ELK stack, and some other things. The root of the repository is here and anyone is welcome to try, fork, adapt and use it according to the MIT license. The main README contains more detailed configurations for each of the tools described here that might be of interest. Also each role has it's own README which hopefully are useful enough for anyone to figure out how to use all of this.
We hope this is helpful and informative to others that might be thinking on using the ELK stack for their own project or even monitoring their own Dynafed!
Regards,
Fernando




Comments
Post a Comment