ELK Stack and Blue Coat Logs, Part 3

Part 1 here.

For this part I’ve used configuration from https://github.com/cvandeplas/ELK-forensics as the starting point. It didn’t work for me out of the box, and I also added a couple of enhancements, but you might still want to check it out. You might also want to have a look at this blog post by the same author: http://christophe.vandeplas.com/2014/07/bluecoat-proxy-log-search-and-analytics.html

When parsing log data, the first step is to extract fields from the log entry. This is done in the filter section of the Logstash config file. The two most commonly used Logstash filters for field extraction are grok and csv.

Grok is a very flexible, regular-expression-based log parsing filter. A grok pattern is a series of %{SYNTAX:SEMANTIC} definitions, where SYNTAX names a regex pattern and SEMANTIC provides the event property name under which to store the extracted value. Logstash comes with a number of pre-defined patterns, such as NUMBER, IP, URIPATHPARAM, EMAILADDRESS etc. (full list here). Logstash also comes with several pre-defined log file patterns, such as COMMONAPACHELOG. Specify the match parameter to tell grok which field to match against (usually message, which stores the whole log entry by default) and supply the pattern. Here’s an example:

filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

This will match log entries of the following format:

55.3.244.1 GET /index.html 15824 0.043
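
The matched values end up as event fields, named according to the SEMANTIC part of each pattern; for the sample line above the result would look like this:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043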

By default, grok will add a tag of _grokparsefailure to the log entry if parsing fails, which can be used in logic later on (to output unparsed log entries into a separate file, for example, as shown below). You can use the following web page to test and debug your grok patterns: http://grokdebug.herokuapp.com/
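
A minimal sketch of that idea, routing failed entries to a separate file in the output section (the file path here is only an illustration, not from the original post):

output {
  if "_grokparsefailure" in [tags] {
    # entries grok could not parse go to a separate file for review
    file {
      path => "/var/log/logstash/grok_failures.log"
    }
  } else {
    elasticsearch {
      hosts => ["localhost"]
    }
  }
}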

In the case of Blue Coat logs it is easier to treat the log files as CSV, with space as the separator. This can be accomplished using the Logstash csv filter as follows:

csv {
  columns => ["date", "time", "time_taken", "c_ip", "cs_username", "cs_auth_group", "x_exception_id", "sc_filter_result", "cs_categories", "cs_referer", "sc_status", "s_action", "cs_method", "rs_content_type", "cs_uri_scheme", "cs_host", "cs_uri_port", "cs_uri_path", "cs_uri_query", "cs_uri_extension", "cs_user_agent", "s_ip", "sc_bytes", "cs_bytes", "x_virus_id", "x_bluecoat_application_name", "x_bluecoat_application_operation"]
  separator => " "
}

columns defines the field names to be extracted from the log entry. The log file format is configurable on Blue Coat appliances, so you will have to check your specific log format and list the field names accordingly; Blue Coat conveniently lists them in a comment at the beginning of a log file. Use underscores instead of dashes (the Blue Coat default) in the field names (i.e. cs_host instead of cs-host). The separator parameter tells Logstash to use a space as the separator ("," is the default).
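
For reference, the column list above corresponds to a log file header along these lines (illustrative; the exact field list depends on how logging is configured on your appliance):

#Fields: date time time-taken c-ip cs-username cs-auth-group x-exception-id sc-filter-result cs-categories cs(Referer) sc-status s-action cs-method rs(Content-Type) cs-uri-scheme cs-host cs-uri-port cs-uri-path cs-uri-query cs-uri-extension cs(User-Agent) s-ip sc-bytes cs-bytes x-virus-id x-bluecoat-application-name x-bluecoat-application-operation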

Now that we have extracted the different fields from the log entry, we need to tell Logstash which field to use as the timestamp. To do this we will use the date filter, whose match parameter specifies which field contains the timestamp and the timestamp format, so Logstash can parse it. For example:

date {
  match => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
  timezone => "UTC"
}

In my logs, Blue Coat has two fields, date and time, that have to be combined to form the timestamp. I’ve used the mutate filter for this:

mutate {
  add_field => { "timestamp" => "%{date} %{time}" }
}

This adds a new field named timestamp that combines the date and time fields (for example, a date of 2015-05-27 and a time of 23:59:59 become 2015-05-27 23:59:59).

Next we can enrich the log entry with geolocation information based on the destination IP address using the geoip filter. If your logs have the s-supplier-ip field, you can easily use it for this purpose:

geoip {
  source => "s_supplier_ip"
}

If your logs don’t contain s-supplier-ip, you might want to do a DNS lookup on the cs-host field to get the server IP. Note that this significantly affects Logstash performance; test before using it in production. We will create a new field, cs_host_ip, initialized to the value of the cs_host field, and then use the Logstash dns filter to replace the value of cs_host_ip with the looked-up IP address:

mutate {
  add_field => { "cs_host_ip" => "%{cs_host}" }
}
dns {
  resolve => ["cs_host_ip"]
  action => "replace"
}

Next, we will split the cs_categories field (which contains Blue Coat category names separated by ";") into an array of separate categories for better analysis capabilities within Kibana:

mutate {
  split => { "cs_categories" => ";" }
}

The useragent filter parses User-Agent strings and extracts information such as browser type (Chrome, IE, etc.) and version, OS and OS version, and so on:

useragent {
  source => "cs_user_agent"
  prefix => "user_agent_"
}

We also need to tell Elasticsearch which fields contain numbers (again, for appropriate analysis in Kibana). We do this with the mutate filter’s convert option (adjust the list according to the fields in your log):

mutate {
  convert => ["sc_bytes", "integer",
              "time_taken", "integer",
              "r_port", "integer",
              "s_port", "integer",
              "cs_bytes", "integer",
              "duration", "integer"
             ]
}

Lastly, once all the fields are parsed and the timestamp is converted, we can remove the fields we no longer need using mutate:

mutate {
  remove_field => ["message", "date", "time"]
}

More information on Logstash filter plugins: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

Before showing my sample config file in full, let’s talk about conditionals. To avoid parsing errors, we will wrap some of the statements in if { } Logstash conditionals. You can use the [fieldname] syntax to access the current event’s fields. For example (for the s_supplier_ip field):

if ([s_supplier_ip] and [s_supplier_ip] != "-") {
  geoip {
    source => "s_supplier_ip"
  }
}

Here’s the full sample config:

input {
  stdin { }
}
filter {
  # drop comment lines
  if ([message] =~ /^#/) {
    drop {}
  }
  csv {
    columns => ["date", "time", "time_taken", "c_ip", "cs_username", "cs_auth_group", "x_exception_id", "sc_filter_result", "cs_categories", "cs_referer", "sc_status", "s_action", "cs_method", "rs_content_type", "cs_uri_scheme", "cs_host", "cs_uri_port", "cs_uri_path", "cs_uri_query", "cs_uri_extension", "cs_user_agent", "s_ip", "sc_bytes", "cs_bytes", "x_virus_id", "x_bluecoat_application_name", "x_bluecoat_application_operation"]
    separator => " "
  }
  # parse timestamp
  if [date] and [time] {
    mutate {
      add_field => { "timestamp" => "%{date} %{time}" }
    }
    date {
      match => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
      timezone => "UTC"
    }
  }
  # enrich log entry with destination geolocation info
  if ([s_supplier_ip] and [s_supplier_ip] != "-") {
    geoip {
      source => "s_supplier_ip"
    }
  } else if ([cs_host] and [cs_host] != "-") {
    mutate {
      add_field => { "cs_host_ip" => "%{cs_host}" }
    }
    dns {
      resolve => ["cs_host_ip"]
      action => "replace"
    }
    geoip {
      source => "cs_host_ip"
    }
  }
  # parse User-Agent header
  if ([cs_user_agent] and [cs_user_agent] != "" and [cs_user_agent] != "-") {
    useragent {
      source => "cs_user_agent"
      prefix => "user_agent_"
    }
  }
  # split Blue Coat web site categories into an array
  if ([cs_categories] and [cs_categories] != "" and [cs_categories] != "-") {
    mutate {
      split => { "cs_categories" => ";" }
    }
  }
  # type convert number fields
  mutate {
    convert => ["sc_bytes", "integer",
                "time_taken", "integer",
                "r_port", "integer",
                "s_port", "integer",
                "cs_bytes", "integer",
                "duration", "integer"
               ]
  }
  # cleanup
  mutate {
    remove_field => ["message", "date", "time"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost"]
  }
}

To use this in a manual log ingestion mode, save the above config in a file (I’ll use my_logstash.conf as the config file name below), then launch Logstash as follows:

zcat BCSGP*_main_*.log.gz | /opt/logstash/bin/logstash -f my_logstash.conf -w 2

(Use plain cat if your log files are not compressed, and replace /opt/logstash/bin with the path to your Logstash installation.) -f specifies the config file to use; -w 2 instructs Logstash to launch 2 filter workers for log parsing and filtering. A quick note about threading:

  • Each Logstash input runs in its own thread.
  • Filters use a worker model; the -w switch above controls the number of workers (the default is 1). Set it according to the number of processor cores you have available.
  • Output is currently a single thread.

See https://www.elastic.co/guide/en/logstash/current/pipeline.html for more info.
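
If you would rather have Logstash tail the log directory continuously instead of feeding files in by hand, the stdin input can be swapped for a file input. A minimal sketch, with an illustrative path that you would adjust to your environment (not from the original post):

input {
  file {
    path => "/var/log/bluecoat/*.log"   # adjust to where your Blue Coat logs land
    start_position => "beginning"       # read existing files from the start
  }
}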
