Logstash Collector

Configuring Logstash to Send Log Data to Skylar Automated RCA

If you have upgraded from version 7.x of Logstash to version 8.x, ECS compatibility will be on by default. Depending on your environment and settings, you might need to turn off ECS compatibility. For more information, see https://www.elastic.co/guide/en/logstash/current/breaking-8.0.html#bc-ecs-compatibility.
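
For example, to pin the pre-8.x behavior globally, you can set the following in logstash.yml (a minimal sketch; the ecs_compatibility setting can also be applied per pipeline or per plugin):

pipeline.ecs_compatibility: disabled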

In Skylar Automated RCA, retrieve your Skylar Automated RCA URL and Auth Token, which you will use to configure the Logstash HTTP Output plugin:

  1. If your account has multiple deployments, go to the Skylar Automated RCA user interface, click the Deployment drop-down in the top-right navigation bar, and switch to the deployment you want to use to collect log data.
  2. Go to the Integrations & Collectors page (Settings > Integrations & Collectors).
  3. In the Log Collectors section, click Other.
  4. Make a note of the values in the ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN fields, as you will use them when configuring Logstash.

Next, you will edit your Logstash configuration to define the fields required by Skylar Automated RCA.

Skylar Automated RCA requires certain fields (keys) to be defined for each log event. These definitions are part of the "filter" section in the Logstash configuration.
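
For orientation, the filter section sits alongside the input and output sections in a pipeline configuration file. The following is a structural sketch only (the beats input is an example; your input and output definitions may differ):

input {
  beats { port => 5044 }
}

filter {
  # Skylar Automated RCA field definitions (described below) go here
}

output {
  # The Skylar Automated RCA http output (described later in this guide) goes here
}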

The following Skylar Automated RCA fields are used to define the Logstash filter configuration for proper Incident detection in Skylar Automated RCA. Each field is marked as required, recommended, or optional, and example Logstash configurations are shown in the sections that follow:

Time: Timestamp/time zone of each log event.

  @timestamp
    Timestamp of each log event (rather than the time the event was processed by Logstash, if possible). Required.

  @ze_timezone
    Time zone of each log event, e.g. "America/Los_Angeles". Optional; UTC is the default.

Log Generator: Indicates the source of the log event.

  @ze_deployment_name
    Identifies the environment or application domain, e.g. "production", "dev", "acme_calendar_app". In the Skylar Automated RCA UI this is known as the Service Group (see Service Groups below). Recommended.

  @ze_host
    Host name identifier. Required.

  @ze_logtype
    The basename of the log source, e.g. "access.log", "syslog". In the Skylar Automated RCA UI this appears as the logtype. In the container world, this would typically be the app name. Required.

Log Events Wrapped in JSON: If the application or host log events are simply wrapped in JSON and contain a field like "message" : "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.", then these keys need to be defined.

  @ze_msg
    If the JSON contains a field representing a typical log event (<PREFIX INFORMATION> <EVENT TEXT>), set this Skylar Automated RCA key to the value of that log event. The Skylar AI will then structure this field into an Event Type (etype) used for Incident detection. Required (if your log events are wrapped in JSON).

  @ze_sev
    If @ze_msg does not contain a severity, this field can be used to set the severity explicitly, based on some other criteria or field from the payload. Optional.

External ID Mapping: Maps events in Skylar Automated RCA to corresponding events in Elasticsearch.

  @ze_xid
    Assign a unique ID (UUID) to every log event so that events in Skylar Automated RCA can be mapped to corresponding events in Elasticsearch through a common UUID. Required (if using Kibana/Elasticsearch to view Skylar Automated RCA Incidents).

Configuration Metadata: Arbitrary name/value pairs associated with each log event.

  @ze_cfg_<name>
    Shown as configuration metadata in the Skylar Automated RCA user interface and in the Outgoing Webhook integration payload. Optional. For example, the following adds a metadata field called myname1 with a value of myvalue1:

    mutate {
      add_field => { "@ze_cfg_myname1" => "myvalue1" }
    }

Service Groups

A service group defines a failure domain boundary for anomaly correlation. This allows you to collect logs from multiple applications and isolate the logs of one application from another, so that they are not mixed together in a single Root Cause report.

If you are uploading multiple logs from different services in the same application, you would specify the same service group for each log event from that application. For example, let's say that you have a database log, an application log, and a middleware log for the Acme Calendar application. You would use an appropriate service group when uploading all files from that application, such as acme_calendar_app.
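
As a sketch, assuming each input is tagged with the application it belongs to (the tag name acme_calendar below is hypothetical), the service group can be assigned conditionally in the filter section:

filter {
  if "acme_calendar" in [tags] {
    mutate {
      # All logs from the Acme Calendar application share one service group
      add_field => { "@ze_deployment_name" => "acme_calendar_app" }
    }
  }
}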

Configuring Logstash Filters for Skylar Automated RCA Required Fields (in Logstash)

  1. Edit the appropriate Logstash configuration file to add the filter definitions required by Skylar Automated RCA with Elastic Stack. These definitions go within the filter { } section of the configuration (the multi-line codec shown below is the one exception; it belongs to an input definition).
  2. TIME FIELDS
  • @timestamp should contain the timestamp from the log event (not the timestamp when processed by Logstash). This is important for proper incident detection in Skylar Automated RCA.

  • Processing multi-line events should be enabled such that child log event lines are concatenated to the parent event with newlines.

  • The following example configuration meets these requirements:

    #----------------------------------------------------------------------#
    # Input Filter definition for processing multi-line events (if needed) #
    #----------------------------------------------------------------------#
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate  => true
      what    => "previous"
    }
    
    #------------------------------------------------------------------------------------------#
    # Grok and Date Filter for capturing log event timestamp in @timestamp                     #
    # If it is not possible to easily capture the event timestamp as @timestamp as shown here, #
    # it is OK to leave @timestamp as-is (i.e. use the logstash generated timestamp)           # 
    #------------------------------------------------------------------------------------------#
    grok {
      match => [ "message", "(?m)%{TIMESTAMP_ISO8601:logdate}" ] # Note the multi-line capture pattern (?m)
    }
    date {
      # This will set @timestamp
      match        => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss" ]
      timezone     => "America/Los_Angeles"
      remove_field => ["logdate"]
    }
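
    # (Sketch) If your log events carry epoch timestamps rather than ISO 8601,
    # the date filter's UNIX patterns can be used instead; the field name
    # "epochdate" below is hypothetical.
    # date {
    #   match => [ "epochdate", "UNIX", "UNIX_MS" ]
    # }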
    
    #---------------------------------------#
    # Capture @ze_timezone                  #
    # If not specified, UTC will be assumed #
    #---------------------------------------#
    mutate {
        # Specify the time zone (IANA TZ name) if your log timestamps are
        # missing time zone information; otherwise UTC is assumed (optional).
        add_field => { "@ze_timezone" => "America/Los_Angeles" }
    }
  3. LOG GENERATOR FIELDS

    #-----------------------------------------------------------------#
    # Mutate Filter for capturing logtype, host and gid               #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
       add_field => { "@ze_deployment_name" => "%{my_deployment}"  } # assumes field "my_deployment"  is part of the payload (recommended)
       add_field => { "@ze_host"            => "%{host}"           } # assumes field "host"           is part of the payload (required)
       add_field => { "@ze_logtype"         => "%{logtype}"        } # assumes field "logtype"        is part of the payload (required)
    }
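
    # (Sketch) If these fields are not part of your payload, you can hard-code
    # literal values instead; "web-01" and "app.log" below are hypothetical
    # examples.
    # mutate {
    #   add_field => { "@ze_host"    => "web-01"  }
    #   add_field => { "@ze_logtype" => "app.log" }
    # }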
  4. LOG EVENTS WRAPPED IN JSON FIELDS

    This configuration is required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event-Type automatically for Incident Detection.

    #-----------------------------------------------------------------#
    # Required if your log events are wrapped in JSON                 #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
        add_field => { "@ze_msg"  => "%{message}"         } # Capture the unstructured log event from the message field - Skylar AI  will automatically structure this into an etype (required)
        add_field => { "@ze_sev"  => "%{[log][severity]}" } # Capture the severity explicitly since "message" field does not contain severity (optional)
        add_field => { "@ze_pfx"  => "%{[log][process]}"  } # Capture the process name and add to the log event prefix so its part of the automatic structuring (optional)
    }
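
    # (Sketch) If the wrapped event arrives as a raw JSON string, parse it
    # first with the standard json filter (placed before the mutate above)
    # so that fields such as [log][severity] exist:
    # json {
    #   source => "message"
    # }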
  5. EXTERNAL ID MAPPING FIELD

    Unlike the previous definitions, this uses the uuid filter rather than a mutate filter.

    uuid {
      target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid 
    }
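
    # Note: the uuid filter is provided by the logstash-filter-uuid plugin.
    # If it is not present in your Logstash installation, it can be added
    # with: bin/logstash-plugin install logstash-filter-uuid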
  6. SAVE YOUR CONFIGURATION FILE.

Configuring Log Event Output to Skylar Automated RCA (in Logstash)

  1. Edit the appropriate Logstash configuration file to add the output definition required by Skylar Automated RCA with Elastic Stack.

  2. Add the following output definition for Skylar Automated RCA, substituting ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN with the values from step 4 of Configuring Logstash to Send Log Data to Skylar Automated RCA, above.

    output {
      if <SOME_CONDITION_IS_TRUE> {  # Replace with a condition that selects the events to send
        http {
          format      => "json_batch"
          http_method => "post"
          url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
          headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        }
      }
    }
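
    # (Sketch) Logstash substitutes ${VAR} references from the environment,
    # so the URL and token can also be supplied via environment variables
    # instead of being hard-coded in the file, e.g.:
    #   url     => "${ZE_LOG_COLLECTOR_URL}/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
    #   headers => ["authtoken", "${ZE_LOG_COLLECTOR_TOKEN}"]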
  3. SAVE YOUR CONFIGURATION FILE.

Reload Logstash Configuration

Reload your Logstash configuration to pick up all changes. Data will now be ingested into Skylar Automated RCA.
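
For example, on a systemd-managed installation you might first validate the configuration and then restart the service (the configuration path is an example):

bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/
sudo systemctl restart logstash

Alternatively, if Logstash runs with --config.reload.automatic (or config.reload.automatic: true in logstash.yml), the changed configuration is picked up without a restart.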

Complete Example for filebeat and winlogbeat Data

It is highly recommended that you read the following sample carefully and adapt it to your environment:

input {
  beats {
    port => 5044
  }
}
 
filter {
 
  #--------------------------------------------#
  # Add the UUID to all events before          #
  # cloning a copy for the zebrium only fields #
  #--------------------------------------------#
  uuid {
    target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid
  }
 
  #---------------------------------------------#
  # Make a clone of the message so we only send #
  # Skylar add-ons to Skylar and not to other   #
  # existing outputs like elastic               #
  #---------------------------------------------#
  clone {
    clones => ['zebrium']
  }
 
  #------------------------------------#
  # Add Skylar specifics to the clone  #
  #------------------------------------#
  if( [type] == 'zebrium' ) {
    #--------------------------------------------------#
    # Common attributes across filebeats, winlogbeats  #
    #--------------------------------------------------#
    mutate {
      add_field => { "[@metadata][zebrium]" => true }
    }
    mutate {
      add_field => { "@ze_deployment_name" => "mydeployment01"  }
    }
    if( [host][hostname] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][hostname]}" }
      }
    } else if ( [host][name] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][name]}" }
      }
    }
    if( [@ze_host] ) {
      mutate {
        gsub => [ "@ze_host", "^([^\.]+)", "\1" ] # Use hostname without fully qualified domain
      }
    } else {
      mutate {
        add_field => { "@ze_host" => "unknown" }
      }
    }
 
    #------------------------------#
    # winlogbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "winlogbeat" ) {
      if( [log][level] ) {
        mutate {
          add_field => { "@ze_sev" => "%{[log][level]}" }
        }
      }
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg"  => "%{[message]}"  }
          add_field => { "@ze_time" => "%{@timestamp}" }
        }
      }
      if( [event][provider] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][provider]}" }
        }
      } else if( [event][module] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][module]}" }
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "winlogbeat" }
        }
      }
      if [@ze_logtype] and [@ze_logtype] =~ "^Microsoft\-Windows\-" {
        # Sometimes the provider starts with Microsoft-Windows-, so strip that extraneous prefix and use the remainder as the logtype
        mutate {
          gsub => [ "@ze_logtype", "^Microsoft\-Windows\-(.*)$", "\1" ]
        }
      }
    }
    #----------------------------#
    # filebeat specific captures #
    #----------------------------#
    if( [agent][type] and [agent][type] == "filebeat" ) {
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
        }
      }
      if( [log][file][path] ) {
        grok {
          match => [ "[log][file][path]","%{GREEDYDATA}[\\/]%{GREEDYDATA:logtype}\.log" ]
        }
        mutate {
          add_field    => { "@ze_logtype" => "%{logtype}" }
          remove_field => [ "logtype" ]
        }
        mutate {
          # Sometimes the log filename starts with the hostname, remove that so all logs of the same type are grouped together
          gsub => [ "@ze_logtype", "^%{@ze_host}([^\d]+).*$", "\1" ]
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "filebeatlog" }
        }
      }
    }
  } # END OF ZEBRIUM
}
 
output {
  # SEND ZEBRIUM DATA TO ZEBRIUM ONLY
  if [@metadata][zebrium] {
    http {
        format      => "json_batch"
        http_method => "post"
        url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
        headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        proxy       => "<proxy>"  # Remove this line if no HTTP proxy is needed
    }
  # THEN SEND DATA AS WAS DONE BEFORE ADDING ZEBRIUM
  } else if [@metadata][pipeline] {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => "%{[@metadata][pipeline]}"
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => elastic
        password => "${ES_PW}"
    }
  } else {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => beats
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => elastic
        password => "${ES_PW}"
    }
  }
}
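
As in the earlier output definition, substitute <ZE_LOG_COLLECTOR_URL> and <ZE_LOG_COLLECTOR_TOKEN> with the values you recorded from the Integrations & Collectors page, and remove the proxy line if no HTTP proxy is needed.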