stdlib.localLogStash - How to get mined data into logstash

Reply
L1 Bithead

stdlib.localLogStash - How to get mined data into logstash

Hello Team,

 

I have set up few IP miners and then processing through a inbound aggregator and then giving it to Logstash output,

Names have been customised as I have cloned the aggregator and output from existing prototypes.

image

I am not seeing any indicators in DARP-output [stdlib.localLogStash] . is it normal ? and can some one help me with logstash configuration for getting these indicators into a file.

i tried this logstash config  but no success. dont know what i am missing.

 

#logstash configuration:
input {
tcp {
port => 5514
}
}

output {
file {
path =>/etc/logstash/minemeld-output.txt
}
}

 

Immight be missing something from minemeld or logstash end.  kind help is much appreciated.

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Hello Community - Can someone help me with my above query. 

what is the best way used for getting minemeld data into elasticsearch.  ? 


L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Hey,

 

I've yet to dig deeper but I've managed to get this working and feed into Elasticsearch via Logstash.

 

Everything you've got setup looks fine however one thing I found is that as soon as the output is set up, it will send the bulk of the output therefore if you've not yet set up the logstash listener, then you will miss the events come into it.

 

I had to remove and re-add the output so I could pick up the indicators.

 

Once added, I was able to see the data however everything was in the 'message' field in a JSON format so added in a filter within logstash as below:

 

filter {

  # Tag minemeld events
  if "@origin" in [message] {
    mutate {
      add_tag => "minemeld"
    }
    json {
      source => "message"
    }
  }

} 

 

Let us know if you get this working as I can't see much info on this either.

 

Cheers,

Kev

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Thanks kev.   I deleted logstash output in minemeld and reconfigured. 

 

But I get Unrecognized SSL message error while running logstash.  is it because that I configured self signed cert in minemeld server ??

can you help me fixing this. 

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

You are awesom @KevinAS    - Thank you.  I am successfully getting minemeld IOC into elasticsearch. 

 

can you help me with one more query.  Currently the indicators are getting parsed as range types.  how can we get it as a single IP range.   Is it something we can configure in minemeld  or is there a way to trim the indicator field in logstash ? 

 

your help would be really useful. 

 

ip range.JPG

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Hey,

Glad to hear you got it working!

I'm working on the same problem. Ideally I'm looking to get this in a CIDR format but unsure on where to configure this whether it be on the processor stage or the logstash output. Also I'm trying to achieve this at the minemeld stage instead of using logstash filters.

If anyone else can help on this part, would be helping us both out.

In the meantime, I'll keep looking.

Cheers,
Kev
L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Previous post got removed, not sure where it went.

 

Managed to get this working after some additional lines in one of the files. Not the greatest fix but it does the job for now.

 

Had to edit the following logstash.py file (path dependent on install location)

/opt/minemeld/engine/core/minemeld/ft/logstash.py

Need to add an import into the file

 

Before:

import logging
import ujson
import datetime
import socket

After:

import logging
import ujson
import datetime
import socket
import netaddr

Then some additional lines under the indicators section

 

Before:

if indicator is not None:
    fields['@indicator'] = indicator

After:

if indicator is not None:
    fields['@indicator'] = indicator
    if len(indicator.split('-')) == 2:
        startIP = indicator.split('-')[0]
        endIP = indicator.split('-')[1]
        cidr = netaddr.iprange_to_cidrs(startIP, endIP)[0]
        fields['CIDR'] = str(cidr)

This will produce a new field called 'CIDR'.

 

If anyone knows an alternative/cleaner method to achieve this, please share!

 

Hope this helps.

 

Cheers,

Kev

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Thank you so much @KevinAS .   I Tried trimming the format in @indicator field.  

 

I used Dissect mapping from logstash.  [ Thanks to logstash community for suggestions]

 

dissect { mapping => { "@indicator" => "%{@indicator}-%{}" } }

 

This gave me @indicator field with single CIDR format .  but I Think it wont convert range of IP into many CIDR on that range.

 

for example :  I think this trimming will give only  @indicator = 192.52.43.112 in the place of @indicator = 196.52.43.112-196.52.43.255.

 

I will try your python script alteration as well.  that makes more sense though complex.

 

here is my final Logstash config that trimmed the IP range into single IP CIDR.

 

input {
tcp {
port => 5514
}
}
filter {
# Tag minemeld events
if "@origin" in [message] {
mutate {
add_tag => "minemeld"
}
json {
source => "message"
}
}
#dissecting IPV4 into CIDR format
if "IPv4" in [type] {
dissect {
mapping => { "@indicator" => "%{@indicator}-%{}" }
}
}
}
output {
elasticsearch {
hosts => "http://192.168.56.10:9200"
index => "logstash-threatintel-%{+YYYY.MM.dd}"
}
}

L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Nice one! Yeah, I've been looking into this further today. I found that I don't really need the CIDR format as I can pull out the two IP addresses within the indicator and checking to see if an IP is within its range.

 

I have the following configured for minemeld

 

input {
  tcp {
    port => "1516"
    tags => [ "syslog" ]
  }
}

filter {

  if "@origin" in [message] {
    mutate {
      add_tag => "minemeld"
    }
    json {
      source => "message"
    }

    fingerprint {
        source => "@indicator"
        target => "[@metadata][fingerprint]"
        method => "MURMUR3"
    }

    dissect {
        mapping => { "@indicator" => "%{firstIP}-%{lastIP}" }
    }
  }
}

output {

  if "minemeld" in [tags] and [message] == "withdraw" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "minemeld"
      action => "delete"
      document_id => "%{[@metadata][fingerprint]}"
    }
  } else if "minemeld" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "minemeld"
      document_id => "%{[@metadata][fingerprint]}"
    }
  }

}

This will allow the list to stay updated within Elasticsearch with any "withdraw" events being deleted.

 

I'm then able to query this data from other inputs such as firewalls as below

 

filter {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "minemeld"
        query_template => "/usr/share/logstash/search-minemeld-src.json"
        fields => {
            "sources" => "minemeld_source"
            "confidence" => "minemeld_confidence"
        }
        add_tag => [ "minemeld_trigger_src" ]
    }
}

I then have the following within /usr/share/logstash/search-minemeld-src.json

 

{
  "size": 1,
  "query": {
    "bool": {
      "filter": [
        { "range": { "firstIP": { "lte": "%{[src_ip]}" }}} ,
        { "range": { "lastIP": { "gte": "%{[src_ip]}" }}}
      ]
    }
  },
  "_source": ["sources", "confidence"]
}

So for events that hit this filter (firewall traffic), the "src_ip" is checked against the minemeld index and checks if the IP address exists within each range. This then adds new fields to the firewall traffic (sources/confidence) and adds a new tag (minemeld_trigger_src). Seems to work the way I'm wanting it to, not sure if its of any use to yourself or anyone else. May give you some additional insight however.

 

Cheers,

Kev

Highlighted
L1 Bithead

Re: stdlib.localLogStash - How to get mined data into logstash

Hello @KevinAS   Great ! This is exactly my use case.  I have some cl;arification with below method. 

what is /usr/share/logstash/search-minemeld-src.json.  should i create a search-minemeld-src.json with below content . I went through documenattion and understood some basics. but  your explanation would be useful

 

 

Also  below filter.  Will this give the range or just take the indicator value as first IP - last Ip [only two IPs]

 

dissect {
        mapping => { "@indicator" => "%{firstIP}-%{lastIP}" }

 

Thanks,

Haran

Like what you see?

Show your appreciation!

Click Like if a post is helpful to you or if you just want to show your support.

Click Accept as Solution to acknowledge that the answer to your question has been provided.

The button appears next to the replies on topics you’ve started. The member who gave the solution and all future visitors to this topic will appreciate it!

These simple actions take just seconds of your time, but go a long way in showing appreciation for community members and the Live Community as a whole!

The Live Community thanks you for your participation!