Creating Complex “Transforms” in ELK (Elastic) Stack for WSO2 Identity Server Events

Avarjana Panditha
5 min read · Mar 22, 2022

Keep reading if you have had enough of the default Kibana interface for creating a complex Transform with a pivot configuration. If you have already tried out the ELK integration for WSO2 Identity Server (IS) analytics, you might also find this article useful for creating alerts in the Elastic Stack.

The interface Kibana provides for creating Transforms is good up to a certain level of complexity, and beyond that it turns into a disaster. Let me guide you through writing a Transform with multiple groupings, aggregations and a bucket script like a boss. Before you start, make sure the WSO2 IS ELK integration is already publishing events to Elasticsearch (the “auth” index used below).

Problems I had,

  1. The interface doesn’t allow bucket scripts.
  2. It doesn’t support two same-level aggregations on the same field.
  3. It clears out random fields when switching to JSON mode.
  4. Perfectly working queries return no results in JSON mode.
  5. Trouble with calendar_interval vs. fixed_interval.

What we have and what we want

We want to create a “Suspicious Login Alert” which is somewhat similar to the alert dashboard in the WSO2 IS Analytics solution. The criteria for an alert to occur are as follows,

  1. Two or more auth step failures within 5 minutes.
  2. One or more auth step successes within the same 5 minutes.
  3. The username, tenantDomain, userStoreDomain and serviceProvider must be the same across those logs.
  4. Checking for suspicious activities must take place every 1 minute.

We already have an “auth” index with the above-mentioned fields and a “timestamp” field to check the time intervals. Past records must be considered as well, since I’m planning to provide a dashboard to see previous suspicious login activities. Let’s dive into the process of getting our desired output under the above conditions.
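
For reference, a single event in the “auth” index of the shape assumed in this article might look something like this (the field names are the ones used in the group_by and aggregation clauses below; the values are made up):

{
  "@timestamp": "2022-03-22T10:15:03.000Z",
  "event": {
    "payloadData": {
      "username": "alice",
      "tenantDomain": "carbon.super",
      "userStoreDomain": "PRIMARY",
      "serviceProvider": "sample-app",
      "authStepSuccess": false
    }
  }
}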

It’s building time

We are going to use “localhost:9200/_transform/_preview” to check whether our aggregation queries produce the expected result. In order to do so, we have to create the request body as follows,

{
  "source": {
    "index": [
      "auth*"
    ]
  },
  "dest": {
    "index": "alert"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    ||| our main concern is here |||
  }
}

Provide the source index pattern where the records come from and a destination index to receive the transformed records. The sync delay is one minute (60s), measured from the @timestamp field, according to our example alert. This fulfills my 4th requirement, “Checking for suspicious activities must take place every 1 minute.”
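
As a side note, the interval at which a continuous transform checks for new data is governed by the top-level “frequency” setting, which defaults to one minute. If you want the 1-minute check to be explicit rather than implicit, you can add it next to “sync” (a small, optional addition to the body above):

{
  "frequency": "1m",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  }
}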

Now we need to fill the pivot part with our “group_by” and “aggregations”. According to my 3rd requirement, “All username, tenantDomain, userStoreDomain, serviceProvider should be same for the logs”, we must group the records by,

  • event.payloadData.username.keyword
  • event.payloadData.tenantDomain.keyword
  • event.payloadData.userStoreDomain.keyword
  • event.payloadData.serviceProvider.keyword

Therefore, in the pivot section, I’m going to add these inside the “group_by” section as below,

{
  "source": {
    "index": [
      "auth*"
    ]
  },
  "dest": {
    "index": "alert"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "event.payloadData.username": {
        "terms": {
          "field": "event.payloadData.username.keyword"
        }
      },
      "event.payloadData.tenantDomain": {
        "terms": {
          "field": "event.payloadData.tenantDomain.keyword"
        }
      },
      "event.payloadData.userStoreDomain": {
        "terms": {
          "field": "event.payloadData.userStoreDomain.keyword"
        }
      },
      "event.payloadData.serviceProvider": {
        "terms": {
          "field": "event.payloadData.serviceProvider.keyword"
        }
      }
    }
  }
}

Now all we need to do is count “authStepSuccess” values, true and false separately, within 5-minute intervals. The time interval config must go inside the “group_by” section, but the default option for “timestamp” intervals in Kibana is “calendar_interval”, which only supports single calendar units such as 1 minute, 1 hour, 1 day and so on. Since we need a custom interval of 5 minutes, we have to provide “fixed_interval” instead of “calendar_interval”; the two forms are compared below.
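
For comparison, only the fixed_interval variant of the date_histogram group accepts an arbitrary multiple such as “5m”, while calendar_interval is limited to single calendar units like “1m”, “1h” or “1d”:

"@timestamp": {
  "date_histogram": {
    "field": "@timestamp",
    "calendar_interval": "1d"
  }
}

"@timestamp": {
  "date_histogram": {
    "field": "@timestamp",
    "fixed_interval": "5m"
  }
}

Adding the fixed_interval group to our pivot gives the following request body,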

{
  "source": {
    "index": [
      "auth*"
    ]
  },
  "dest": {
    "index": "alert"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "event.payloadData.username": {
        "terms": {
          "field": "event.payloadData.username.keyword"
        }
      },
      "event.payloadData.tenantDomain": {
        "terms": {
          "field": "event.payloadData.tenantDomain.keyword"
        }
      },
      "event.payloadData.userStoreDomain": {
        "terms": {
          "field": "event.payloadData.userStoreDomain.keyword"
        }
      },
      "event.payloadData.serviceProvider": {
        "terms": {
          "field": "event.payloadData.serviceProvider.keyword"
        }
      },
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5m"
        }
      }
    },
    "aggregations": { ||| OUR NEXT TARGET ||| }
  }
}

Let’s move on to the aggregations now. All we need is to count how many “authStepSuccess” values are true and how many are false within each of these grouped 5-minute buckets. A basic bool filter with a must match clause does that; I named the two aggregations “filter_failed” and “filter_success”, sketched below.
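
The two filter aggregations are plain bool queries matching on “event.payloadData.authStepSuccess” (they also appear in the complete request body further down):

"filter_success": {
  "filter": {
    "bool": {
      "must": [
        { "match": { "event.payloadData.authStepSuccess": true } }
      ]
    }
  }
},
"filter_failed": {
  "filter": {
    "bool": {
      "must": [
        { "match": { "event.payloadData.authStepSuccess": false } }
      ]
    }
  }
}

With those counts available, we need a bucket script to check our condition,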

"is_sus": {
"bucket_script": {
"buckets_path": {
"failed": "filter_failed._count",
"success": "filter_success._count"
},
"script": "if(params.success >= 1 && params.failed >= 2 && params.failed > params.success) return 1; else return 0;"
}
}

The script is self-explanatory and covers requirements 1 and 2. Now we can send the request to our preview endpoint (localhost:9200/_transform/_preview) and verify that it works properly. The complete request body is as follows,

{
  "source": {
    "index": [
      "auth*"
    ],
    "runtime_mappings": {
      "geoip.location": {
        "type": "geo_point"
      }
    }
  },
  "dest": {
    "index": "alert"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "event.payloadData.username": {
        "terms": {
          "field": "event.payloadData.username.keyword"
        }
      },
      "event.payloadData.tenantDomain": {
        "terms": {
          "field": "event.payloadData.tenantDomain.keyword"
        }
      },
      "event.payloadData.userStoreDomain": {
        "terms": {
          "field": "event.payloadData.userStoreDomain.keyword"
        }
      },
      "event.payloadData.serviceProvider": {
        "terms": {
          "field": "event.payloadData.serviceProvider.keyword"
        }
      },
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5m"
        }
      }
    },
    "aggregations": {
      "filter_success": {
        "filter": {
          "bool": {
            "must": [
              {
                "match": {
                  "event.payloadData.authStepSuccess": true
                }
              }
            ],
            "must_not": [],
            "should": []
          }
        }
      },
      "filter_failed": {
        "filter": {
          "bool": {
            "must": [
              {
                "match": {
                  "event.payloadData.authStepSuccess": false
                }
              }
            ],
            "must_not": [],
            "should": []
          }
        }
      },
      "is_sus": {
        "bucket_script": {
          "buckets_path": {
            "failed": "filter_failed._count",
            "success": "filter_success._count"
          },
          "script": "if(params.success >= 1 && params.failed >= 2 && params.failed > params.success) return 1; else return 0;"
        }
      }
    }
  }
}

If everything goes according to plan, you will receive results instead of errors in the response. Now we can create the transform by sending the same body in a PUT request to “localhost:9200/_transform/suspicious_logins”. You can use the “alert” index to create dashboards afterwards.
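
For reference, the whole flow in Kibana Dev Tools (or as equivalent HTTP calls against localhost:9200) looks roughly like this; the body is the complete request shown above, and note that a newly created transform still has to be started before it runs continuously:

POST _transform/_preview
{ ... complete request body above ... }

PUT _transform/suspicious_logins
{ ... same request body ... }

POST _transform/suspicious_logins/_start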

Suspicious Login Alert Dashboard

Bonus tip for creating alert rules for connectors

You can open the browser’s network tab on a created dashboard and copy the query from the request sent for any visualization you make.

Sample Request Payload

Then use that query to create a rule for the newly created Alert index 😉 and use a connector to send the alert.
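
As a hypothetical sketch (assuming each document written to the “alert” index carries the bucketed “@timestamp” and the “is_sus” flag as described above), the query behind such a rule would look something along these lines, checking for suspicious buckets in the recent past:

{
  "query": {
    "bool": {
      "must": [
        { "match": { "is_sus": 1 } },
        { "range": { "@timestamp": { "gte": "now-5m" } } }
      ]
    }
  }
}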
