stdlib.localLogStash - How to get mined data into logstash

cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

stdlib.localLogStash - How to get mined data into logstash

L1 Bithead

Hello Team,

 

I have set up few IP miners and then processing through a inbound aggregator and then giving it to Logstash output,

Names have been customised as I have cloned the aggregator and output from existing prototypes.

image

I am not seeing any indicators in DARP-output [stdlib.localLogStash] . is it normal ? and can some one help me with logstash configuration for getting these indicators into a file.

i tried this logstash config  but no success. dont know what i am missing.

 

#logstash configuration:
input {
tcp {
port => 5514
}
}

output {
file {
path =>/etc/logstash/minemeld-output.txt
}
}

 

Immight be missing something from minemeld or logstash end.  kind help is much appreciated.

1 accepted solution

Accepted Solutions

L1 Bithead

Hey,

 

I've yet to dig deeper but I've managed to get this working and feed into Elasticsearch via Logstash.

 

Everything you've got setup looks fine however one thing I found is that as soon as the output is set up, it will send the bulk of the output therefore if you've not yet set up the logstash listener, then you will miss the events come into it.

 

I had to remove and re-add the output so I could pick up the indicators.

 

Once added, I was able to see the data however everything was in the 'message' field in a JSON format so added in a filter within logstash as below:

 

filter {

  # Tag minemeld events
  if "@origin" in [message] {
    mutate {
      add_tag => "minemeld"
    }
    json {
      source => "message"
    }
  }

} 

 

Let us know if you get this working as I can't see much info on this either.

 

Cheers,

Kev

View solution in original post

11 REPLIES 11

L1 Bithead

Hello Community - Can someone help me with my above query. 

what is the best way used for getting minemeld data into elasticsearch.  ? 


L1 Bithead

Hey,

 

I've yet to dig deeper but I've managed to get this working and feed into Elasticsearch via Logstash.

 

Everything you've got setup looks fine however one thing I found is that as soon as the output is set up, it will send the bulk of the output therefore if you've not yet set up the logstash listener, then you will miss the events come into it.

 

I had to remove and re-add the output so I could pick up the indicators.

 

Once added, I was able to see the data however everything was in the 'message' field in a JSON format so added in a filter within logstash as below:

 

filter {

  # Tag minemeld events
  if "@origin" in [message] {
    mutate {
      add_tag => "minemeld"
    }
    json {
      source => "message"
    }
  }

} 

 

Let us know if you get this working as I can't see much info on this either.

 

Cheers,

Kev

Thanks kev.   I deleted logstash output in minemeld and reconfigured. 

 

But I get Unrecognized SSL message error while running logstash.  is it because that I configured self signed cert in minemeld server ??

can you help me fixing this. 

You are awesom @KevinAS    - Thank you.  I am successfully getting minemeld IOC into elasticsearch. 

 

can you help me with one more query.  Currently the indicators are getting parsed as range types.  how can we get it as a single IP range.   Is it something we can configure in minemeld  or is there a way to trim the indicator field in logstash ? 

 

your help would be really useful. 

 

ip range.JPG

Hey,

Glad to hear you got it working!

I'm working on the same problem. Ideally I'm looking to get this in a CIDR format but unsure on where to configure this whether it be on the processor stage or the logstash output. Also I'm trying to achieve this at the minemeld stage instead of using logstash filters.

If anyone else can help on this part, would be helping us both out.

In the meantime, I'll keep looking.

Cheers,
Kev

Previous post got removed, not sure where it went.

 

Managed to get this working after some additional lines in one of the files. Not the greatest fix but it does the job for now.

 

Had to edit the following logstash.py file (path dependent on install location)

/opt/minemeld/engine/core/minemeld/ft/logstash.py

Need to add an import into the file

 

Before:

import logging
import ujson
import datetime
import socket

After:

import logging
import ujson
import datetime
import socket
import netaddr

Then some additional lines under the indicators section

 

Before:

if indicator is not None:
    fields['@indicator'] = indicator

After:

if indicator is not None:
    fields['@indicator'] = indicator
    if len(indicator.split('-')) == 2:
        startIP = indicator.split('-')[0]
        endIP = indicator.split('-')[1]
        cidr = netaddr.iprange_to_cidrs(startIP, endIP)[0]
        fields['CIDR'] = str(cidr)

This will produce a new field called 'CIDR'.

 

If anyone knows an alternative/cleaner method to achieve this, please share!

 

Hope this helps.

 

Cheers,

Kev

Thank you so much @KevinAS .   I Tried trimming the format in @indicator field.  

 

I used Dissect mapping from logstash.  [ Thanks to logstash community for suggestions]

 

dissect { mapping => { "@indicator" => "%{@indicator}-%{}" } }

 

This gave me @indicator field with single CIDR format .  but I Think it wont convert range of IP into many CIDR on that range.

 

for example :  I think this trimming will give only  @indicator = 192.52.43.112 in the place of @indicator = 196.52.43.112-196.52.43.255.

 

I will try your python script alteration as well.  that makes more sense though complex.

 

here is my final Logstash config that trimmed the IP range into single IP CIDR.

 

input {
tcp {
port => 5514
}
}
filter {
# Tag minemeld events
if "@origin" in [message] {
mutate {
add_tag => "minemeld"
}
json {
source => "message"
}
}
#dissecting IPV4 into CIDR format
if "IPv4" in [type] {
dissect {
mapping => { "@indicator" => "%{@indicator}-%{}" }
}
}
}
output {
elasticsearch {
hosts => "http://192.168.56.10:9200"
index => "logstash-threatintel-%{+YYYY.MM.dd}"
}
}

Nice one! Yeah, I've been looking into this further today. I found that I don't really need the CIDR format as I can pull out the two IP addresses within the indicator and checking to see if an IP is within its range.

 

I have the following configured for minemeld

 

input {
  tcp {
    port => "1516"
    tags => [ "syslog" ]
  }
}

filter {

  if "@origin" in [message] {
    mutate {
      add_tag => "minemeld"
    }
    json {
      source => "message"
    }

    fingerprint {
        source => "@indicator"
        target => "[@metadata][fingerprint]"
        method => "MURMUR3"
    }

    dissect {
        mapping => { "@indicator" => "%{firstIP}-%{lastIP}" }
    }
  }
}

output {

  if "minemeld" in [tags] and [message] == "withdraw" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "minemeld"
      action => "delete"
      document_id => "%{[@metadata][fingerprint]}"
    }
  } else if "minemeld" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "minemeld"
      document_id => "%{[@metadata][fingerprint]}"
    }
  }

}

This will allow the list to stay updated within Elasticsearch with any "withdraw" events being deleted.

 

I'm then able to query this data from other inputs such as firewalls as below

 

filter {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "minemeld"
        query_template => "/usr/share/logstash/search-minemeld-src.json"
        fields => {
            "sources" => "minemeld_source"
            "confidence" => "minemeld_confidence"
        }
        add_tag => [ "minemeld_trigger_src" ]
    }
}

I then have the following within /usr/share/logstash/search-minemeld-src.json

 

{
  "size": 1,
  "query": {
    "bool": {
      "filter": [
        { "range": { "firstIP": { "lte": "%{[src_ip]}" }}} ,
        { "range": { "lastIP": { "gte": "%{[src_ip]}" }}}
      ]
    }
  },
  "_source": ["sources", "confidence"]
}

So for events that hit this filter (firewall traffic), the "src_ip" is checked against the minemeld index and checks if the IP address exists within each range. This then adds new fields to the firewall traffic (sources/confidence) and adds a new tag (minemeld_trigger_src). Seems to work the way I'm wanting it to, not sure if its of any use to yourself or anyone else. May give you some additional insight however.

 

Cheers,

Kev

Hello @KevinAS   Great ! This is exactly my use case.  I have some cl;arification with below method. 

what is /usr/share/logstash/search-minemeld-src.json.  should i create a search-minemeld-src.json with below content . I went through documenattion and understood some basics. but  your explanation would be useful

 

 

Also  below filter.  Will this give the range or just take the indicator value as first IP - last Ip [only two IPs]

 

dissect {
        mapping => { "@indicator" => "%{firstIP}-%{lastIP}" }

 

Thanks,

Haran

Hey,

 

Yeah, you'll need to create the file /usr/share/logstash/search-minemeld-src.json with the content mentioned in my previous reply.

My understanding of this was the "src_ip" would be checked against the "firstIP" and "lastIP" fields within the minemeld index. If the "src_ip" is lower than or equal to the "firstIP" AND  if the "src_ip" is greater than or equal to the "lastIP", it would take its "sources" and "confidence" values (from the event in the minemeld index) and copy these to the event that its matched.

 

The dissect section essentially is splitting up the indicator into two separate fields (firstIP and lastIP).

 

With the above method, we don't need to know the CIDR address.

 

Hope this helps.

Great @KevinAS   . Thanks again. 

 

I am documenting the Minemeld integration and use cases with elastic search . Thinking to take it with Community via either blog post or Github.   You deserve a big credit for the Logstash configuration. 

 

Let me know your interst and email ID.   would like to connect with you and add you for credits for the github/documentation. 

 

I am available @  harankumar92@gmail.com

 

Regards,

Haran

  • 1 accepted solution
  • 10008 Views
  • 11 replies
  • 0 Likes
Like what you see?

Show your appreciation!

Click Like if a post is helpful to you or if you just want to show your support.

Click Accept as Solution to acknowledge that the answer to your question has been provided.

The button appears next to the replies on topics you’ve started. The member who gave the solution and all future visitors to this topic will appreciate it!

These simple actions take just seconds of your time, but go a long way in showing appreciation for community members and the LIVEcommunity as a whole!

The LIVEcommunity thanks you for your participation!