10 minutes to CLX

This is a short introduction to CLX geared mainly towards new users of the code.

What are these libraries?

CLX (Cyber Log Accelerators) provides a simple API for security analysts, data scientists, and engineers to quickly get started applying RAPIDS to real-world cyber use cases. CLX uses the GPU dataframe (cuDF) and other RAPIDS packages to execute cybersecurity and information security workflows. The following packages are available:

  • analytics - Machine learning and statistics functionality

  • ip - IPv4 data translation and parsing

  • parsers - Cyber log event parsing

  • io - Input and output features for a workflow

  • workflow - Workflow which receives input data and produces analytical output data

  • osi - Open source integration (VirusTotal, FarsightDB and Whois)

  • dns - TLD extraction

When to use CLX

Use CLX to build your cyber data analytics workflows for a GPU-accelerated environment using RAPIDS. CLX contains common cyber and cyber ML functionality, such as log parsing for specific data sources, cyber data type parsing (e.g., IPv4), and DGA detection. CLX also provides the ability to integrate this functionality into a CLX workflow, which simplifies execution of the series of parsing and ML functions needed for end-to-end use cases.

Log Parsing

CLX provides traditional parsers for some common log types. Here’s an example parsing a Windows Event Log entry of event code 5156 (the Windows Filtering Platform permitting a connection).

[1]:
import cudf
from clx.parsers.windows_event_parser import WindowsEventParser
event = "04/03/2019 11:58:59 AM\\nLogName=Security\\nSourceName=Microsoft Windows security auditing.\\nEventCode=5156\\nEventType=0\\nType=Information\\nComputerName=user234.test.com\\nTaskCategory=Filtering Platform Connection\\nOpCode=Info\\nRecordNumber=241754521\\nKeywords=Audit Success\\nMessage=The Windows Filtering Platform has permitted a connection.\\r\\n\\r\\nApplication Information:\\r\\n\\tProcess ID:\\t\\t4\\r\\n\\tApplication Name:\\tSystem\\r\\n\\r\\nNetwork Information:\\r\\n\\tDirection:\\t\\tInbound\\r\\n\\tSource Address:\\t\\t100.20.100.20\\r\\n\\tSource Port:\\t\\t138\\r\\n\\tDestination Address:\\t100.20.100.30\\r\\n\\tDestination Port:\\t\\t138\\r\\n\\tProtocol:\\t\\t17\\r\\n\\r\\nFilter Information:\\r\\n\\tFilter Run-Time ID:\\t0\\r\\n\\tLayer Name:\\t\\tReceive/Accept\\r\\n\\tLayer Run-Time ID:\\t44"
wep = WindowsEventParser()
df = cudf.DataFrame()
df['raw'] = [event]
result_df = wep.parse(df, 'raw')
result_df.head()
[1]:
  network_information_source_address ...
0                      100.20.100.20 ...

1 rows × 131 columns

Cyber Data Types

CLX provides the ability to work with different data types that are specific to cybersecurity, such as IPv4 and DNS. Here’s an example of how to get started.

IPv4

The IPv4 data type is still commonly used and present in log files. Below we demonstrate some of this functionality; additional operations are available in the clx.ip module.

Convert IPv4 values to integers

[2]:
import clx.ip
import cudf
ips = cudf.Series(["5.79.97.178", "94.130.74.45"])
result = clx.ip.ip_to_int(ips)
print(result)
0      89088434
1    1585596973
dtype: int64
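As a quick CPU-side sanity check, the same conversion can be reproduced with Python’s standard-library ipaddress module (clx.ip also provides int_to_ip for the reverse, GPU-accelerated direction):

```python
import ipaddress

# A dotted-quad IPv4 string is a base-256 encoding of a 32-bit integer
assert int(ipaddress.IPv4Address("5.79.97.178")) == 89088434
assert int(ipaddress.IPv4Address("94.130.74.45")) == 1585596973

# The reverse direction recovers the original string
print(ipaddress.IPv4Address(89088434))  # 5.79.97.178
```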

Check if IPv4 values are multicast

[3]:
import clx.ip
import cudf
ips = cudf.Series(["224.0.0.0", "239.255.255.255", "5.79.97.178"])
result = clx.ip.is_multicast(ips)
print(result)
0     True
1     True
2    False
dtype: bool

TLD Extraction

CLX provides the ability to extract the TLD from the registered domain and subdomains of a URL, using the public suffix list.

[4]:
import cudf
from clx.dns import dns_extractor as dns

input_df = cudf.DataFrame(
    {
        "url": [
            "http://www.google.com",
            "gmail.com",
            "github.com",
            "https://pandas.pydata.org",
            "http://www.worldbank.org.kg/",
            "waiterrant.blogspot.com",
            "http://forums.news.cnn.com.ac/",
            "http://forums.news.cnn.ac/",
            "ftp://b.cnn.com/",
            "a.news.uk",
            "a.news.co.uk",
            "https://a.news.co.uk",
            "107-193-100-2.lightspeed.cicril.sbcglobal.net",
            "a23-44-13-2.deploy.static.akamaitechnologies.com",
        ]
    }
)
output_df = dns.parse_url(input_df["url"])
output_df.head(14)
[4]:
hostname subdomain domain suffix
0 www.google.com www google com
1 gmail.com gmail com
2 github.com github com
3 pandas.pydata.org pandas pydata org
4 www.worldbank.org.kg www worldbank org.kg
5 waiterrant.blogspot.com waiterrant blogspot.com
6 forums.news.cnn.com.ac forums.news cnn com.ac
7 forums.news.cnn.ac forums.news cnn ac
8 b.cnn.com b cnn com
9 a.news.uk a news uk
10 a.news.co.uk a news co.uk
11 a.news.co.uk a news co.uk
12 107-193-100-2.lightspeed.cicril.sbcglobal.net 107-193-100-2.lightspeed.cicril sbcglobal net
13 a23-44-13-2.deploy.static.akamaitechnologies.com a23-44-13-2.deploy.static akamaitechnologies com

Machine Learning

CLX offers machine learning and statistics functions that are ready to integrate into your CLX workflow.

Calculate a rolling z-score on a given cuDF series.

[5]:
import clx.analytics.stats
import cudf
sequence = [3,4,5,6,1,10,34,2,1,11,45,34,2,9,19,43,24,13,23,10,98,84,10]
series = cudf.Series(sequence)
zscores_df = cudf.DataFrame()
zscores_df['zscore'] = clx.analytics.stats.rzscore(series, 7)
print(zscores_df)
          zscore
0           <NA>
1           <NA>
2           <NA>
3           <NA>
4           <NA>
5           <NA>
6    2.374423424
7   -0.645941275
8   -0.683973734
9    0.158832461
10   1.847751909
11   0.880026019
12  -0.950835449
13  -0.360593742
14   0.111407599
15   1.228914145
16  -0.074966331
17  -0.570321249
18   0.327849973
19  -0.934372308
20   2.296828498
21   1.282966989
22  -0.795223674
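The window size of 7 explains the six leading <NA> values. If rzscore scores each point against the trailing window ending at that point, using the population standard deviation, the numbers above can be reproduced on the CPU with the standard library (a sketch of the assumed definition, not CLX’s implementation):

```python
import statistics

sequence = [3, 4, 5, 6, 1, 10, 34, 2, 1, 11, 45, 34, 2, 9, 19, 43, 24, 13, 23, 10, 98, 84, 10]
window = 7

def rolling_zscore(values, window):
    # z-score of each point relative to the trailing window ending at that point;
    # the first window-1 positions have no full window, hence None
    scores = [None] * (window - 1)
    for i in range(window - 1, len(values)):
        w = values[i - window + 1 : i + 1]
        scores.append((values[i] - statistics.mean(w)) / statistics.pstdev(w))
    return scores

scores = rolling_zscore(sequence, window)
print(scores[6])  # ≈ 2.3744234, matching row 6 above
```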

Workflows

Now that we’ve demonstrated the basics of CLX, let’s tie some of this functionality together into a CLX workflow. A workflow is defined as a function that receives a cuDF dataframe, performs some operations on it, and returns an output cuDF dataframe. As a use case, we’ll parse raw Windows Event Log data within a workflow.

[6]:
import cudf
from clx.workflow.workflow import Workflow
from clx.parsers.windows_event_parser import WindowsEventParser

wep = WindowsEventParser()

class LogParseWorkflow(Workflow):
    def workflow(self, dataframe):
        output = wep.parse(dataframe, "raw")
        return output

input_df = cudf.DataFrame()
input_df["raw"] = ["04/03/2019 11:58:59 AM\\nLogName=Security\\nSourceName=Microsoft Windows security auditing.\\nEventCode=5156\\nEventType=0\\nType=Information\\nComputerName=user234.test.com\\nTaskCategory=Filtering Platform Connection\\nOpCode=Info\\nRecordNumber=241754521\\nKeywords=Audit Success\\nMessage=The Windows Filtering Platform has permitted a connection.\\r\\n\\r\\nApplication Information:\\r\\n\\tProcess ID:\\t\\t4\\r\\n\\tApplication Name:\\tSystem\\r\\n\\r\\nNetwork Information:\\r\\n\\tDirection:\\t\\tInbound\\r\\n\\tSource Address:\\t\\t100.20.100.20\\r\\n\\tSource Port:\\t\\t138\\r\\n\\tDestination Address:\\t100.20.100.30\\r\\n\\tDestination Port:\\t\\t138\\r\\n\\tProtocol:\\t\\t17\\r\\n\\r\\nFilter Information:\\r\\n\\tFilter Run-Time ID:\\t0\\r\\n\\tLayer Name:\\t\\tReceive/Accept\\r\\n\\tLayer Run-Time ID:\\t44"]
lpw = LogParseWorkflow(name="my-log-parsing-workflow")
lpw.workflow(input_df)
[6]:
  network_information_source_address ...
0                      100.20.100.20 ...

1 rows × 131 columns

A workflow can receive and output data from different locations, including CSV files and Kafka. To integrate I/O into your workflow, indicate your workflow configuration in a workflow.yaml file or define it at instantiation with a Python dictionary.
The workflow class will look for a configuration file in the following locations, in order:
  • /etc/clx/[workflow-name]/workflow.yaml, then

  • ~/.config/clx/[workflow-name]/workflow.yaml

To learn more about workflow configurations, visit the CLX Workflow page.
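For reference, a workflow.yaml equivalent to the dictionary-based source and destination configurations used later in this notebook might look like the following sketch (key names are assumed to mirror the Python dictionary form; paths are placeholders):

```yaml
# Hypothetical workflow.yaml for my-log-parsing-workflow
source:
  type: fs
  input_format: csv
  input_path: /path/to/alert_data.csv
  schema:
    - raw
  delimiter: ","
  required_cols:
    - raw
  dtype:
    - str
  header: 0
destination:
  type: fs
  output_format: csv
  output_path: /path/to/alert_data_output.csv
```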

To demonstrate the input functionality, we’ll create a small CSV input file.

[7]:
import cudf
input_df = cudf.DataFrame()
input_df["raw"] = ["04/03/2019 11:58:59 AM\\nLogName=Security\\nSourceName=Microsoft Windows security auditing.\\nEventCode=5156\\nEventType=0\\nType=Information\\nComputerName=user234.test.com\\nTaskCategory=Filtering Platform Connection\\nOpCode=Info\\nRecordNumber=241754521\\nKeywords=Audit Success\\nMessage=The Windows Filtering Platform has permitted a connection.\\r\\n\\r\\nApplication Information:\\r\\n\\tProcess ID:\\t\\t4\\r\\n\\tApplication Name:\\tSystem\\r\\n\\r\\nNetwork Information:\\r\\n\\tDirection:\\t\\tInbound\\r\\n\\tSource Address:\\t\\t100.20.100.20\\r\\n\\tSource Port:\\t\\t138\\r\\n\\tDestination Address:\\t100.20.100.30\\r\\n\\tDestination Port:\\t\\t138\\r\\n\\tProtocol:\\t\\t17\\r\\n\\r\\nFilter Information:\\r\\n\\tFilter Run-Time ID:\\t0\\r\\n\\tLayer Name:\\t\\tReceive/Accept\\r\\n\\tLayer Run-Time ID:\\t44"]
input_df.to_csv("alert_data.csv")

Next, create and run the workflow.

[8]:
from clx.workflow.workflow import Workflow
from clx.parsers.windows_event_parser import WindowsEventParser
import os

dirpath = os.getcwd()

source = {
   "type": "fs",
   "input_format": "csv",
   "input_path": os.path.join(dirpath, "alert_data.csv"),
   "schema": ["raw"],
   "delimiter": ",",
   "required_cols": ["raw"],
   "dtype": ["str"],
   "header": 0
}
destination = {
   "type": "fs",
   "output_format": "csv",
   "output_path": os.path.join(dirpath, "alert_data_output.csv")
}
wep = WindowsEventParser()

class LogParseWorkflow(Workflow):
    def workflow(self, dataframe):
        output = wep.parse(dataframe, "raw")
        return output

lpw = LogParseWorkflow(source=source, destination=destination, name="my-log-parsing-workflow")
lpw.run_workflow()

Output data can be read directly from the resulting CSV file.

[9]:
with open("alert_data_output.csv") as f:
    lines = f.readlines()
lines
[9]:
[',changed_attributes_allowedtodelegateto,attributes_user_principal_name,network_information_source_address,additional_information_ticket_options,attributes_allowed_to_delegate_to,changed_attributes_logon_hours,changed_attributes_account_expires,account_information_security_id,process_information_caller_process_id,target_account_account_name,account_whose_credentials_were_used_account_domain,target_account_old_account_name,attributes_logon_hours,new_logon_account_name,network_information_network_address,process_process_id,network_information_client_address,user_account_domain,account_information_logon_guid,network_information_client_port,group_security_id,subject_security_id,member_account_name,member_security_id,changed_attributes_sid_history,new_logon_security_id,additional_information_expiration_time,changed_attributes_user_principal_name,network_information_direction,network_information_protocol,attributes_sam_account_name,target_server_additional_information,target_account_security_id,changed_attributes_sam_account_name,attributes_password_last_set,subject_logon_guid,filter_information_filter_run_time_id,service_information_service_id,process_information_process_name,changed_attributes_primary_group_id,filter_information_layer_run_time_id,account_for_which_logon_failed_account_name,network_information_workstation_name,attributes_user_account_control,attributes_primary_group_id,detailed_authentication_information_logon_process,additional_information_pre_authentication_type,changed_attributes_old_uac_value,attributes_new_uac_value,network_information_source_network_address,changed_attributes_user_workstations,privileges,attributes_user_workstations,eventcode,account_locked_out_security_id,service_server,group_group_domain,attributes_sid_history,attributes_profile_path,attributes_home_directory,certificate_information_certificate_issuer_name,subject_account_name,changed_attributes_user_parameters,changed_attributes_display_name,id,failure_information_failure_reason,account_information_user_id,new_logon_logon_guid,account_for_which_logon_failed_security_id,attributes_display_name,network_information_destination_address,time,subject_account_domain,network_information_port,network_information_source_port,changed_attributes_user_account_control,account_information_supplied_realm_name,changed_attributes_script_path,attributes_old_uac_value,additional_information_ticket_encryption_type,attributes_account_expires,changed_attributes_home_directory,user_security_id,new_account_security_id,target_account_account_domain,target_account_new_account_name,attributes_home_drive,service_information_service_name,additional_information_caller_computer_name,attributes_script_path,user_account_name,logon_type,certificate_information_certificate_thumbprint,network_information_destination_port,changed_attributes_new_uac_value,additional_information_failure_code,group_group_name,process_information_caller_process_name,changed_attributes_home_drive,computername,application_information_application_name,detailed_authentication_information_authentication_package,account_information_account_name,account_locked_out_account_name,failure_information_status,changed_attributes_password_last_set,detailed_authentication_information_key_length,filter_information_layer_name,additional_information_privileges,process_process_name,new_logon_account_domain,account_whose_credentials_were_used_logon_guid,additional_information_result_code,subject_logon_id,new_account_account_name,detailed_authentication_information_package_name_ntlm_only,application_information_process_id,target_server_target_server_name,certificate_information_certificate_serial_number,process_information_process_id,new_logon_logon_id,detailed_authentication_information_transited_services,account_whose_credentials_were_used_account_name,attributes_user_parameters,changed_attributes_profile_path,account_information_account_domain,additional_information_transited_services,account_for_which_logon_failed_account_domain,new_account_domain_name,failure_information_sub_status,service_service_name\n',
 '0,,,100.20.100.20,,,,,,,,,,,,,,,,,,,,,,,,,,inbound,17,,,,,,,0,,,,44,,,,,,,,,,,,,5156,,,,,,,,,,,,,,,,,100.20.100.30,,,,138,,,,,,,,,,,,,,,,,,,138,,,,,,,system,,,,,,,receive/accept,,,,,,,,,4,,,,,,,,,,,,,,\n']

Open Source Threat Intelligence Integration

Often it’s beneficial to integrate open source threat intelligence with collected data. CLX includes the ability to query VirusTotal and FarsightDB directly. An API key is necessary for both of these integrations.

[ ]:
from clx.osi.virus_total import VirusTotalClient
vt_api_key='<virus total apikey goes here>'
vt_client = VirusTotalClient(api_key=vt_api_key)
result = vt_client.url_scan(["virustotal.com"])
[ ]:
from clx.osi.farsight import FarsightLookupClient
server='https://api.dnsdb.info'
fs_api_key='<farsight apikey goes here>'
fs_client = FarsightLookupClient(server, fs_api_key, limit=1)
result = fs_client.query_rrset("www.dnsdb.info")
[12]:
from clx.osi.whois import WhoIsLookupClient
whois_client = WhoIsLookupClient()
whois_result = whois_client.whois(["nvidia.com"])
print(whois_result)
[{'domain_name': 'NVIDIA.COM', 'registrar': 'Safenames Ltd', 'whois_server': 'whois.safenames.net', 'referral_url': None, 'updated_date': '05-01-2022 02:42:29,05-01-2022 03:42:30', 'creation_date': '04-20-1993 04:00:00', 'expiration_date': '04-21-2024 04:00:00', 'name_servers': 'DNS1.P09.NSONE.NET,DNS2.P09.NSONE.NET,NS5.DNSMADEEASY.COM,NS6.DNSMADEEASY.COM,NS7.DNSMADEEASY.COM,dns1.p09.nsone.net,dns2.p09.nsone.net,ns5.dnsmadeeasy.com,ns6.dnsmadeeasy.com,ns7.dnsmadeeasy.com', 'status': 'clientDeleteProhibited https://icann.org/epp#clientDeleteProhibited,clientTransferProhibited https://icann.org/epp#clientTransferProhibited,serverDeleteProhibited https://icann.org/epp#serverDeleteProhibited,serverTransferProhibited https://icann.org/epp#serverTransferProhibited,serverUpdateProhibited https://icann.org/epp#serverUpdateProhibited', 'emails': 'abuse@safenames.net,wadmpfvzi5ei@idp.email,hostmaster@safenames.net', 'dnssec': 'unsigned', 'name': 'Data protected, not disclosed', 'org': None, 'address': '2788 San Tomas Expressway', 'city': 'Santa Clara', 'state': 'CA', 'zipcode': '95051', 'country': 'US'}]