11 February, 2024

Using Fluent Bit to send CommonSecurityLog data to Sentinel

One of the limitations of Microsoft Sentinel and Azure Log Analytics was that if you wanted to use a custom log forwarder or log source, you couldn't write to the built-in tables. You had to create Custom tables, which had a _CL suffix.

This was a problem, because each custom table you added had to be manually added to your Analytic rules and Workbooks, creating stupid inefficiencies.

But this is no more! With the new Logs Ingestion API, Microsoft supports custom data being sent to 4 built-in tables: CommonSecurityLog, SecurityEvent, Syslog, and WindowsEvent.

In this example, we're going to be sending firewall log data from OpenWrt to the standard CommonSecurityLog table, so that we get the benefit of having normalised data, and the built-in Analytic templates.

[Figure: Overview of data flow, from OpenWrt, through Fluent Bit, to a Log Analytics workspace through DCE and DCR]

To set this up, we're going to:

  1. Identify the data we want to collect and map it to CommonSecurityLog
  2. Create an App Registration
  3. Create a Data Collection Endpoint (DCE)
  4. Create a Data Collection Rule (DCR)
  5. Assign IAM Roles to the DCR
  6. Install Fluent Bit
  7. Configure a Fluent Bit parser for OpenWrt firewall logs
  8. Create a Fluent Bit config

Mapping iptables to CommonSecurityLog

The first step in ingesting any data into any SIEM is to know what the data is, what you're interested in, and how to normalise it so that you can make use of it. Otherwise, you're just sending garbage into a very expensive garbage pile.

Example OpenWrt iptables logs look like:

[000000.000000] drop wan invalid ct state: IN= OUT=pppoe-wan SRC=203.0.113.42 DST=198.51.100.9 LEN=40 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF PROTO=TCP SPT=52182 DPT=443 WINDOW=0 RES=0x00 RST URGP=0
[000000.000000] drop wan invalid ct state: IN=eth1 OUT=pppoe-wan MAC=aa:aa:aa:aa:aa:aa:bb:bb:bb:bb:bb:bb:08:00 SRC=198.51.100.134 DST=198.51.100.29 LEN=40 TOS=0x00 PREC=0x00 TTL=127 ID=45489 DF PROTO=TCP SPT=56913 DPT=443 WINDOW=0 RES=0x00 ACK RST URGP=0
[000000.000000] reject wan forward: IN=pppoe-wan OUT=eth1 MAC= SRC=2001:0db8:0000:0000:0000:0000:ffff:0002 DST=2001:0db8:aaaa:aaaa:aaaa:aaaa:aaaa:af3c LEN=60 TC=0 HOPLIMIT=57 FLOWLBL=864951 PROTO=TCP SPT=443 DPT=53714 WINDOW=0 RES=0x00 RST URGP=0
[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=113 ID=19525 DF PROTO=TCP SPT=53501 DPT=1433 WINDOW=8192 RES=0x00 SYN URGP=0
[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=12 ID=31743 PROTO=UDP SPT=12054 DPT=53 LEN=32
[000000.000000] reject wan out: IN=eth1 OUT=pppoe-wan MAC=aa:aa:aa:aa:aa:aa:bb:bb:bb:bb:bb:bb:08:00 SRC=198.51.100.233 DST=198.51.100.1 LEN=84 TOS=0x00 PREC=0x00 TTL=63 ID=15546 PROTO=ICMP TYPE=8 CODE=0 ID=2828 SEQ=0
[000000.000000] accept wan out: IN=eth1.30 OUT=pppoe-wan MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=12 ID=31893 PROTO=UDP SPT=24912 DPT=53 LEN=32

Based on these logs, and what I'm interested in, I'm going to normalise the data to the CommonSecurityLog schema using the following fields:

CommonSecurityLog        datatype  iptables                   fluent-bit
TimeGenerated            datetime  -                          time
CommunicationDirection   -         (in or out)                -
Computer                 -         -                          host
DestinationIP            -         DST=                       -
DestinationMACAddress    -         MAC=                       -
DestinationPort          int       DPT=                       -
DeviceAction             -         (accept, reject, or drop)  -
DeviceCustomString1      -         (rule interface)           -
DeviceInboundInterface   -         IN=                        -
DeviceOutboundInterface  -         OUT=                       -
ProcessName              -         -                          ident
Protocol                 -         PROTO=                     -
ReceiptTime              -         -                          time
ReceivedBytes            long      LEN=                       -
SourceIP                 -         SRC=                       -
SourceMACAddress         -         MAC=                       -
SourcePort               int       SPT=                       -
Message                  -         -                          message

Note: sharp readers will notice I'm keeping the original message field, which effectively duplicates all the relevant data... this is just temporary, as I haven't fully finished my regex filter. At some point in the future, I'll remove the raw message, otherwise I'm just doubling my data costs.

Create an App Registration

Unlike the old Data Collector API, which used a static key for the entire workspace, the new Logs Ingestion API uses an OAuth flow, so that you can be quite granular about which sources are allowed to send what data to which tables.

This means that we'll need to create an App Registration for Fluent Bit.

  • Go to https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/CreateApplicationBlade/quickStartType~/null/isMSAApp~/false
  • Enter a name like fluent, leave the Redirect URI blank, and click Register.
  • Once created, on the Overview page, make a note of the Application (client) ID and Directory (tenant) ID.
  • Go to Owners, and set yourself as an Owner. This is good operational practice (so that in large enterprises, there's some clue about who manages what), and it means that you'll always be able to create a new Secret, even if you lose your admin rights.
  • Go to Certificates & secrets, and create a new client Secret. Don't forget to add a calendar reminder for the Expiry date, and make a note of the new secret.
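
To make the OAuth flow a bit more concrete, here is a minimal Python sketch of the client-credentials token request that a sender would make with the values noted above. It is an illustration only, not what Fluent Bit runs internally: the function names are hypothetical, and the token endpoint and the https://monitor.azure.com/.default scope are my assumptions about the standard Entra ID flow for Azure Monitor.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(tenant_id, client_id, client_secret):
    """Build (but do not send) an OAuth client-credentials token request."""
    # Assumption: the standard v2.0 token endpoint for the tenant.
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # Assumption: the scope used for Azure Monitor ingestion.
        "scope": "https://monitor.azure.com/.default",
    }).encode()
    return urllib.request.Request(url, data=body, method="POST")


def fetch_token(request):
    """Send the request and return the bearer token from the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["access_token"]
```

Calling fetch_token(build_token_request(tenant_id, client_id, client_secret)) with real values should return a short-lived bearer token; the secret itself never goes to the ingestion endpoint.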

Create a Data Collection Endpoint (DCE)

We're also going to need a Data Collection Endpoint (DCE). The DCE is the public URL that will accept data. We can reuse an existing DCE, provided that:

  • it's in the same Location (region) as the destination Log Analytics workspace.
  • your log source can talk to it, either over the public internet, or a Private Link scope.

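To create one, search for Data collection endpoints in the Azure Portal and click Create. Once it's deployed, make a note of its Logs Ingestion URL from the Overview page; Fluent Bit will need it later.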

Create a DCR to send data to CommonSecurityLog

The Data Collection Rule (DCR) is used to glue together the ingest pipeline. A DCR:

  • is linked to a specific DCE;
  • contains one or more Streams, which define what input data columns and types to allow;
  • contains one or more Destinations, the Log Analytics workspaces;
  • contains one or more Flows, which map a Stream to a Destination via a Transform Rule; and
  • contains Azure IAM Roles assigned to App Registrations / Service Principals / Managed Identities, so that ingest can be controlled.

Or, put visually:

[Figure: DCR flow overview]

You can't (currently) use the Azure Portal to create a DCR that sends custom data to the built-in tables. For this step, we need to create an ARM Template.

To make this bit easy for you, I've prepared the ARM template below in a nice tidy Deploy to Azure button:

Deploy to Azure

What this ARM template does is:

  • Create a Custom-CommonSecurityLogStream stream. This is important: the stream must declare every field we've mapped above, because any field that isn't declared in the stream will simply be discarded.
    • This includes fields generated/mapped by Fluent Bit, like TimeGenerated and Computer; and fields extracted through the parser, like DestinationIP and SourcePort.
  • Map this stream to the output table Microsoft-CommonSecurityLog. Unlike custom tables, which have a Custom- prefix, the built-in tables use the Microsoft- prefix. This template uses a transformKql of "source": in other words, it won't transform the source data, just pass it through as-is.

In this case, I'm using the Custom-CommonSecurityLogStream stream name; you'll need this later.
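
Because undeclared fields are dropped silently, it can be worth checking a sample record against the stream declaration before deploying. The sketch below is one hypothetical way to do that in Python; the helper names, the cut-down template and the sample record are all made up for illustration.

```python
def declared_columns(template, stream_name):
    """Return the column names declared for stream_name in an ARM template dict."""
    rule = template["resources"][0]["properties"]
    columns = rule["streamDeclarations"][stream_name]["columns"]
    return {column["name"] for column in columns}


def undeclared_fields(record, template, stream_name):
    """List the record's fields that the stream does not declare."""
    return sorted(set(record) - declared_columns(template, stream_name))


# Cut-down, hypothetical template: only the parts the check needs.
TEMPLATE = {
    "resources": [{
        "properties": {
            "streamDeclarations": {
                "Custom-CommonSecurityLogStream": {
                    "columns": [
                        {"name": "TimeGenerated", "type": "datetime"},
                        {"name": "SourceIP", "type": "string"},
                    ]
                }
            }
        }
    }]
}

# Hypothetical record; ExtraField is not declared in the stream above.
RECORD = {
    "TimeGenerated": "2024-02-11T10:15:00Z",
    "SourceIP": "198.51.100.130",
    "ExtraField": "never reaches the table",
}

if __name__ == "__main__":
    print(undeclared_fields(RECORD, TEMPLATE, "Custom-CommonSecurityLogStream"))
```

In practice you would load the real template with json.load and feed it a record produced by your parser.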

Once you create the DCR, you'll also need its immutable ID (it looks like dcr-00000000000000000000000000000000). To find it, view the DCR in the Azure Portal, click JSON View, and copy the immutableId value.

{
    "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "dataCollectionRuleName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the name of the Data Collection Rule to create."
            }
        },
        "location": {
            "defaultValue": "[resourceGroup().location]", 
            "type": "string", 
            "metadata": {
                "description": "Specifies the location in which to create the Data Collection Rule." 
            } 
        },
        "workspaceResourceId": {
            "type": "string",
            "metadata": {
                "description": "Specifies the Azure resource ID of the Log Analytics workspace to use."
            }
        },
        "endpointResourceId": {
            "type": "string",
            "metadata": {
                "description": "Specifies the Azure resource ID of the Data Collection Endpoint to use."
            }
        }
    },
    "resources": [
        {
            "type": "Microsoft.Insights/dataCollectionRules",
            "apiVersion": "2021-09-01-preview",
            "name": "[parameters('dataCollectionRuleName')]",
            "location": "[parameters('location')]",
            "properties": {
                "dataCollectionEndpointId": "[parameters('endpointResourceId')]",
                "streamDeclarations": {
                    "Custom-CommonSecurityLogStream": {
                        "columns": [
                            {
                                "name": "TimeGenerated",
                                "type": "datetime"
                            },
                            {
                                "name": "CommunicationDirection",
                                "type": "string"
                            },
                            {
                                "name": "Computer",
                                "type": "string"
                            },
                            {
                                "name": "DestinationIP",
                                "type": "string"
                            },
                            {
                                "name": "DestinationMACAddress",
                                "type": "string"
                            },
                            {
                                "name": "DestinationPort",
                                "type": "int"
                            },
                            {
                                "name": "DeviceAction",
                                "type": "string"
                            },
                            {
                                "name": "DeviceCustomString1",
                                "type": "string"
                            },
                            {
                                "name": "DeviceInboundInterface",
                                "type": "string"
                            },
                            {
                                "name": "DeviceOutboundInterface",
                                "type": "string"
                            },
                            {
                                "name": "ProcessName",
                                "type": "string"
                            },
                            {
                                "name": "Protocol",
                                "type": "string"
                            },
                            {
                                "name": "ReceiptTime",
                                "type": "string"
                            },
                            {
                                "name": "ReceivedBytes",
                                "type": "long"
                            },
                            {
                                "name": "SourceIP",
                                "type": "string"
                            },
                            {
                                "name": "SourceMACAddress",
                                "type": "string"
                            },
                            {
                                "name": "SourcePort",
                                "type": "int"
                            },
                            {
                                "name": "Message",
                                "type": "string"
                            }
                        ]
                    }
                },
                "destinations": {
                    "logAnalytics": [
                        {
                            "workspaceResourceId": "[parameters('workspaceResourceId')]",
                            "name": "clv2ws1"
                        }
                    ]
                },
                "dataFlows": [
                    {
                        "streams": [
                            "Custom-CommonSecurityLogStream"
                        ],
                        "destinations": [
                            "clv2ws1"
                        ],
                        "transformKql": "source",
                        "outputStream": "Microsoft-CommonSecurityLog"
                    }
                ]
            }
        }
    ],
    "outputs": {
        "dataCollectionRuleId": {
            "type": "string",
            "value": "[resourceId('Microsoft.Insights/dataCollectionRules', parameters('dataCollectionRuleName'))]"
        }
    }
}

Note: You may get a warning along the lines of Value is not accepted. Valid values: "Microsoft-Event", "Microsoft-InsightsMetrics", "Microsoft-Perf", "Microsoft-Syslog", "Microsoft-WindowsEvent". You can safely ignore this - it's just a validation warning because Microsoft haven't updated the JSON schema yet.

Assign IAM Roles to the DCR

Once you've created the DCR, the final step is to assign an IAM Role to the App Registration we created before.

In the Azure Portal, go to the DCR, click Access control (IAM), and Add role assignment. You want to assign Monitoring Metrics Publisher to the App Registration.
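
With the role assigned, the DCE, the DCR immutable ID and the stream name are everything a client needs to address the ingest pipeline. As a rough Python sketch of how those pieces fit together in a single request: the function name is hypothetical, the api-version value is my assumption, and the bearer token is assumed to come from the App Registration's OAuth flow.

```python
import json
import urllib.request

API_VERSION = "2023-01-01"  # assumption: a current Logs Ingestion API version


def build_ingest_request(dce_url, dcr_immutable_id, stream_name, records, token):
    """Build (but do not send) a Logs Ingestion API request for a batch of records."""
    url = (
        f"{dce_url.rstrip('/')}/dataCollectionRules/{dcr_immutable_id}"
        f"/streams/{stream_name}?api-version={API_VERSION}"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(records).encode(),  # the body is a JSON array of records
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the result to urllib.request.urlopen would send the batch. A 403 response at that point usually means the role assignment is missing or hasn't propagated yet.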

Install Fluent Bit

There are lots of ways to install Fluent Bit... just go read the docs at https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit 😉

Configure a Fluent Bit parser for OpenWrt firewall logs

Based on our mapping, we're going to need to use a regular expression to parse the OpenWrt iptables logs, and tag them with the appropriate column names.

Instead of modifying the existing Fluent Bit parsers configuration, I simply create a new parsers.conf file:

# Based on https://github.com/fluent/fluent-bit/blob/master/conf/parsers_extra.conf

[PARSER]
    Name         iptables-openwrt
    Format       regex
    Regex        (?<DeviceAction>reject|accept|drop) (?<DeviceCustomString1>.*?) (?<CommunicationDirection>[\w\s]+): IN=(?<DeviceInboundInterface>[\w\-\.]+)? OUT=(?<DeviceOutboundInterface>[\w\-\.]+)?( MAC=((?<DestinationMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):(?<SourceMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):\w\w:\w\w)?)? SRC=(?<SourceIP>[\w\.\:]+) DST=(?<DestinationIP>[\w\.\:]+) LEN=(?<ReceivedBytes>\d+) .* PROTO=(?<Protocol>[\w\d]+)( SPT=(?<SourcePort>\d+) DPT=(?<DestinationPort>\d+))?
    Types        SourcePort:integer,DestinationPort:integer,ReceivedBytes:integer

#
# Built-in, from https://github.com/fluent/fluent-bit/blob/master/conf/parsers.conf
#

[PARSER]
    Name        syslog-rfc3164
    Format      regex
    Regex       /^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/
    Time_Key    time
    Time_Format %b %d %H:%M:%S
    Time_Keep   On
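
Before pointing real traffic at the parser, it helps to try the regex against a few sample lines. The Python sketch below does that for the iptables-openwrt expression. Fluent Bit uses Onigmo (Ruby-style) regexes, so the sketch rewrites the named groups into Python's (?P<name>...) spelling; treat it as an approximation of what Fluent Bit does, not a guarantee of identical behaviour. The parse_iptables helper is hypothetical.

```python
import re

# The iptables-openwrt Regex from parsers.conf, copied verbatim (Onigmo syntax).
ONIGMO_REGEX = r"(?<DeviceAction>reject|accept|drop) (?<DeviceCustomString1>.*?) (?<CommunicationDirection>[\w\s]+): IN=(?<DeviceInboundInterface>[\w\-\.]+)? OUT=(?<DeviceOutboundInterface>[\w\-\.]+)?( MAC=((?<DestinationMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):(?<SourceMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):\w\w:\w\w)?)? SRC=(?<SourceIP>[\w\.\:]+) DST=(?<DestinationIP>[\w\.\:]+) LEN=(?<ReceivedBytes>\d+) .* PROTO=(?<Protocol>[\w\d]+)( SPT=(?<SourcePort>\d+) DPT=(?<DestinationPort>\d+))?"

# Python's re module spells named groups (?P<name>...) rather than (?<name>...).
PYTHON_REGEX = re.compile(re.sub(r"\(\?<(\w+)>", r"(?P<\1>", ONIGMO_REGEX))

# Mirrors the Types line of the parser definition.
INT_FIELDS = {"SourcePort", "DestinationPort", "ReceivedBytes"}

SAMPLE = (
    "[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= "
    "SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=113 "
    "ID=19525 DF PROTO=TCP SPT=53501 DPT=1433 WINDOW=8192 RES=0x00 SYN URGP=0"
)


def parse_iptables(line):
    """Return the extracted fields as a dict, or None if the line doesn't match."""
    match = PYTHON_REGEX.search(line)
    if match is None:
        return None
    # Drop optional groups that did not take part in the match.
    record = {k: v for k, v in match.groupdict().items() if v is not None}
    for name in INT_FIELDS & record.keys():
        record[name] = int(record[name])
    return record


if __name__ == "__main__":
    print(parse_iptables(SAMPLE))
```

Running the other sample lines from earlier through parse_iptables is a quick way to spot fields the regex misses, such as the ICMP line, which has no SPT= or DPT=.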

Create a Fluent Bit config

Next, we'll need to create a syslog input (to receive data from OpenWrt), parse it with the RFC3164 Parser, rename a bunch of the columns in line with the mapping we did, and then finally send them to Log Analytics via the azure_logs_ingestion output.
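
The first two stages of that pipeline (RFC3164 parsing, then renaming) can be sketched in Python to show what a record looks like before the iptables parser runs. This only mimics the behaviour described above: the regex is the syslog-rfc3164 expression with named groups respelled for Python, and the sample datagram, including the openwrt hostname, is made up.

```python
import re

# syslog-rfc3164 regex, with (?<name>...) respelled as Python's (?P<name>...).
SYSLOG_RFC3164 = re.compile(
    r"^\<(?P<pri>[0-9]+)\>(?P<time>[^ ]* {1,2}[^ ]* [^ ]*) (?P<host>[^ ]*) "
    r"(?P<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?P<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?P<message>.*)$"
)

# The Rename and Remove rules applied after the syslog parser.
RENAMES = {"host": "Computer", "ident": "ProcessName",
           "message": "Message", "time": "ReceiptTime"}
REMOVED = {"pid", "pri"}

# Hypothetical datagram as OpenWrt might send it.
SAMPLE = "<4>Feb 11 10:15:00 openwrt kernel: [000000.000000] reject wan in: IN=pppoe-wan OUT="


def syslog_to_record(datagram):
    """Parse an RFC3164 datagram and apply the renames; None if it doesn't match."""
    match = SYSLOG_RFC3164.match(datagram)
    if match is None:
        return None
    record = {}
    for key, value in match.groupdict().items():
        if key in REMOVED or value is None:
            continue
        record[RENAMES.get(key, key)] = value
    return record


if __name__ == "__main__":
    print(syslog_to_record(SAMPLE))
```

The Message value that comes out of this step is what the iptables parser then picks apart into the CommonSecurityLog columns.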

Note: stream_name is not currently part of the azure_logs_ingestion plugin, but is needed to map to our DCR... I've created a PR, which is currently pending approval.

Tying it all together, my fluent-bit.conf looks like:

[SERVICE]
    Parsers_File /usr/local/etc/fluent-bit/parsers.conf
    Flush 1
    Log_Level info

[INPUT]
    Name syslog
    Mode udp
    Port 1514
    Parser syslog-rfc3164
    Tag  syslog.openwrt

[FILTER]
    Name modify
    Match syslog.*
    Rename host Computer
    Rename ident ProcessName
    Rename message Message
    Remove pid
    Remove pri
    Rename time ReceiptTime

[FILTER]
    Name parser
    Match syslog.openwrt
    Key_Name Message
    Parser iptables-openwrt
    Preserve_Key On
    Reserve_Data On

[OUTPUT]
    Name            azure_logs_ingestion
    Match           syslog.openwrt
    client_id       00000000-0000-0000-0000-000000000000
    client_secret   00000~0000000000000.0000000000000000-000
    tenant_id       00000000-0000-0000-0000-000000000000
    dce_url         https://example-xxxx.westus2-1.ingest.monitor.azure.com
    dcr_id          dcr-00000000000000000000000000000000
    table_name      CommonSecurityLog
    stream_name     Custom-CommonSecurityLogStream
    time_generated  true
    time_key        TimeGenerated
    Compress        true

Run this with fluent-bit -c fluent-bit.conf, and you should see your logs flowing into Sentinel in near-real-time! 🎉
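
If nothing shows up, it's handy to be able to fire a known-good message at the syslog input without waiting for the firewall to log something. Here's a small Python sketch that formats an RFC3164-style line and sends it over UDP to port 1514, as configured above. The helper names, the default openwrt hostname and the 127.0.0.1 address are assumptions; point it at wherever Fluent Bit is listening.

```python
import socket
from datetime import datetime

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def format_rfc3164(message, host="openwrt", ident="kernel", pri=4, now=None):
    """Format a message the way the syslog-rfc3164 parser expects it."""
    now = now or datetime.now()
    # RFC3164 pads single-digit days with a space, e.g. "Feb  1".
    timestamp = f"{MONTHS[now.month - 1]} {now.day:2d} {now:%H:%M:%S}"
    return f"<{pri}>{timestamp} {host} {ident}: {message}"


def send_test_log(message, server="127.0.0.1", port=1514):
    """Send one UDP syslog datagram to the Fluent Bit syslog input."""
    payload = format_rfc3164(message).encode()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (server, port))


if __name__ == "__main__":
    send_test_log(
        "[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= "
        "SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 "
        "TTL=113 ID=19525 DF PROTO=TCP SPT=53501 DPT=1433 WINDOW=8192 "
        "RES=0x00 SYN URGP=0"
    )
```

UDP gives no delivery feedback, so check the Fluent Bit log output (or temporarily add a stdout output) to confirm the record was received and parsed.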