How to Create Effective Detection Rules
Learn how Microsoft Sentinel uses analytical rules and KQL queries to detect cybersecurity threats and how to build and maintain them effectively.
Microsoft Sentinel is a SIEM/SOAR system designed to detect cybersecurity incidents amidst normal traffic and resolve the resulting alerts. Data sources can include the organization’s cloud resources (Entra ID, servers, virtual machines, etc.), endpoints, applications, and on-premises systems. How are threats detected using analysis rules (called analytics rules in Sentinel), how are the rules created, and how can an organization stay ahead of evolving threat actors? This article goes through these topics.
Analysis Rules
Threat detection in Sentinel is performed using analysis rules. A single analysis rule is always created to detect a specific type of event. Since there are many events to monitor, a Sentinel environment naturally has multiple analysis rules active simultaneously.
Technically, analysis rules contain a query written in KQL (Kusto Query Language) that defines what log data to retrieve and the criteria for generating alerts from this data. This query is run against the Log Analytics Workspace underlying Sentinel, which stores all log data processed by Sentinel. In simple terms, a KQL query is a list of criteria for events, and when these criteria are met, the rule generates an alert in the monitoring dashboard.
Example of a Simple Rule:
The goal is to know whether there have been sign-ins to the organization’s Entra ID tenant from outside Finland in the last 14 days. Based on this goal, we create a KQL query that defines the following steps to detect the event:
- Retrieve sign-in logs
- Define the timeframe for the last 14 days
- Keep only successful sign-ins
- Exclude sign-ins originating from Finland
If this query yields results, we can generate an alert in the monitoring view.
Example in code:
SigninLogs
| where TimeGenerated > ago(14d) // Last 14 days
| where Location != "FI" // Location is not Finland
| where ResultType == "0" // Sign-in succeeded (simplified)
The rule looks quite simple because it does not account for other possible “ResultType” values indicating a successful sign-in. It also does not modify the structure of the logs by adding or removing specific log fields from the output.
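As a hedged sketch of how the output could be shaped, the variant below keeps successful sign-ins (result code "0"), selects a handful of columns, and summarizes the results per user. The column selection and the summary are illustrative assumptions rather than part of the rule above; the column names come from the standard SigninLogs schema.

```kql
SigninLogs
| where TimeGenerated > ago(14d)                      // Last 14 days
| where ResultType == "0"                             // Successful sign-ins only (simplified)
| where isnotempty(Location) and Location != "FI"     // Known location outside Finland
| project TimeGenerated, UserPrincipalName, AppDisplayName, IPAddress, Location
| summarize SignInCount = count(), Countries = make_set(Location), LastSeen = max(TimeGenerated) by UserPrincipalName
| order by SignInCount desc
```

Summarizing per user keeps the result short enough to review at a glance, while the projected columns can still be inspected by removing the last two lines.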
In addition to the KQL query, analysis rules contain the following features:
- Description of the rule
- Severity of the resulting alert (informational, low, medium, high)
- Logic (the KQL queries)
- Incident investigation steps
- Possible automations
There are three main types of analysis rules:
- Scheduled Analytic Rule: Runs at specific intervals, such as once a day. This is the most common rule type.
- NRT Query Rule (Near Real Time): Runs once a minute, thereby detecting threats in near real-time.
- Microsoft Incident Creation Rule: Synchronizes incidents from other Microsoft security products into Sentinel. This rule type is retired if your organization has connected Sentinel to the new Microsoft Defender XDR portal.
Among these rule types, the Scheduled Analytic Rule is by far the most common because NRT rules are limited to a maximum of 50 active rules simultaneously, and Microsoft Incident Creation rules can only synchronize alerts from specific security products to Sentinel.
Next, I will show a more specialized and complex analysis rule from the Microsoft community.
Example of an Advanced Analysis Rule:
This query looks for an unusual spike in the number of Azure Key Vault operations performed by a single IP address. The query uses an anomaly detection algorithm built into KQL to identify situations that deviate from normal behavior (in this case, the volume of operations). A sudden increase in Azure Key Vault usage may indicate an attacker’s automated attempt to steal credentials.
let starttime = 14d;
let timeframe = 1d;
let scorethreshold = 3;
let baselinethreshold = 25;
// Known application (Azure Resource Graph) whose behavior involves a large number of Key Vault operations
// This application is filtered out
let Allowedappid = dynamic(["509e4652-da8d-478d-a730-e9d4a1996ca4"]);
// Operations to monitor
let OperationList = dynamic(["SecretGet", "KeyGet", "VaultGet"]);
// Generate the data in which anomalies will be searched
let TimeSeriesData = AzureDiagnostics
| where TimeGenerated between (startofday(ago(starttime))..startofday(now()))
| where not((identity_claim_appid_g in (Allowedappid)) and OperationName == 'VaultGet')
| where ResourceType =~ "VAULTS" and ResultType =~ "Success"
| where OperationName in (OperationList)
| extend ResultType = column_ifexists("ResultType", "None"), CallerIPAddress = column_ifexists("CallerIPAddress", "None")
| where ResultType !~ "None" and isnotempty(ResultType)
| where CallerIPAddress !~ "None" and isnotempty(CallerIPAddress)
| project TimeGenerated, OperationName, Resource, CallerIPAddress
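// make-series is a built-in KQL operator that builds one time series per caller IP for the anomaly function
// (despite its name, HourlyCount holds one count per step of "timeframe", here 1d)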
| make-series HourlyCount=count() on TimeGenerated from startofday(ago(starttime)) to startofday(now()) step timeframe by CallerIPAddress;
// Filter based on anomalies
let TimeSeriesAlerts = TimeSeriesData
// "series_decompose_anomalies" is a built-in KQL function that detects anomalies
| extend (anomalies, score, baseline) = series_decompose_anomalies(HourlyCount, scorethreshold, -1, 'linefit')
| mv-expand HourlyCount to typeof(double), TimeGenerated to typeof(datetime), anomalies to typeof(double), score to typeof(double), baseline to typeof(long)
| where anomalies > 0 | extend AnomalyHour = TimeGenerated
| where baseline > baselinethreshold // Filter to include only high volumes per baselinethreshold
| project CallerIPAddress, AnomalyHour, TimeGenerated, HourlyCount, baseline, anomalies, score;
let AnomalyHours = TimeSeriesAlerts | where TimeGenerated > ago(2d) | project TimeGenerated;
// Select alerts from a specific timeframe
TimeSeriesAlerts
| where TimeGenerated > ago(2d)
// Join with "normal" logs to display logs surrounding the anomaly
| join kind = innerunique (
AzureDiagnostics
| where TimeGenerated > ago(2d)
| where not((identity_claim_appid_g in (Allowedappid)) and OperationName == 'VaultGet')
| where ResourceType =~ "VAULTS" and ResultType =~ "Success"
| where OperationName in (OperationList)
| extend DateHour = bin(TimeGenerated, 1h)
| where DateHour in ((AnomalyHours))
| extend ResultType = column_ifexists("ResultType", "NoResultType")
| extend requestUri_s = column_ifexists("requestUri_s", "None"), identity_claim_http_schemas_microsoft_com_identity_claims_objectidentifier_g = column_ifexists("identity_claim_http_schemas_microsoft_com_identity_claims_objectidentifier_g", "None"), identity_claim_oid_g = column_ifexists("identity_claim_oid_g", ""), identity_claim_upn_s = column_ifexists("identity_claim_upn_s", "")
| extend CallerObjectId = iff(isempty(identity_claim_oid_g), identity_claim_http_schemas_microsoft_com_identity_claims_objectidentifier_g, identity_claim_oid_g), CallerObjectUPN = iff(isempty(identity_claim_upn_s), identity_claim_http_schemas_xmlsoap_org_ws_2005_05_identity_claims_upn_s, identity_claim_upn_s)
| extend id_s = column_ifexists("id_s", "None"), CallerIPAddress = column_ifexists("CallerIPAddress", "None"), clientInfo_s = column_ifexists("clientInfo_s", "None")
| summarize PerOperationCount=count(), LatestAnomalyTime = arg_max(TimeGenerated,*) by bin(TimeGenerated,1h), Resource, OperationName, id_s, CallerIPAddress, identity_claim_http_schemas_microsoft_com_identity_claims_objectidentifier_g, identity_claim_oid_g, requestUri_s, clientInfo_s
) on CallerIPAddress
| extend CallerObjectId = iff(isempty(identity_claim_oid_g), identity_claim_http_schemas_microsoft_com_identity_claims_objectidentifier_g, identity_claim_oid_g), CallerObjectUPN = iff(isempty(identity_claim_upn_s), identity_claim_http_schemas_xmlsoap_org_ws_2005_05_identity_claims_upn_s, identity_claim_upn_s)
| summarize EventCount=count(), OperationNameList = make_set(OperationName,1000), RequestURLList = make_set(requestUri_s, 100), AccountList = make_set(CallerObjectId, 100), AccountMax = arg_max(CallerObjectId,*) by Resource, id_s, clientInfo_s, LatestAnomalyTime
| extend timestamp = LatestAnomalyTime
Phew! That is quite a bit of code. Writing KQL queries of this size requires a structured approach so that the results surface the most relevant events.
The code snippet above originates from the Microsoft community, where many pre-built analysis rules are publicly available.
Rules created by the community usually require tuning and polishing when integrated into a production system.
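To see what series_decompose_anomalies does in isolation, it can help to run it on a small synthetic series first. The sketch below uses made-up daily counts with an artificial spike at the end, together with the same threshold and trend settings as the community rule. The numbers are assumptions for illustration only.

```kql
// Made-up daily operation counts for one hypothetical caller; the last value is an artificial spike
print DailyCount = dynamic([20, 22, 19, 21, 20, 23, 21, 20, 22, 19, 21, 20, 22, 150])
// Same parameters as the rule: score threshold 3, seasonality auto-detected (-1), linear trend
| extend (anomalies, score, baseline) = series_decompose_anomalies(DailyCount, 3, -1, 'linefit')
// Expand the arrays into one row per day so each value can be inspected next to its score
| mv-expand with_itemindex=DayIndex DailyCount to typeof(double), anomalies to typeof(double), score to typeof(double), baseline to typeof(double)
```

Each row then shows the observed count, the expected baseline, the anomaly score, and a flag in the anomalies column (1 for a spike, -1 for a dip, 0 otherwise).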
The most important goal of KQL queries is to clarify:
- Does a threat exist?
- Where does the threat exist?
- Why does the threat exist?
This information can be uncovered even with relatively simple queries.
Creating Analysis Rules Threat-First
We have covered the anatomy of analysis rules, but it is also important to understand how to design effective and operational rules. There are countless processes for creating analysis rules, but generally they follow a workflow like this:
- Define the scenario or threat model.
- Define what needs to be detected and which data sources contain the required logs.
- Create the KQL query and the analysis rule.
- Test the rule’s functionality.
- Fine-tune the rule based on alerts (for example, filtering out allowed users).
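The fine-tuning step could look like the sketch below, which excludes allow-listed users from the earlier sign-in example. The account names are hypothetical; in practice the list might come from a watchlist instead of being hard-coded.

```kql
// Hypothetical allowlist of users who are expected to sign in from abroad
let AllowedUsers = dynamic(["traveller@contoso.com", "partner@contoso.com"]);
SigninLogs
| where TimeGenerated > ago(14d)
| where ResultType == "0"                          // Successful sign-ins (simplified)
| where Location != "FI"                           // Outside Finland
| where UserPrincipalName !in~ (AllowedUsers)      // Case-insensitive exclusion of allowed users
```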
Best practices include threat-led development of rules, which can be driven by business requirements, cyberattack prevention, or threat intelligence.
Business Requirements
Business requirements may involve events whose detection is critical for business operations. Based on these events, automated reports can be generated for management to help develop business operations. Here are a few examples:
- User monitoring (external users, service accounts)
- Compliance and regulation (sign-ins from abroad, MFA usage, administrator activities)
- Application usage
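A report-style query for one of these examples, MFA usage, might be sketched as follows. It assumes the AuthenticationRequirement column of SigninLogs is populated in your tenant; the 30-day window is an arbitrary choice.

```kql
SigninLogs
| where TimeGenerated > ago(30d)
| where ResultType == "0"                               // Successful sign-ins (simplified)
| summarize SignIns = count() by Day = bin(TimeGenerated, 1d), AuthenticationRequirement
| order by Day asc
```

The result gives a daily count of sign-ins split by whether single-factor or multi-factor authentication was required, which can feed a workbook or a scheduled report.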
Cyberattack Prevention
Threat models are created for cyberattacks, and the events associated with them are identified, such as the creation of new processes, downloading malware from the internet, and privilege modifications.
A few examples of events commonly linked to cyberattacks:
- Sudden spikes in sign-in attempts (brute-force attacks, MFA fatigue)
- Creation of suspicious processes
- Privilege escalation
- Creation and deletion of new users
- Sign-in attempts to the same account from multiple IP addresses
- Downloading executable files from the internet (example below)
This KQL query detects executable files downloaded via HTTP requests, which may point to automated malware activity.
let ExecutableFileExtentions = dynamic(['bat', 'cmd', 'com', 'cpl', 'ex', 'exe', 'jse', 'lnk', 'msc', 'ps1', 'reg', 'vb', 'vbe', 'ws', 'wsf']);
DeviceNetworkEvents
| where ActionType == "NetworkSignatureInspected"
| extend SignatureName = tostring(parse_json(AdditionalFields).SignatureName),
SignatureMatchedContent = tostring(parse_json(AdditionalFields).SignatureMatchedContent),
SamplePacketContent = tostring(parse_json(AdditionalFields).SamplePacketContent)
| where SignatureName == "HTTP_Client"
| extend HTTP_Request_Method = tostring(split(SignatureMatchedContent, " /", 0)[0])
| where HTTP_Request_Method == "GET"
| extend DownloadedContent = extract(@'.*/(.*)HTTP', 1, SignatureMatchedContent)
| extend DownloadContentFileExtention = extract(@'.*\.(.*)$', 1, DownloadedContent)
| where isnotempty(DownloadContentFileExtention) and string_size(DownloadContentFileExtention) < 8
| where DownloadContentFileExtention has_any (ExecutableFileExtentions)
| project-reorder TimeGenerated, DeviceName, DownloadedContent, HTTP_Request_Method, RemoteIP
Source: Bert-JanP/Hunting-Queries-Detection-Rules
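Another item from the list, a sudden spike in sign-in attempts, could be approached with a simple threshold as in the sketch below. The 10-minute window and the limit of 20 failures are assumed values that would need tuning for each environment.

```kql
let FailureThreshold = 20;   // Assumed limit; tune to the environment
SigninLogs
| where TimeGenerated > ago(1h)
| where ResultType != "0"                                // Anything other than success (simplified)
| summarize FailedAttempts = count(), SourceIPs = dcount(IPAddress) by UserPrincipalName, bin(TimeGenerated, 10m)
| where FailedAttempts > FailureThreshold
```

Counting distinct source IPs alongside the failures also helps with the "same account from multiple IP addresses" case mentioned in the list.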
Utilizing Threat Intelligence
Collecting threat intelligence from open sources is an effective way to keep pace with new cyber threats. Watchlists of threat data can be created in Sentinel and easily utilized within KQL queries. Threat intelligence feeds can be purchased from Microsoft (Microsoft Threat Intelligence) or third parties (hint hint, Tekve Oy).
Example:
This KQL query uses the DeviceFileEvents table (endpoint file events) to look for file identifiers linked to threat intelligence. In this example, we use file hashes of the “FormBook” malware, which was reported in the news last week, as threat intelligence.
let dt_lookBack = 1h;
// Watchlist containing FormBook malware file hashes in CSV format
let FormBookHashes = (_GetWatchlist("FormBookFileHashes") | project fileHash);
let DeviceFileEvents_ = (union
(DeviceFileEvents | where TimeGenerated > ago(dt_lookBack) | where isnotempty(SHA1) | extend FileHashValue = SHA1),
(DeviceFileEvents | where TimeGenerated > ago(dt_lookBack) | where isnotempty(SHA256) | extend FileHashValue = SHA256)
);
DeviceFileEvents_
| where FileHashValue in (FormBookHashes)
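A more compact variant of the same lookup is sketched below. It checks both hash columns in one pass and uses in~ so that matching does not depend on the letter case of the hashes stored in the watchlist. The watchlist name and its fileHash column are taken from the example above; whether case differences actually occur in your data is an assumption.

```kql
let FormBookHashes = _GetWatchlist("FormBookFileHashes") | project fileHash;
DeviceFileEvents
| where TimeGenerated > ago(1h)
| where SHA1 in~ (FormBookHashes) or SHA256 in~ (FormBookHashes)   // Case-insensitive match on either hash
| project TimeGenerated, DeviceName, FileName, FolderPath, SHA1, SHA256
```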
Continuous Development
Many organizations deploy Microsoft Sentinel and think the main work is done, neglecting its further development. This is a major mistake, as cyber threats evolve daily and find new ways to breach environments. Therefore, it is critical to continuously maintain detection rules and Indicator of Compromise (IOC) databases. Without this maintenance, Sentinel’s effectiveness degrades.
It is also important to establish processes within the organization to ensure that Sentinel alerts are actively monitored and followed up on. Tekve Oy offers a cost-effective monthly service to keep your Sentinel environment optimized, including updating analysis rules and threat intelligence to match evolving threat actors.