Analyse WSO2 Identity Server event logs with ELK (Part 2)

Avarjana Panditha
5 min readJul 31, 2022

--

WSO2 Identity Server (IS) provides various ways to access their events through their event publishers. Apart from the existing WSO2 IS Analytics Solution, users can create their own analytic solutions using those event publishers offered by WSO2 IS. I am going to guide you through the process of configuring WSO2 IS to publish their events to a locally installed Elastic Stack.

I have discussed about the HTTP publisher based approach in this article (ELK v7.X). However, there are certain drawbacks of using a HTTP based approach like losing data in the downtime. The approach discussed in this article is to ship wso2carbon log files with Filebeat to ELK and visualise them.

You need to have up and running ELK v8.X stack to follow this tutorial series.

I will continue this as a series of articles with three parts containing the following.

Additional articles for your knowledge,

Quick recap

We have a WSO2 Identity Server which writes to a log file and the Filebeat module collecting those logs, do some processing and sending it to Logstash so far. Let’s continue the journey from there.

Capture, process and ship the logs with Logstash

The Logstash configuration can be found here. I will go through the three major sections of this configuration namely input{}, filter{} and output{}.

The input{} section

As the name suggests, this section deals with the input sent from the Filebeat module. Filebeat ships to the port 5044 by default. The beats{} section inside the input indicates that we are taking the input from a beats module which is Filebeat in this case and from the port 5044.

The filter{} section

This is where the action happens in Logstash. Let’s go through each sub section in this filter. The log received from the Filebeat is now referred to as the message.

Dropping unwanted lines. (drop{})

Drop all the log lines that does not contain Event: in the message. This is to ensure that only the required lines are processed through Logstash filter section. Logstash has built in drop{} filter plugin to help with this task.

Mutate filter (mutate{})

WSO2 Identity Server indicates the Resident IdP as LOCAL. For the ease of understanding in the visualisations, we convert it into the word Resident. This is done through the plugin mutate{} and more specifically the gsub{}.

Log fields parsing with grok{}

The log line contains the format as follows. We need to capture the timestamp, event type and the event data.

[xxxx-xx-xx xx:xx:xx,xxx] [xxxxxx-xxx-xxx-xxx-xxxxx]  INFO {org.wso2.carbon.event.output.adapter.logger.LoggerEventAdapter} - Unique ID: auth
Event: {<JSON_EVENT_DATA>}

This can be easily done with the famous grok{} filter plugin. The following grok pattern is used to capture those three items,

\[%{TIMESTAMP_ISO8601:logtime}\].*Unique ID: %{GREEDYDATA:eventType},\n Event: %{GREEDYDATA:eventdata}

This capture the Timestamp as logtime, event type as eventType and JSON event as eventdata. We can refer to these variables from now on.

Converting the event data to json (json{})

As you know that our eventdata field is a JSON but Logstash needs to parse it from the string. Json{} filter plugin comes in handy for this task. It will break down the fields inside the JSON event data object as variables. Now we can delete the message and eventdata fields because those fields are converted and redundant now with the new variables.

Converting the roles comma separated into an array (ruby{})

WSO2 Identity Server ships user’s roles as comma seperated values. For Kibana to do a good visualisation, we need to have this in an array. That is done through the ruby{} filter plugin where we can execute a ruby code block inside the filter section. The code is self explanatory with simple Ruby syntax.

Set the timestamp (date{})

Elasticsearch and Kibana uses the Timestamp field to filter documents. Since we already decoded the log timestamp, we just need to point that variable as the Timestamp field. Can be easily done with date{} filter plugin. The date pattern of out logtime field is yyyy-MM-dd HH:mm:ss,SSS. This is needed to convert the field as the Timestamp.

User agent decode for Session events (useragent{})

We are planning to analyse the browser and the operating system of the user for each login session. That can be decoded from the [event][payloadData][userAgent] field in the Session event log. The useragent{} filter plugin helps to achieve this goal. The output will be like,

{
"name"=>"Firefox",
"version"=>"45.0",
"major"=>"45",
"minor"=>"0",
"os_name"=>"Mac OS X",
"os_version"=>"10.11",
"os_full"=>"Mac OS X 10.11",
"os_major"=>"10",
"os_minor"=>"11",
"device"=>"Mac"
}

Session start and end time decode for Session events (date{})

Similar to the Timestamp field, the session start and end times are decoded from the [event][payloadData][startTimestamp] and [event][payloadData][terminationTimestamp] with date{} filter plugin. These timestamps are stored as variables named startTime and endTime.

Decode ip address into location for Authentication events (geoip{})

For the Authentication events, we receive the user’s IP address field in [event][payloadData][remoteIp]. This can be decoded into a geo location with the aid of geoip{} filter plugin. This comes with the GeoLite2 database out of the box but can be changed.

The output{} section

After all the filtering actions are done, it’s time to send the modified data to the Elasticsearch. When shipping to Elasticsearch, we need to take the following into consideration,

  • Which Elasticsearch host? → We are shipping to https://localhost:9200.
  • Elasticsearch CA certificate? → Elasticsearch comes with default security features and therefore providing the CA certificate is a must. It can be located at <ELASTICSEARCH_HOME>/config/certs/http_ca.crt.
  • Username and password for Elasticsearch basic authentication → These parameters can be obtained while installing the Elasticsearch or you can add a new user as well.
  • Index → Which index we are going to send this particular log event. Since we already decoded the eventType field from the log, we can have a condition to check the event type and ship to indices wso2-iam-auth-raw or wso2-iam-session-raw.
  • Document id → Since we already get an id for each event from the WSO2 Identity Server, it’s safe to use that as the document id. This helps in debugging purposes as well.

Well, that’s all for the Logstash component. All we have to do now is visualising the data stored in Elasticsearch indices with Kibana. Let’s meet again with the last part of this series. So long… :)

--

--