This notebook is a quick workflow to use with ELK and browse indexed threats. The goal is to provide a ready-to-use workflow to identify particular threats, compute statistics and perform an initial threat analysis.
The libraries listed below are required to run this notebook. You may need to install them with pip.
import requests
import magic
import json
import matplotlib
import pandas as pd
import datetime
import requests
import pefile
import peutils
import matplotlib.pyplot as plt
import os
from elasticsearch import Elasticsearch
from ssl import create_default_context
The first thing to do is to check if the connection to your Elastic instance is working.
# Test Connection
# Uncomment to sanity-check the cluster over plain HTTP first:
#res = requests.get('')
#json_res = json.loads(res.content)
#print(json.dumps(json_res, sort_keys=True, indent=4))
# For TLS-enabled clusters, build an SSL context from the cluster CA cert:
#context = create_default_context(cafile='/Users/fr0gger/Documents/malware_notebook/elasticsearch-ca.pem')
# ES Variable
#es = Elasticsearch([{'host': '192.168.1.10', 'port': 9200}], scheme="http", port=9200)
# NOTE(review): '192.168.1.x' is a placeholder — replace with your instance IP.
es = Elasticsearch("http://192.168.1.x:9200")
#Elasticsearch
# Listing the index aliases doubles as a connectivity check.
es.indices.get_alias()
In this section we will explore the data to understand what kind of information is stored. To do that we need to extract the list of stored indices, then choose the one we want to analyze.
# Get list of indices
# Aliases and mappings both expose the available index names as dict keys.
es.indices.get_alias()
es.indices.get_alias().keys()
es.indices.get_mapping().keys()
In this section you need to select the dataset you want to explore.
# Select the index to explore and print the first hit to inspect its shape.
# NOTE: the `body` parameter is deprecated in elasticsearch-py 8.x; pass the
# query via the `query` keyword instead.
res = es.search(index='malbaz2-2022-09-28', query={"match_all": {}})
#print(json.dumps(res['hits']['hits'], indent=4))
print(json.dumps(res["hits"]['hits'][0], indent=4))
Query to filter on specific data.
# Extract data containing the name of the malware from the Tags field.
# Example filtered query: match only documents whose signature is AgentTesla.
#es.search(index="malbaz2-2022-09-28", body={"query": {"match": {'signature':'AgentTesla'}}})
In this section we export all the data from the specified index and convert it into a dataframe.
# make an API call to the Elasticsearch cluster to get documents.
# size=1 only returned a single document even though this section is meant to
# export the whole index; 10000 is the default max result window — use the
# scroll/search_after API for indices larger than that.
result = es.search(index='malbaz2-2022-09-28', query={"match_all": {}}, size=10000)
result
elastic_docs = result["hits"]["hits"]
Converting the ELK Data into a Pandas Dataframe.
# Build the Pandas DataFrame for the Elasticsearch docs in a single pass.
# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so instead
# of appending one Series per document we construct the frame once: one row
# per document, indexed by the Elasticsearch _id, columns from the _source keys.
docs = pd.DataFrame(
    [doc["_source"] for doc in elastic_docs],
    index=[doc["_id"] for doc in elastic_docs],
)

# Keep a CSV snapshot of the raw documents.
data_csv = docs.to_csv()
# Drop the Logstash bookkeeping fields that are not part of the sample data.
ndoc = docs.drop(['@version', 'message', 'host', '@timestamp'], axis=1)
ndoc
ndoc.describe()
# NOTE(review): the columnNN names come from the CSV ingest and appear unused
# for this dataset — confirm against the index mapping before dropping.
ndoc1 = ndoc.drop(["column19", "column24", "column20", "column27", "column21", "column22", "column18", "column23", "column25", "column26", "column16", "column15", "column17"], axis=1)
ndoc1.columns.tolist()
# Top 10 most frequent malware families in the dataset.
ndoc1['signature'].value_counts().head(10)
In this section we will explore the dataset created and generate graphs.
# Horizontal bar chart of malware-family counts.
ax = ndoc['signature'].value_counts().plot(kind='barh', figsize=(40, 38),
                                           color=['b', 'g', 'r', 'y', 'm', 'c'], fontsize=13)
ax.set_alpha(1)
ax.set_title("Malware family", fontsize=20)
ax.set_xticks([0, 50, 100, 500, 1000, 2000, 3000])

# Set individual bar labels: annotate each bar with its count.
# (The previous totals/sum bookkeeping was dead code and has been removed.)
for bar in ax.patches:
    ax.text(bar.get_width() + .3, bar.get_y() + .38, bar.get_width(),
            fontsize=10, color='dimgrey')

# invert for largest on top
ax.invert_yaxis()
# Filter the samples tagged RemcosRAT. The signature values in this dataset
# carry literal quotes and a leading space (e.g. '" RemcosRAT"').
# .copy() avoids pandas' SettingWithCopyWarning when the column below is
# rewritten on the filtered slice.
ndoc2 = ndoc1[ndoc1["signature"] == "\" RemcosRAT\""].copy()
ndoc2

# Normalize first_seen_utc to plain dates for grouping/plotting.
ndoc2['first_seen_utc'] = pd.to_datetime(ndoc2['first_seen_utc']).dt.date
Generate a Timeline of the samples selected.
# Drop hash/metadata columns so only the date-related fields remain.
ndoc3 = ndoc2.drop(["sha256_hash","md5_hash","sha1_hash","reporter","file_name","file_type_guess","mime_type","clamav","vtpercent","imphash","ssdeep", "tlss", "signature"], axis=1)
#ndoc2
# Count samples per first-seen date and plot the timeline as a bar chart.
ndoc3.groupby(ndoc2["first_seen_utc"]).count().plot(kind="bar", title='Timeline of Samples', figsize=(15,10),color=['b'], fontsize=13)
# Filtering by a given date.
# date.fromisoformat parses 'YYYY-MM-DD' directly into a date object,
# replacing the strptime(...).date() round-trip.
date_sample = datetime.date.fromisoformat("2022-08-31")
ndoc2[ndoc2["first_seen_utc"] == date_sample] # use < > for a period of time
This part will download the sample from VT and provide you insight about the malware. To run this part you need a VT Intelligence API key.
# Imports and configuration
from IPython.display import display, HTML
from msticpy.sectools import IoCExtract
import matplotlib.pyplot as plt
import sys
import warnings
from msticpy import init_notebook
# Initialise msticpy inside the notebook namespace (loads msticpyconfig.yaml).
init_notebook(namespace=globals());
# Widen pandas display limits so full rows/columns show in the notebook.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 100)
# Loading the VT API key
from msticpy.common.provider_settings import get_provider_settings
from msticpy.sectools.vtlookupv3 import VTLookupV3, VTEntityType
import nest_asyncio
# Read the VirusTotal AuthKey from the TIProviders section of msticpyconfig.yaml.
vt_key = get_provider_settings("TIProviders")["VirusTotal"].args["AuthKey"]
# Instantiate vt_lookup object
vt_lookup = VTLookupV3(vt_key)
# Allow nested event loops — required for the async VT lookups inside Jupyter.
nest_asyncio.apply()
# Fetch the VT file object for this MD5 hash.
hash_details = vt_lookup.get_object("c2ed3ba6d8fd0a0cee0d447133c79886", "file")
hash_details
# Pivot: domains contacted by the sample during execution.
contacted_domain = vt_lookup.lookup_ioc_relationships(observable = "c2ed3ba6d8fd0a0cee0d447133c79886", vt_type = 'file', relationship = 'contacted_domains')
contacted_domain
# Reverse pivot: files observed communicating with this domain.
DOMAIN = "ifeanyiogbunebe.ddns.net"
ip_relation = vt_lookup.lookup_ioc_relationships(observable = DOMAIN, vt_type = 'domain', relationship = 'communicating_files')
ip_relation
import yara
# Compile the YARA ruleset; expects RAT2.yar in the current working directory.
rules = yara.compile("RAT2.yar")
def yarascan(filename, rules):
    """Scan *filename* with the compiled YARA *rules*.

    Returns the list of matches when at least one rule fires, otherwise
    None. A scan error is printed and treated as "no match".
    """
    matches = None  # ensure the name exists even if match() raises
    try:
        matches = rules.match(filename)
    except ValueError as e:
        # Previously `matches` was left unbound here, so the `if matches`
        # check below raised NameError after any scan error.
        # NOTE(review): yara-python raises yara.Error for scan failures;
        # consider catching that as well — confirm against the yara module.
        print(e)
    if matches:
        return matches
    return None
# NOTE(review): `filename` is never defined in this notebook — set it to the
# path of the sample to scan before running this cell.
yarascan(filename, rules)
This short notebook shows how to explore an index in ELK and retrieve the data. It then demonstrates how to generate some graphs to visualize the data and finally how to use MSTICpy to pivot on some information and uncover additional threats.