Grok Filter for Syslog entries

L1 Bithead

Does anyone have a Grok filter compatible with Cortex XDR syslog entries?

 

I'm piping Cortex XDR syslog into Logstash and then through to Elasticsearch for parsing & alerting, but there seem to be two nested log formats: one pipe-separated and, inside that, a space-separated list of fields (including some values that themselves contain spaces and aren't quote-encapsulated).

 

It looks tricky to pick apart, so I was wondering if anyone else has already had a shot at this?

 

Example data (sanitised) showing the initial pipe-separated data:

<9>1 YYYY-MM-DDTHH:MM:SS.SSSSSZ cortexxdr - - - - CEF:0|Palo Alto Networks|Cortex XDR|Cortex XDR 2.4|XDR Agent|WildFire Malware|8|end=<timestamp> shost=<host> suser=['DOMAIN\\\\user'] deviceFacility=None cat=Malware externalId=356 request=https://<tenant>.xdr.paloaltonetworks.com/alerts/356 cs1=wildfire-test-pe-file.exe cs1Label=Initiated by cs2="C:\\<Path>\\<To>\\<File>\\wildfire-test-pe-file.exe" cs2Label=Initiator CMD cs3=NoneSIGNATURE_UNAVAILABLE- cs3Label=Signature cs4Label=CGO name cs5Label=CGO CMD cs6=NoneSIGNATURE_UNAVAILABLE- cs6Label=CGO Signature fileHash=322F12006FF2939D6BEE35398FFCD5F2C370446F34999D4A93FC498A849EF386 filePath=C:\\<Path>\\<To>\\<File>\\wildfire-test-pe-file.exe targetprocesssignature=NoneSIGNATURE_UNAVAILABLE- tenantname=<tenant> - Cortex XDR tenantCDLid=926048619 CSPaccountname=<Company> initiatorSha256=322F12006FF393946E5E33398CFCD532C3704C6F34999D4A93FC498A849EF386 initiatorPath=C:\\<Path>\\<To>\\<File>\\wildfire-test-pe-file.exe osParentSignature=SIGNATURE_UNAVAILABLE incident=47 act=Prevented (Blocked)

L1 Bithead

It's formatted using CEF.  Logstash has a CEF codec plugin, but it mostly just rewrites the keys or field names.  Any item with the string "Label" in its key is in fact providing the key name for the related non-label key=value pair.
So:

cs6Label=Pants
cs6=True

 

can actually be consolidated to:

Pants: True
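
To make the label/value pairing concrete outside of Logstash, here is a minimal Python sketch. It assumes the key=value pairs have already been parsed into a dict; the function name fold_labels is made up for illustration and is not part of any plugin.

```python
def fold_labels(fields):
    """Return a copy of fields with each <key>Label/<key> pair merged.

    A label with no matching value key (or a value with no label) is
    left untouched.
    """
    out = dict(fields)
    for key in fields:
        if not key.endswith("Label"):
            continue
        base = key[: -len("Label")]
        if base in fields:
            # The label's value becomes the new field name.
            out[fields[key]] = fields[base]
            del out[base]
            del out[key]
    return out


example = {"cs6Label": "Pants", "cs6": "True", "cat": "Malware"}
print(fold_labels(example))  # {'cat': 'Malware', 'Pants': 'True'}
```

Note that a label such as "Initiated by" produces a field name containing a space, so you may want to normalise the names before indexing.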

 

I recommend checking out nxlog, as it has a very straightforward CEF->JSON conversion that would allow you to feed JSON into Logstash and hit the ground running.

 

Then you could construct a bunch of mutate filters like the ones below to consolidate those fields:

 

# Match labels to values and remove other fields
if ([deviceCustomString1] and [deviceCustomString1Label]) {
  mutate {
    add_field => [ "%{deviceCustomString1Label}", "%{deviceCustomString1}" ]
    remove_field => [ "deviceCustomString1Label", "deviceCustomString1" ]
  }
}

if ([deviceCustomString2] and [deviceCustomString2Label]) {
  mutate {
    add_field => [ "%{deviceCustomString2Label}", "%{deviceCustomString2}" ]
    remove_field => [ "deviceCustomString2Label", "deviceCustomString2" ]
  }
}

if ([deviceCustomString3] and [deviceCustomString3Label]) {
  mutate {
    add_field => [ "%{deviceCustomString3Label}", "%{deviceCustomString3}" ]
    remove_field => [ "deviceCustomString3Label", "deviceCustomString3" ]
  }
}

if ([deviceCustomString4] and [deviceCustomString4Label]) {
  mutate {
    add_field => [ "%{deviceCustomString4Label}", "%{deviceCustomString4}" ]
    remove_field => [ "deviceCustomString4Label", "deviceCustomString4" ]
  }
}

if ([deviceCustomString5] and [deviceCustomString5Label]) {
  mutate {
    add_field => [ "%{deviceCustomString5Label}", "%{deviceCustomString5}" ]
    remove_field => [ "deviceCustomString5Label", "deviceCustomString5" ]
  }
}

if ([deviceCustomString6] and [deviceCustomString6Label]) {
  mutate {
    add_field => [ "%{deviceCustomString6Label}", "%{deviceCustomString6}" ]
    remove_field => [ "deviceCustomString6Label", "deviceCustomString6" ]
  }
}

if ([deviceCustomNumber1Label] and [deviceCustomNumber1]) {
  mutate {
    add_field => [ "%{deviceCustomNumber1Label}", "%{deviceCustomNumber1}" ]
    remove_field => [ "deviceCustomNumber1Label", "deviceCustomNumber1" ]
  }
}

if ([deviceCustomNumber2Label] and [deviceCustomNumber2]) {
  mutate {
    add_field => [ "%{deviceCustomNumber2Label}", "%{deviceCustomNumber2}" ]
    remove_field => [ "deviceCustomNumber2Label", "deviceCustomNumber2" ]
  }
}

if ([deviceCustomNumber3Label] and [deviceCustomNumber3]) {
  mutate {
    add_field => [ "%{deviceCustomNumber3Label}", "%{deviceCustomNumber3}" ]
    remove_field => [ "deviceCustomNumber3Label", "deviceCustomNumber3" ]
  }
}

if ([deviceCustomNumber4Label] and [deviceCustomNumber4]) {
  mutate {
    add_field => [ "%{deviceCustomNumber4Label}", "%{deviceCustomNumber4}" ]
    remove_field => [ "deviceCustomNumber4Label", "deviceCustomNumber4" ]
  }
}

if ([deviceCustomNumber5Label] and [deviceCustomNumber5]) {
  mutate {
    add_field => [ "%{deviceCustomNumber5Label}", "%{deviceCustomNumber5}" ]
    remove_field => [ "deviceCustomNumber5Label", "deviceCustomNumber5" ]
  }
}

if ([deviceCustomNumber6Label] and [deviceCustomNumber6]) {
  mutate {
    add_field => [ "%{deviceCustomNumber6Label}", "%{deviceCustomNumber6}" ]
    remove_field => [ "deviceCustomNumber6Label", "deviceCustomNumber6" ]
  }
}

if ([flexNumber1Label] and [flexNumber1]) {
  mutate {
    add_field => [ "%{flexNumber1Label}", "%{flexNumber1}" ]
    remove_field => [ "flexNumber1Label", "flexNumber1" ]
  }
}

if ([flexNumber2Label] and [flexNumber2]) {
  mutate {
    add_field => [ "%{flexNumber2Label}", "%{flexNumber2}" ]
    remove_field => [ "flexNumber2Label", "flexNumber2" ]
  }
}

if ([flexNumber3Label] and [flexNumber3]) {
  mutate {
    add_field => [ "%{flexNumber3Label}", "%{flexNumber3}" ]
    remove_field => [ "flexNumber3Label", "flexNumber3" ]
  }
}

if ([flexNumber4Label] and [flexNumber4]) {
  mutate {
    add_field => [ "%{flexNumber4Label}", "%{flexNumber4}" ]
    remove_field => [ "flexNumber4Label", "flexNumber4" ]
  }
}

if ([flexNumber5Label] and [flexNumber5]) {
  mutate {
    add_field => [ "%{flexNumber5Label}", "%{flexNumber5}" ]
    remove_field => [ "flexNumber5Label", "flexNumber5" ]
  }
}
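
Since those blocks differ only in the field name, one option is to generate them rather than maintain them by hand. The Python sketch below is only an illustration: the helper name mutate_block and the FIELDS list are assumptions based on the field names used above, and the output is meant to be pasted into the filter section of a Logstash config.

```python
def mutate_block(value_field):
    """Build one Logstash conditional that folds <field>Label into <field>."""
    label_field = value_field + "Label"
    return (
        f"if ([{value_field}] and [{label_field}]) {{\n"
        f"  mutate {{\n"
        f'    add_field => [ "%{{{label_field}}}", "%{{{value_field}}}" ]\n'
        f'    remove_field => [ "{label_field}", "{value_field}" ]\n'
        f"  }}\n"
        f"}}\n"
    )


# Field names taken from the blocks above (assumed to match your parser's output).
FIELDS = (
    [f"deviceCustomString{i}" for i in range(1, 7)]
    + [f"deviceCustomNumber{i}" for i in range(1, 7)]
    + [f"flexNumber{i}" for i in range(1, 6)]
)

config = "\n".join(mutate_block(name) for name in FIELDS)
print(config)
```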

L1 Bithead

Here is an example nxlog excerpt to convert CEF to JSON.

 

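# Extension modules providing parse_syslog(), parse_cef() and to_json().
# Added on the assumption that they are not already loaded elsewhere in the config.
<Extension syslog>
    Module  xm_syslog
</Extension>

<Extension cef>
    Module  xm_cef
</Extension>

<Extension json>
    Module  xm_json
</Extension>

# Read the raw XDR syslog files, skipping compressed archives
<Input xdr_cef>
    Module  im_file
    File    "/var/log/xdr*log*"
    Exclude "/var/log/xdr*bz2"
    Exec    parse_syslog(); parse_cef($Message);
</Input>

# Write one JSON file per day
<Output xdr_json>
    Module  om_file
    File    '/var/log/xdr' + strftime($EventTime, '%Y-%m-%d') + '.json'
    Exec    to_json();
</Output>

<Route main>
    Path    xdr_cef => xdr_json
</Route>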

L1 Bithead

Thanks, that's useful info

 

My actual issue was that the log message seemed to have multiple formats:

- First, some content that was space-delimited
- Then some content that was pipe-delimited
- Then some key-value content
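
Those three layers can be sketched in Python as follows. This is a rough illustration, not a full CEF parser: it assumes the header contains no escaped pipes, and the function name split_layers is made up. The header field names follow the CEF layout (version, vendor, product, product version, event class ID, name, severity), so the number in the seventh pipe-delimited position is the severity.

```python
def split_layers(line):
    """Split one line into (syslog_header, cef_header_fields, extension)."""
    syslog_header, sep, cef = line.partition("CEF:")
    if not sep:
        raise ValueError("no CEF payload found")
    # Seven pipe-delimited header fields, then the key=value extension.
    parts = cef.split("|", 7)
    if len(parts) != 8:
        raise ValueError("expected 7 CEF header fields plus an extension")
    names = ["version", "vendor", "product", "product_version",
             "event_class_id", "name", "severity"]
    return syslog_header.strip(), dict(zip(names, parts[:7])), parts[7]


# Shortened version of the sanitised sample from the first post.
sample = ("<9>1 YYYY-MM-DDTHH:MM:SS.SSSSSZ cortexxdr - - - - "
          "CEF:0|Palo Alto Networks|Cortex XDR|Cortex XDR 2.4|XDR Agent|"
          "WildFire Malware|8|cat=Malware externalId=356")

header, cef_fields, extension = split_layers(sample)
print(header)
print(cef_fields["name"], cef_fields["severity"])
print(extension)
```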

 

Maybe that's standard for CEF?

 

In any case, this is the Logstash filter I used, which seems to have done a good job of pulling it all apart:

filter {
  grok {
    match => { "message" => "<%{NUMBER:xdr.system}>%{NUMBER:xdr.event} %{TIMESTAMP_ISO8601:xdr.timestamp} %{DATA:xdr.source} %{DATA:xdr.e1} %{DATA:xdr.e2} %{DATA:xdr.e3} %{DATA:xdr.e4} %{DATA:xdr.format}\|%{DATA:xdr.company}\|%{DATA:xdr.platform}\|%{DATA:client.version}\|%{DATA:client.system}\|%{DATA:alert.title}\|%{INT:xdr.unknown}\|%{GREEDYDATA:alert.message}" }
  }
  kv {
    source => "alert.message"
    target => "data"
  }
}
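
One thing to watch with the key-value part: as far as I know the kv filter splits fields on spaces by default, so unquoted values such as "Initiated by" or "Prevented (Blocked)" may be cut at the first space. As a rough illustration of one way around that, the Python sketch below treats every whitespace-preceded word followed by "=" as a key and takes everything up to the next key as the value. The function name parse_extension is made up, and the approach assumes any "=" inside a value is escaped or not preceded by a bare word.

```python
import re

# A key is a run of word characters at the start of the string or after
# whitespace, directly followed by "=".
KEY = re.compile(r"(?:^|(?<=\s))(\w+)=")


def parse_extension(ext):
    """Parse CEF-style key=value text where values may contain spaces."""
    matches = list(KEY.finditer(ext))
    fields = {}
    for i, m in enumerate(matches):
        # The value runs until the next key starts (or the end of the text).
        end = matches[i + 1].start() if i + 1 < len(matches) else len(ext)
        fields[m.group(1)] = ext[m.end():end].strip()
    return fields


ext = ("cat=Malware cs1Label=Initiated by cs2Label=Initiator CMD "
       "act=Prevented (Blocked)")
print(parse_extension(ext))
# {'cat': 'Malware', 'cs1Label': 'Initiated by', 'cs2Label': 'Initiator CMD', 'act': 'Prevented (Blocked)'}
```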

 

L1 Bithead

You are correct in that everything starting with and after the string CEF is the ArcSight CEF format.  The portion prior to that is a syslog header.  If you use the current stable Filebeat, it includes native CEF parsing to ECS.

 

I have to say I was quite surprised at PAN's selection of CEF for the syslog messages but JSON for the email alerts.  It also surprised me that there were far fewer fields provided in the CEF messages than in the JSON emails (albeit a ton of fields are null).

L1 Bithead

I suspect the JSON format was chosen for its flexibility, as Cortex alerts come in multiple types: Malware (both Local and WildFire), Firewall/Strata, and Analytics (e.g. Recurring Rare IP Address or Large Upload). Each type has its own unique set of fields; you get source & dest port in Firewall log alerts, but not in Local Malware, for example.

 

Keeping the CEF format makes sense, as I'm sure PANW has many 'Enterprise' customers running centralised logging servers that incorporate logs from multiple vendors. To cover that situation they'd want to be able to 'fall back' to something standardised such as CEF. There are also multiple logging formats available for syslog servers, although I haven't tested out the 'legacy' format yet to see how different it is.
