API Reference
IP
- clx.ip.hostmask(ips, prefixlen=16)
Compute a column of hostmasks for a column of IP addresses. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses for which to compute hostmasks
- prefixlen: integer
Length of the network prefix, in bits, for IPv4 addresses
- Returns
- rtype: cudf.Series, hostmask
Hostmask output for each IP address
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.hostmask(cudf.Series(["192.168.0.1","10.0.0.1"]), prefixlen=16)
0    0.0.255.255
1    0.0.255.255
Name: hostmask, dtype: object
- clx.ip.int_to_ip(values)
Convert integer column to IP addresses. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- values: cudf.Series, integer
Integer representations of IP addresses
- Returns
- rtype: cudf.Series, IPv4 address
IP addresses converted from their integer representations
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.int_to_ip(cudf.Series([3232235521, 167772161]))
0    192.168.0.1
1       10.0.0.1
dtype: object
- clx.ip.ip_to_int(values)
Convert string column of IP addresses to integer values. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- values: cudf.Series, IPv4 address
IP addresses to be converted
- Returns
- rtype: cudf.Series, integer
Integer representations of IP addresses
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.ip_to_int(cudf.Series(["192.168.0.1","10.0.0.1"]))
0    3232235521
1     167772161
dtype: int64
- clx.ip.is_global(ips)
Indicates whether each address is global. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is global, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_global(cudf.Series(["127.0.0.1","207.46.13.151"]))
0    False
1     True
dtype: bool
- clx.ip.is_ip(ips)
Indicates whether each value is a valid IP address string. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the value is a valid IP address string, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_ip(cudf.Series(["192.168.0.1","10.123.0"]))
0     True
1    False
dtype: bool
- clx.ip.is_link_local(ips)
Indicates whether each address is link-local. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is link-local, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_link_local(cudf.Series(["127.0.0.1","169.254.123.123"]))
0    False
1     True
dtype: bool
- clx.ip.is_loopback(ips)
Indicates whether each address is a loopback address. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is a loopback address, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_loopback(cudf.Series(["127.0.0.1","10.0.0.1"]))
0     True
1    False
dtype: bool
- clx.ip.is_multicast(ips)
Indicates whether each address is multicast. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is multicast, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_multicast(cudf.Series(["127.0.0.1","224.0.0.0"]))
0    False
1     True
dtype: bool
- clx.ip.is_private(ips)
Indicates whether each address is private. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is private, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_private(cudf.Series(["127.0.0.1","207.46.13.151"]))
0     True
1    False
dtype: bool
- clx.ip.is_reserved(ips)
Indicates whether each address is reserved. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is reserved, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_reserved(cudf.Series(["127.0.0.1","10.0.0.1"]))
0    False
1    False
dtype: bool
- clx.ip.is_unspecified(ips)
Indicates whether each address is unspecified. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be checked
- Returns
- rtype: cudf.Series, booleans
True where the address is unspecified, False otherwise
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.is_unspecified(cudf.Series(["127.0.0.1","10.0.0.1"]))
0    False
1    False
dtype: bool
- clx.ip.mask(ips, masks)
Apply a mask to a column of IP addresses. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses to be masked
- masks: Subnet mask value
The host or subnet masks to be applied
- Returns
- rtype: cudf.Series, masked IPv4 address
Masked IP addresses
Examples
>>> import clx.ip
>>> import cudf
>>> input_ips = cudf.Series(["192.168.0.1","10.0.0.1"])
>>> input_masks = cudf.Series(["255.255.0.0", "255.255.0.0"])
>>> clx.ip.mask(input_ips, input_masks)
0    192.168.0.0
1       10.0.0.0
Name: mask, dtype: object
- clx.ip.netmask(ips, prefixlen=16)
Compute a column of netmasks for a column of IP addresses. Addresses must be IPv4. IPv6 not yet supported.
- Parameters
- ips: IPv4 address
IP addresses for which to compute netmasks
- prefixlen: integer
Length of the network prefix, in bits, for IPv4 addresses
- Returns
- rtype: cudf.Series, netmask
Netmask output for each IP address
Examples
>>> import clx.ip
>>> import cudf
>>> clx.ip.netmask(cudf.Series(["192.168.0.1","10.0.0.1"]), prefixlen=16)
0    255.255.0.0
1    255.255.0.0
Name: net_mask, dtype: object
Features
- clx.features.binary(dataframe, entity_id, feature_id)
Create binary feature dataframe using provided dataset, entity, and feature.
- Parameters
dataframe (cudf.DataFrame) – Input dataframe
entity_id (str) – Entity column name
feature_id (str) – Feature column name
- Returns
dataframe
- Return type
cudf.DataFrame
Examples
>>> import cudf
>>> import clx.features
>>> df = cudf.DataFrame(
...     {
...         "time": [1, 2, 3],
...         "user": ["u1", "u2", "u1"],
...         "computer": ["c1", "c1", "c3"],
...     }
... )
>>> output = clx.features.binary(df, "user", "computer")
>>> output
       c1   c3
user
u1    1.0  1.0
u2    1.0  0.0
- clx.features.frequency(dataframe, entity_id, feature_id)
Create frequency feature dataframe using provided dataset, entity, and feature.
- Parameters
dataframe (cudf.DataFrame) – Input dataframe
entity_id (str) – Entity column name
feature_id (str) – Feature column name
- Returns
dataframe
- Return type
cudf.DataFrame
Examples
>>> import cudf
>>> import clx.features
>>> df = cudf.DataFrame(
...     {
...         "time": [1, 2, 3],
...         "user": ["u1", "u2", "u1"],
...         "computer": ["c1", "c1", "c3"],
...     }
... )
>>> output = clx.features.frequency(df, "user", "computer")
>>> output
       c1   c3
user
u1    0.5  0.5
u2    1.0  0.0
Analytics
- class clx.analytics.asset_classification.AssetClassification(layers=[200, 100], drops=[0.001, 0.01], emb_drop=0.04, is_reg=False, is_multi=True, use_bn=True)
Supervised asset classification on tabular data containing categorical and/or continuous features.
- Parameters
layers – sizes of the linear layers that follow the input layer
drops – dropout percentage for each linear layer
emb_drop – dropout percentage at the embedding layers
is_reg – whether the model performs regression
is_multi – whether the model performs classification
use_bn – whether to use batch normalization
Methods
load_model(fname) – Load a saved model.
predict(gdf, cat_cols, cont_cols) – Predict the class with the trained model.
save_model(fname) – Save trained model.
train_model(train_gdf, cat_cols, cont_cols, ...) – Train the fastai tabular model with a given training dataset.
- load_model(fname)
Load a saved model.
- Parameters
fname (str) – directory path to model
Examples
>>> from clx.analytics.asset_classification import AssetClassification
>>> ac = AssetClassification()
>>> ac.load_model("ac.mdl")
- predict(gdf, cat_cols, cont_cols)
Predict the class with the trained model
- Parameters
gdf (cudf.DataFrame) – prediction input dataset with categorized int16 feature columns
cat_cols – array of categorical column names in gdf
cont_cols – array of continuous column names in gdf
Examples
>>> cat_cols = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
>>> cont_cols = ["10"]
>>> ac.predict(X_test, cat_cols, cont_cols).values_host
0       0
1       0
2       0
3       0
4       2
       ..
8204    0
8205    4
8206    0
8207    3
8208    0
Length: 8209, dtype: int64
- save_model(fname)
Save trained model
- Parameters
fname (str) – directory path to save model
Examples
>>> from clx.analytics.asset_classification import AssetClassification
>>> ac = AssetClassification()
>>> cat_cols = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
>>> cont_cols = ["10"]
>>> ac.train_model(X_train, cat_cols, cont_cols, "label", batch_size, epochs, lr=0.01, wd=0.0)
>>> ac.save_model("ac.mdl")
- train_model(train_gdf, cat_cols, cont_cols, label_col, batch_size, epochs, lr=0.01, wd=0.0)
This function is used for training the fastai tabular model with a given training dataset.
- Parameters
train_gdf (cudf.DataFrame) – training dataset with categorized and/or continuous feature columns
cat_cols – array of categorical column names in train_gdf
cont_cols – array of continuous column names in train_gdf
label_col (str) – column name of label column in train_gdf
batch_size (int) – train_gdf will be partitioned into multiple dataframes of this size
epochs (int) – number of training epochs; adjust depending on convergence for a specific dataset
lr (float) – learning rate
wd (float) – weight decay
Examples
>>> from clx.analytics.asset_classification import AssetClassification
>>> ac = AssetClassification()
>>> cat_cols = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
>>> cont_cols = ["10"]
>>> ac.train_model(X_train, cat_cols, cont_cols, "label", batch_size, epochs, lr=0.01, wd=0.0)
- class clx.analytics.detector.Detector(lr=0.001)
- Attributes
- criterion
- model
- optimizer
Methods
leverage_model(model) – Leverage the model by setting parallelism parameters.
load_model(file_path) – Load an already saved model and set CUDA parameters.
save_model(file_path) – Save the model to a given location.
init_model
predict
train_model
- leverage_model(model)
This function leverages the model by setting parallelism parameters.
- Parameters
model (RNNClassifier) – Model instance.
- load_model(file_path)
This function loads an already saved model and sets CUDA parameters.
- Parameters
file_path (string) – File path of a model to be loaded.
- save_model(file_path)
This function saves model to a given location.
- Parameters
file_path (string) – File path of a model to be saved.
- class clx.analytics.dga_dataset.DGADataset(df, truncate)
Constructor to create DGADataset instance.
- Parameters
df (cudf.DataFrame) – Input dataframe.
truncate (int) – Truncate string to n number of characters.
- class clx.analytics.dga_detector.DGADetector(lr=0.001)
This class provides multiple functionalities, such as building, training, and evaluating the RNNClassifier model, to distinguish legitimate and DGA domain names.
Methods
evaluate_model(dataloader) – Evaluate the trained model to verify its accuracy.
init_model([char_vocab, hidden_size, ...]) – Instantiate the RNNClassifier model to train.
load_checkpoint(file_path) – Load an already saved model checkpoint and set CUDA parameters.
predict(domains[, probability, truncate]) – Classify each domain name in a cudf.Series as benign/malicious and return the learned label for each as a cudf.Series.
save_checkpoint(file_path) – Save a model checkpoint to a given location.
train_model(train_data, labels[, ...]) – Train the RNNClassifier model with a given training dataset.
- evaluate_model(dataloader)
This function evaluates the trained model to verify its accuracy.
- Parameters
dataloader (DataLoader) – Instance holds preprocessed data.
- Returns
Model accuracy
- Return type
decimal
Examples
>>> dd = DGADetector()
>>> dd.init_model()
>>> dd.evaluate_model(dataloader)
Evaluating trained model ...
Test set accuracy: 3/4 (0.75)
- init_model(char_vocab=128, hidden_size=100, n_domain_type=2, n_layers=3)
This function instantiates the RNNClassifier model to train, and also optimizes it for scaling and parallel execution.
- load_checkpoint(file_path)
This function loads an already saved model checkpoint and sets CUDA parameters.
- Parameters
file_path (string) – File path of a model checkpoint to be loaded.
- predict(domains, probability=False, truncate=100)
This function accepts a cudf.Series of domains, classifies each domain name as benign/malicious, and returns the learned label for each domain as a cudf.Series.
- Parameters
domains (cudf.Series) – List of domains.
probability (bool) – Whether to return prediction probabilities.
truncate (int) – Truncate string to n number of characters.
- Returns
Predicted results with respect to given domains.
- Return type
cudf.Series
Examples
>>> dd.predict(['nvidia.com', 'dgadomain'])
0    0.010
1    0.924
Name: dga_probability, dtype: decimal
- save_checkpoint(file_path)
This function saves model checkpoint to given location.
- Parameters
file_path (string) – File path to save model checkpoint.
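A minimal checkpoint round-trip sketch (the file path is illustrative):
>>> from clx.analytics.dga_detector import DGADetector
>>> dd = DGADetector()
>>> dd.init_model()
>>> dd.save_checkpoint("dga_detector.ckpt")
>>> dd.load_checkpoint("dga_detector.ckpt")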
- train_model(train_data, labels, batch_size=1000, epochs=5, train_size=0.7, truncate=100)
This function is used for training RNNClassifier model with a given training dataset. It returns total loss to determine model prediction accuracy.
- Parameters
train_data (cudf.Series) – Training data (domain names).
labels (cudf.Series) – Corresponding labels.
batch_size (int) – Batch size.
epochs (int) – Number of training epochs.
train_size (float) – Fraction of the data used for training.
truncate (int) – Truncate string to n number of characters.
Examples
>>> from clx.analytics.dga_detector import DGADetector
>>> dd = DGADetector()
>>> dd.init_model()
>>> dd.train_model(train_data, labels)
1.5728906989097595
- class clx.analytics.loda.Loda(n_bins=None, n_random_cuts=100)
Anomaly detection using Lightweight Online Detector of Anomalies (LODA). LODA detects anomalies in a dataset by computing the likelihood of data points using an ensemble of one-dimensional histograms.
- Parameters
n_bins (int or None) – Number of bins in each histogram.
n_random_cuts (int) – Number of random one-dimensional projections (histograms) in the ensemble.
Methods
explain(anomaly[, scaled]) – Explain anomaly based on contributions (t-scores) of each feature across histograms.
fit(train_data) – Fit training data and construct histograms.
load_model(file_path) – Load an already saved model and set CUDA parameters.
save_model(file_path) – Save the model to a given location.
score(input_data) – Calculate anomaly scores using negative likelihood across n_random_cuts histograms.
- explain(anomaly, scaled=True)
Explain anomaly based on contributions (t-scores) of each feature across histograms.
- Parameters
anomaly (cupy.ndarray) – selected anomaly from input dataset
scaled (boolean) – set to scale output feature importance scores
Examples
>>> loda_ad.explain(x[5])  # x[5] is found anomaly
array([[1.        ],
       [0.        ],
       [0.69850349],
       [0.91081035],
       [0.78774349]])
- fit(train_data)
Fit training data and construct histograms.
- Parameters
train_data (cupy.ndarray) – NxD training sample
Examples
>>> from clx.analytics.loda import Loda
>>> import cupy as cp
>>> x = cp.random.randn(100,5)  # 5-D multivariate synthetic dataset
>>> loda_ad = Loda(n_bins=None, n_random_cuts=100)
>>> loda_ad.fit(x)
- classmethod load_model(file_path)
This function loads an already saved model and sets CUDA parameters.
- Parameters
file_path (string) – File path of the model to load.
- save_model(file_path)
This function saves the model to a given location.
- Parameters
file_path (string) – File path to save model.
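A minimal save/load round-trip sketch, assuming the fitted loda_ad instance from the fit example above (the file path is illustrative):
>>> loda_ad.save_model("loda.model")
>>> loda_ad = Loda.load_model("loda.model")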
- score(input_data)
Calculate anomaly scores using negative likelihood across n_random_cuts histograms.
- Parameters
input_data (cupy.ndarray) – NxD input sample
Examples
>>> from clx.analytics.loda import Loda
>>> import cupy as cp
>>> x = cp.random.randn(100,5)  # 5-D multivariate synthetic dataset
>>> loda_ad = Loda(n_bins=None, n_random_cuts=100)
>>> loda_ad.fit(x)
>>> loda_ad.score(x)
array([0.04295848, 0.02853553, 0.04587308, 0.03750692, 0.05050418,
       0.02671958, 0.03538646, 0.05606504, 0.03418612, 0.04040502,
       0.03542846, 0.02801463, 0.04884918, 0.02943411, 0.02741364,
       0.02702433, 0.03064191, 0.02575712, 0.03957355, 0.02729784,
       ...
       0.03943715, 0.02701243, 0.02880341, 0.04086408, 0.04365477])
- class clx.analytics.model.rnn_classifier.RNNClassifier(input_size, hidden_size, output_size, n_layers, bidirectional=True)
Methods
forward(input, seq_lengths) – Defines the computation performed at every call.
- forward(input, seq_lengths)
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for the forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this, since the former takes care of running the registered hooks while the latter silently ignores them.
- class clx.analytics.model.tabular_model.TabularModel(emb_szs, n_cont, out_sz, layers, drops, emb_drop, use_bn, is_reg, is_multi)
Basic model for tabular data
Methods
forward(x_cat, x_cont) – Defines the computation performed at every call.
- forward(x_cat, x_cont)
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for the forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this, since the former takes care of running the registered hooks while the latter silently ignores them.
- clx.analytics.anomaly_detection.dbscan(feature_dataframe, min_samples=3, eps=0.3)
Pass a feature dataframe to this function to detect anomalies. This function uses cuML DBSCAN to detect anomalies and outputs the associated labels 0, 1, -1 (with -1 indicating an outlier).
- Parameters
feature_dataframe (cudf.DataFrame) – Feature dataframe to be used for clustering
min_samples (int) – Minimum samples to use for dbscan
eps (float) – Max distance to use for dbscan
Examples
>>> import cudf
>>> import clx.features
>>> import clx.analytics.anomaly_detection
>>> df = cudf.DataFrame(
...     {
...         "time": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
...         "user": ["u1","u1","u1","u1","u1","u1","u1","u1","u1","u1","u5","u4","u2","u3"],
...         "computer": ["c1","c2","c3","c1","c2","c3","c1","c1","c2","c3","c1","c1","c5","c6"],
...     }
... )
>>> feature_df = clx.features.frequency(df, entity_id="user", feature_id="computer")
>>> labels = clx.analytics.anomaly_detection.dbscan(feature_df, min_samples=2, eps=0.5)
>>> labels
0   -1
1   -1
2   -1
dtype: int32
- clx.analytics.periodicity_detection.filter_periodogram(prdg, p_value)
Select important frequencies by filtering periodogram by p-value. Filtered out frequencies are set to zero.
- Parameters
prdg – periodogram to be filtered
p_value – p-value to filter by
- Returns
CuPy array representing periodogram
- Return type
cupy.ndarray
- clx.analytics.periodicity_detection.to_periodogram(signal)
Returns periodogram of signal for finding frequencies that have high energy.
- Parameters
signal (cudf.Series) – signal (time domain)
- Returns
CuPy array representing periodogram
- Return type
cupy.ndarray
- clx.analytics.periodicity_detection.to_time_domain(prdg)
Convert the signal back to time domain.
- Parameters
prdg (cupy.ndarray) – periodogram (frequency domain)
- Returns
CuPy array representing reconstructed signal
- Return type
cupy.ndarray
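A hedged end-to-end sketch combining the three functions above (the signal values are illustrative):
>>> import cudf
>>> import clx.analytics.periodicity_detection as pdd
>>> signal = cudf.Series([2, 4, 2, 4, 2, 4, 2, 4, 2, 4, 2, 4])  # toy periodic signal
>>> prdg = pdd.to_periodogram(signal)
>>> filtered = pdd.filter_periodogram(prdg, p_value=0.05)
>>> reconstructed = pdd.to_time_domain(filtered)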
- clx.analytics.stats.rzscore(series, window)
Calculates rolling z-score
- Parameters
- series: cudf.Series
Series for which to calculate rolling z-score
- window: int
Window size
- Returns
- cudf.Series
Series with rolling z-score values
Examples
>>> import clx.analytics.stats
>>> import cudf
>>> sequence = [3,4,5,6,1,10,34,2,1,11,45,34,2,9,19,43,24,13,23,10,98,84,10]
>>> series = cudf.Series(sequence)
>>> zscores_df = cudf.DataFrame()
>>> zscores_df['zscore'] = clx.analytics.stats.rzscore(series, 7)
>>> zscores_df
          zscore
0           null
1           null
2           null
3           null
4           null
5           null
6    2.374423424
7   -0.645941275
8   -0.683973734
9    0.158832461
10   1.847751909
11   0.880026019
12  -0.950835449
13  -0.360593742
14   0.111407599
15   1.228914145
16  -0.074966331
17  -0.570321249
18   0.327849973
19  -0.934372308
20   2.296828498
21   1.282966989
22  -0.795223674
DNS Extractor
- clx.dns.dns_extractor.extract_hostnames(url_series)
This function extracts hostnames from the given URLs.
- Parameters
url_series (cudf.Series) – URLs to be handled.
- Returns
Hostnames extracted from the URLs.
- Return type
cudf.Series
Examples
>>> from cudf import DataFrame
>>> from clx.dns import dns_extractor as dns
>>> input_df = DataFrame(
...     {
...         "url": [
...             "http://www.google.com",
...             "gmail.com",
...             "github.com",
...             "https://pandas.pydata.org",
...         ]
...     }
... )
>>> dns.extract_hostnames(input_df["url"])
0       www.google.com
1            gmail.com
2           github.com
3    pandas.pydata.org
Name: 0, dtype: object
- clx.dns.dns_extractor.generate_tld_cols(hostname_split_df, hostnames, col_len)
This function generates TLD columns.
- Parameters
hostname_split_df (cudf.DataFrame) – Hostname splits.
hostnames (cudf.DataFrame) – Hostnames.
col_len – Hostname splits dataframe columns length.
- Returns
TLD columns with all combinations.
- Return type
cudf.DataFrame
Examples
>>> import cudf
>>> from clx.dns import dns_extractor as dns
>>> hostnames = cudf.Series(["www.google.com", "pandas.pydata.org"])
>>> hostname_splits = dns.get_hostname_split_df(hostnames)
>>> print(hostname_splits)
     2       1       0
0  com  google     www
1  org  pydata  pandas
>>> col_len = len(hostname_splits.columns) - 1
>>> dns.generate_tld_cols(hostname_splits, hostnames, col_len)
     2       1       0 tld2        tld1               tld0
0  com  google     www  com  google.com     www.google.com
1  org  pydata  pandas  org  pydata.org  pandas.pydata.org
- clx.dns.dns_extractor.parse_url(url_series, req_cols=None)
This function extracts the subdomain, domain, and suffix for a given URL.
- Parameters
url_series (cudf.Series) – URLs to be handled.
req_cols (set(strings)) – Columns requested to extract, such as domain, subdomain, suffix, and hostname.
- Returns
Extracted information of requested columns.
- Return type
cudf.DataFrame
Examples
>>> from cudf import DataFrame
>>> from clx.dns import dns_extractor as dns
>>> input_df = DataFrame(
...     {
...         "url": [
...             "http://www.google.com",
...             "gmail.com",
...             "github.com",
...             "https://pandas.pydata.org",
...         ]
...     }
... )
>>> dns.parse_url(input_df["url"])
            hostname  domain suffix subdomain
0     www.google.com  google    com       www
1          gmail.com   gmail    com
2         github.com  github    com
3  pandas.pydata.org  pydata    org    pandas
>>> dns.parse_url(input_df["url"], req_cols={'domain', 'suffix'})
   domain suffix
0  google    com
1   gmail    com
2  github    com
3  pydata    org
Exploratory Data Analysis
- class clx.eda.EDA(dataframe)
An EDA (Exploratory Data Analysis) Object. EDA is used to explore different features of a given dataframe.
- Parameters
dataframe (cudf.DataFrame) – Dataframe to be used for analysis
Examples
>>> from clx.eda import EDA
>>> import cudf
>>> import pandas as pd
>>> df = cudf.DataFrame()
>>> df['a'] = [1,2,3,4]
>>> df['b'] = ['a','b','c','c']
>>> df['c'] = [True, False, True, True]
>>> df['d'] = cudf.Series(pd.date_range("2000-01-01", periods=3, freq="m"))
>>> eda = EDA(df)
>>> eda
{
    "SummaryStatistics": {
        "a": {"dtype": "int64", "summary": {"unique": "4", "total": "4"}},
        "b": {"dtype": "object", "summary": {"unique": "3", "total": "4"}},
        "c": {"dtype": "bool", "summary": {"true_percent": "0.75"}},
        "d": {"dtype": "datetime64[ns]", "summary": {"timespan": "60 days, 2880 hours, 0 minutes, 0 seconds"}}
    }
}
Methods
cuxfilter_dashboard() – Create cuxfilter dashboard for Exploratory Data Analysis.
save_analysis(dirpath) – Save analysis output to directory path.
- property analysis
Analysis results as a dict
- cuxfilter_dashboard()
Create cuxfilter dashboard for Exploratory Data Analysis.
- Returns
cuxfilter dashboard populated with data and charts.
- Return type
cuxfilter.DashBoard
- property dataframe
Dataframe used for analysis
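A short sketch of persisting and visualizing the analysis, assuming the eda object from the example above (the output directory is illustrative):
>>> eda.save_analysis("./eda_output")
>>> dashboard = eda.cuxfilter_dashboard()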
Heuristics
- clx.heuristics.ports.major_ports(addr_col, port_col, min_conns=1, eph_min=10000)
Find major ports for each address. This is done by computing the mean number of connections across all ports for each address and then filtering out all ports that don't cross this threshold. Also adds a column for the IANA service name corresponding to each port.
- Parameters
addr_col (cudf.Series) – Column of addresses as strings
port_col (cudf.Series) – Column of corresponding port numbers as ints
min_conns (int) – Filter out ip:port rows that don’t have at least this number of connections (default: 1)
eph_min (int) – Ports greater than or equal to this will be labeled as an ephemeral service (default: 10000)
- Returns
DataFrame with columns for address, port, IANA service corresponding to port, and number of connections
- Return type
cudf.DataFrame
Examples
>>> import clx.heuristics.ports as ports
>>> import cudf
>>> input_addr_col = cudf.Series(["10.0.75.1","10.0.75.1","10.0.75.1","10.0.75.255","10.110.104.107","10.110.104.107"])
>>> input_port_col = cudf.Series([137,137,7680,137,7680,7680])
>>> ports.major_ports(input_addr_col, input_port_col, min_conns=2, eph_min=7000)
             addr  port     service  conns
0       10.0.75.1   137  netbios-ns      2
1  10.110.104.107  7680   ephemeral      2
OSI (Open Source Integration)
- class clx.osi.farsight.FarsightLookupClient(server, apikey, limit=None, http_proxy=None, https_proxy=None)
Wrapper class to query DNSDB records in various ways, for example by IP address or domain name.
- Parameters
server – Farsight server
apikey – API key
limit – limit
http_proxy – HTTP proxy
https_proxy – HTTPS proxy
Methods
query_rdata_ip(rdata_ip[, before, after]) – Query to find DNSDB records matching a specific IP address within a given time range.
query_rdata_name(rdata_name[, rrtype, ...]) – Query matching a single DNSDB record of a given owner name and time range.
query_rrset(oname[, rrtype, bailiwick, ...]) – Batch version of querying DNSDB by a given domain name and time range.
- query_rdata_ip(rdata_ip, before=None, after=None)
Query to find DNSDB records matching a specific IP address with given time range.
- Parameters
rdata_ip (str) – The VALUE is one of an IPv4 or IPv6 single address, with a prefix length, or with an address range. If a prefix is provided, the delimiter between the network address and prefix length is a single comma (“,”) character rather than the usual slash (“/”) character to avoid clashing with the HTTP URI path name separator.
before (UNIX timestamp) – Output results seen before this time.
after (UNIX timestamp) – Output results seen after this time.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key", limit=1)
>>> client.query_rdata_ip("100.0.0.1")
{"status_code": 200,...}
>>> client.query_rdata_ip("100.0.0.1", before=1428433465, after=1538014110)
{"status_code": 200,...}
- query_rdata_name(rdata_name, rrtype=None, before=None, after=None)
Query that matches only a single DNSDB record of a given owner name and time range.
- Parameters
rdata_name (str) – DNS domain name.
rrtype (str) – The resource record type of the resource record, either using the standard DNS type mnemonic, or an RFC 3597 generic type, i.e. the string TYPE immediately followed by the decimal RRtype number.
before (UNIX timestamp) – Output results seen before this time.
after (UNIX timestamp) – Output results seen after this time.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key", limit=1)
>>> client.query_rdata_name("www.farsightsecurity.com")
{"status_code": 200,...}
>>> client.query_rdata_name("www.farsightsecurity.com", rrtype="PTR", before=1386638408, after=1561176503)
{"status_code": 200,...}
- query_rrset(oname, rrtype=None, bailiwick=None, before=None, after=None)
Batch version of querying DNSDB by given domain name and time ranges.
- Parameters
oname (str) – DNS domain name.
rrtype (str) – The resource record type of the resource record, either using the standard DNS type mnemonic, or an RFC 3597 generic type, i.e. the string TYPE immediately followed by the decimal RRtype number.
bailiwick (str) – The “bailiwick” of an RRset in DNSDB observed via passive DNS replication is the closest enclosing zone delegated to a nameserver which served the RRset.
before (UNIX timestamp) – Output results seen before this time.
after (UNIX timestamp) – Output results seen after this time.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key")
>>> client.query_rrset("www.dnsdb.info")
{"status_code": 200,...}
>>> client.query_rrset("www.dnsdb.info", rrtype="CNAME", bailiwick="dnsdb.info.", before=1374184718, after=1564909243)
{"status_code": 200,...}
- class clx.osi.virus_total.VirusTotalClient(api_key=None, proxies=None)
Wrapper class to query VirusTotal database.
- Parameters
api_key – API key
proxies – proxies
- Attributes
- api_key
- proxies
- vt_endpoint_dict
Methods
domain_report(domain) – Retrieve report using domain.
file_report(*resource) – Retrieve file scan reports.
file_rescan(*resource) – Rescan given files.
file_scan(file) – Send a file for scanning with VirusTotal.
ipaddress_report(ip) – Retrieve report using IP address.
put_comment(resource, comment) – Post a comment for a file or URL.
scan_big_file(files) – Scan files larger than 32MB.
url_report(*resource) – Retrieve URL scan reports.
url_scan(*url) – Retrieve URL scan reports.
- domain_report(domain)
Retrieve report using domain.
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.domain_report("027.ru")
{'status_code': 200, 'json_resp': {'BitDefender category': 'parked', 'undetected_downloaded_samples'...}}
- file_report(*resource)
Retrieve file scan reports
- Parameters
*resource (str) – The resource argument can be the MD5, SHA-1 or SHA-256 of a file for which you want to retrieve the most recent antivirus report. You may also specify a scan_id returned by the /file/scan endpoint.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_report(["99017f6eebbac24f351415dd410d522d"])
{'status_code': 200, 'json_resp': {'scans': {'Bkav': {'detected': True, 'version': '1.3.0.9899', 'result': 'W32.AIDetectVM.malware1'...}}
- file_rescan(*resource)
This function rescans the given files.
- Parameters
*resource (str) – The resource argument can be the MD5, SHA-1 or SHA-256 of the file you want to re-scan.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_rescan('70c0942965354dbb132c05458866b96709e37f44')
{'status_code': 200, 'json_resp': {'scan_id': ...}}
- file_scan(file)
This function allows you to send a file for scanning with VirusTotal. Before submitting, it is recommended to retrieve the latest report on the file. The file size limit is 32MB; to submit files up to 200MB in size, it is mandatory to use the scan_big_file feature.
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_scan('test.sh')
{'status_code': 200, 'json_resp': {'scan_id': '0204e88255a0bd7807547e9186621f0478a6bb2c43e795fb5e6934e5cda0e1f6-1605914572', 'sha1': '70c0942965354dbb132c05458866b96709e37f44'...}
- ipaddress_report(ip)
Retrieve report using IP address.
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.ipaddress_report("90.156.201.27")
{'status_code': 200, 'json_resp': {'asn': 25532, 'undetected_urls...}}
- put_comment(resource, comment)
Post a comment for a file or URL.
- Parameters
resource (str) – Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want to comment on.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.put_comment("75efd85cf6f8a962fe016787a7f57206ea9263086ee496fc62e3fc56734d4b53", "This is a test comment")
{'status_code': 200, 'json_resp': {'response_code': 0, 'verbose_msg': 'Duplicate comment'}}
- scan_big_file(files)
Scan files larger than 32MB.
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.scan_big_file('test.sh')
{'status_code': 200, 'json_resp': {'scan_id': '0204e88255a0bd7807547e9186621f0478a6bb2c43e795fb5e6934e5cda0e1f6-1605914572', 'sha1': '70c0942965354dbb132c05458866b96709e37f44'...}
- url_report(*resource)
Retrieve URL scan reports
- Parameters
*resource (str) – The resource argument must be the URL for which to retrieve the most recent report.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.url_report(["virustotal.com"])
{'status_code': 200, 'json_resp': {'scan_id': 'a354494a73382ea0b4bc47f4c9e8d6c578027cd4598196dc88f05a22b5817293-1605914280'...}
- url_scan(*url)
Retrieve URL scan reports
- Parameters
*url (str) – A URL for which you want to retrieve the most recent report. You may also specify a scan_id (sha256-timestamp as returned by the URL submission API) to access a specific report.
- Returns
Response
- Return type
dict
Examples
>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.url_scan(["virustotal.com"])
{'status_code': 200, 'json_resp': {'permalink': 'https://www.virustotal.com/gui/url/...}}
- class clx.osi.whois.WhoIsLookupClient(sep=',', datetime_format='%m-%d-%Y %H:%M:%S')
Wrapper class to query the WhoIs API.
- Parameters
sep – Delimiter to concat nested list values from the WhoIs response.
datetime_format – Format to convert WhoIs response datetime objects.
- datetime_arr_keys = ['creation_date', 'updated_date', 'expiration_date']
Methods
whois(domains[, arr2str]) – Access parsed WhoIs data for a given domain.
- whois(domains, arr2str=True)
Function to access parsed WhoIs data for a given domain.
- Parameters
domains (list) – Domains to perform whois lookup.
arr2str (boolean) – Convert WhoIs lookup response object to list of strings.
- Returns
WhoIs information with respect to given domains.
- Return type
list/obj
Examples
>>> from clx.osi.whois import WhoIsLookupClient
>>> domains = ["nvidia.com"]
>>> client = WhoIsLookupClient()
>>> client.whois(domains)
[{'domain_name': 'NVIDIA.COM', 'registrar': 'Safenames Ltd', 'whois_server': 'whois.safenames.net'...}]
- class clx.osi.slashnext.SlashNextClient(api_key, snx_ir_workspace, base_url='https://oti.slashnext.cloud/api')
- Attributes
- conn
Methods
api_quota() – Find information about your API quota, like current usage, quota left etc.
download_html(scanid) – Downloads a web page HTML against a previous URL scan request.
download_screenshot(scanid[, resolution]) – Downloads a screenshot of a web page against a previous URL scan request.
download_text(scanid) – Downloads the text of a web page against a previous URL scan request.
host_report(host) – Queries the SlashNext cloud database and retrieves a detailed report.
host_reputation(host) – Queries the SlashNext cloud database and retrieves the reputation of a host.
host_urls(host[, limit]) – Queries the SlashNext cloud database and retrieves a list of all URLs.
scan_report(scanid[, extended_info]) – Retrieve URL scan results against a previous scan request.
url_scan(url[, extended_info]) – Perform a real-time URL reputation scan with the SlashNext cloud-based SEER threat detection engine.
url_scan_sync(url[, extended_info, timeout]) – Perform a real-time URL scan with the SlashNext cloud-based SEER threat detection engine in blocking mode.
verify_connection() – Verify SlashNext cloud database connection.
- api_quota()
Find information about your API quota, like current usage, quota left etc.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.api_quota()
>>> type(response_list[0])
<class 'dict'>
- download_html(scanid)
Downloads a web page HTML against a previous URL scan request.
- Parameters
scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_html('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
- download_screenshot(scanid, resolution='high')
Downloads a screenshot of a web page against a previous URL scan request.
- Parameters
scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.
resolution (str) – Resolution of the web page screenshot; defaults to 'high'.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_screenshot('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
- download_text(scanid)
Downloads the text of a web page against a previous URL scan request.
- Parameters
scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_text('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
- host_report(host)
Queries the SlashNext cloud database and retrieves a detailed report.
- Parameters
host (str) – The host to look up in the SlashNext Threat Intelligence database. Can be either a domain name or an IPv4 address.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_report('google.com')
>>> type(response_list[0])
<class 'dict'>
- host_reputation(host)
Queries the SlashNext cloud database and retrieves the reputation of a host.
- Parameters
host (str) – The host to look up in the SlashNext Threat Intelligence database. Can be either a domain name or an IPv4 address.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_reputation('google.com')
>>> type(response_list[0])
<class 'dict'>
- host_urls(host, limit=10)
Queries the SlashNext cloud database and retrieves a list of all URLs associated with the specified host.
- Parameters
host (str) – The host to look up in the SlashNext Threat Intelligence database. Can be either a domain name or an IPv4 address.
limit (int) – Maximum number of URLs to return; defaults to 10.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_urls('google.com', limit=1)
>>> type(response_list[0])
<class 'dict'>
- scan_report(scanid, extended_info=True)
Retrieve URL scan results against a previous scan request.
- Parameters
scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.
extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.scan_report('2-ba57-755a7458c8a3', extended_info=False)
>>> type(response_list[0])
<class 'dict'>
- url_scan(url, extended_info=True)
Perform a real-time URL reputation scan with SlashNext cloud-based SEER threat detection engine.
- Parameters
url (str) – The URL that needs to be scanned.
extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.url_scan('http://ajeetenterprises.in/js/kbrad/drive/index.php', extended_info=False)
>>> type(response_list[0])
<class 'dict'>
- url_scan_sync(url, extended_info=True, timeout=60)
Perform a real-time URL scan with SlashNext cloud-based SEER threat detection engine in a blocking mode.
- Parameters
url (str) – The URL that needs to be scanned.
extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.
timeout (int) – Scan timeout in seconds; defaults to 60.
- Returns
Query response as list.
- Return type
list
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.url_scan_sync('http://ajeetenterprises.in/js/kbrad/drive/index.php', extended_info=False, timeout=10)
>>> type(response_list[0])
<class 'dict'>
- verify_connection()
Verify SlashNext cloud database connection.
Examples
>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> slashnext.verify_connection()
Successfully connected to SlashNext cloud.
'success'
Parsers
- class clx.parsers.event_parser.EventParser(columns, event_name)
This is an abstract class for all event log parsers.
- Parameters
columns (set(string)) – Event column names.
event_name (string) – Event name
- Attributes
columns
List of columns that are being processed.
event_name
Event name defines the type of logs being processed.
Methods
filter_by_pattern(df, column, pattern) – Retrieve only events that satisfy the given regex pattern.
parse(dataframe, raw_column) – Abstract method 'parse' triggers the parsing functionality.
parse_raw_event(dataframe, raw_column, ...) – Parse a specific type of raw event records received as a dataframe.
- property columns
List of columns that are being processed.
- Returns
Event column names.
- Return type
set(string)
- property event_name
Event name defines the type of logs being processed.
- Returns
Event name
- Return type
string
- filter_by_pattern(df, column, pattern)
Retrieve only events that satisfy the given regex pattern.
- Parameters
df (cudf.DataFrame) – Raw events to be filtered.
column (string) – Name of the column containing the raw data.
pattern (string) – Regex pattern to retrieve events that are required.
- Returns
filtered dataframe.
- Return type
cudf.DataFrame
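An illustrative sketch of filter_by_pattern; since EventParser is abstract, a concrete subclass such as WindowsEventParser (documented below) is used, and the column name, raw strings, and pattern are illustrative:
>>> import cudf
>>> from clx.parsers.windows_event_parser import WindowsEventParser
>>> wep = WindowsEventParser()
>>> df = cudf.DataFrame({"raw": ["eventcode=4624 ...", "eventcode=4625 ..."]})
>>> filtered_df = wep.filter_by_pattern(df, "raw", "4624")  # keep only rows matching 4624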
- abstract parse(dataframe, raw_column)
Abstract method ‘parse’ triggers the parsing functionality. Subclasses are required to implement and execute any parsing pre-processing steps.
- parse_raw_event(dataframe, raw_column, event_regex)
Parses a specific type of raw event records received as a dataframe.
- Parameters
dataframe (cudf.DataFrame) – Raw events to be parsed.
raw_column (string) – Name of the column containing the raw data.
event_regex (dict) – Required regular expressions for a given event type.
- Returns
parsed information.
- Return type
cudf.DataFrame
- class clx.parsers.splunk_notable_parser.SplunkNotableParser
This class parses Splunk notable logs.
Methods
parse(dataframe, raw_column) – Parse the Splunk notable raw events.
- parse(dataframe, raw_column)
Parses the Splunk notable raw events.
- Parameters
dataframe (cudf.DataFrame) – Raw events to be parsed.
raw_column (string) – Name of the column containing the raw data.
- Returns
parsed information.
- Return type
cudf.DataFrame
- class clx.parsers.windows_event_parser.WindowsEventParser(interested_eventcodes=None)
This class parses Windows event logs.
- Parameters
interested_eventcodes (set(int)) – This parameter provides the flexibility to parse only the eventcodes of interest.
Methods
clean_raw_data(dataframe, raw_column) – Lowercase the raw data and replace escape characters.
get_columns() – Get columns of windows event codes.
parse(dataframe, raw_column) – Parse the Windows raw event.
- clean_raw_data(dataframe, raw_column)
Lowercase the raw data and replace escape characters.
- Parameters
dataframe (cudf.DataFrame) – Raw events to be parsed.
raw_column (string) – Name of the column containing the raw data.
- Returns
Clean raw information.
- Return type
cudf.DataFrame
- get_columns()
Get columns of windows event codes.
- Returns
Columns of all configured eventcodes, if no interested eventcodes are specified.
- Return type
set(string)
- parse(dataframe, raw_column)
Parses the Windows raw event.
- Parameters
dataframe (cudf.DataFrame) – Raw events to be parsed.
raw_column (string) – Name of the column containing the raw data.
- Returns
Parsed information.
- Return type
cudf.DataFrame
- clx.parsers.zeek.parse_log_file(filepath)
Parse Zeek log file and return cuDF dataframe. Uses header comments to get column names/types and configure parser.
- Parameters
filepath (string) – filepath for Zeek log file
- Returns
Zeek log dataframe
- Return type
cudf.DataFrame
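A minimal usage sketch (the log path is illustrative):
>>> import clx.parsers.zeek as zeek
>>> conn_df = zeek.parse_log_file("conn.log")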
Utils
- class clx.utils.data.dataloader.DataLoader(dataset, batchsize=1000)
Wrapper class used to return dataframe partitions based on batch size.
- Attributes
- dataset
- dataset_len
Methods
get_chunks() – A generator function that yields each chunk of the original input dataframe based on batchsize.
- get_chunks()
A generator function that yields each chunk of the original input dataframe based on batchsize.
- Returns
Partitioned dataframe.
- Return type
cudf.DataFrame
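A minimal batching sketch, assuming the Dataset wrapper documented below (the dataframe contents and resulting chunk sizes are illustrative):
>>> import cudf
>>> from clx.utils.data.dataset import Dataset
>>> from clx.utils.data.dataloader import DataLoader
>>> df = cudf.DataFrame({"domain": ["nvidia.com"] * 2500})
>>> loader = DataLoader(Dataset(df), batchsize=1000)
>>> for chunk in loader.get_chunks():
...     print(len(chunk))
1000
1000
500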
- class clx.utils.data.dataset.Dataset(df)
- property data
Returns the dataframe
- property length
Returns dataframe length
- clx.utils.data.utils
alias of module clx.utils.data.utils
Workflow
- class clx.workflow.workflow.Workflow(name, source=None, destination=None)
- Attributes
destination
Dictionary of configuration parameters for the data destination (writer)
name
Name of the workflow for logging purposes.
source
Dictionary of configuration parameters for the data source (reader)
Methods
benchmark() – Decorator used to capture a benchmark for a given function.
run_workflow() – Run workflow.
set_destination(destination) – Set destination.
set_source(source) – Set source.
stop_workflow() – Close workflow.
workflow(dataframe) – The pipeline function that performs the data enrichment on the data.
- benchmark()
Decorator used to capture a benchmark for a given function
- property destination
Dictionary of configuration parameters for the data destination (writer)
- property name
Name of the workflow for logging purposes.
- run_workflow()
Run workflow. Reader (source) fetches data. Workflow implementation is executed. Workflow output is written to destination.
- set_destination(destination)
Set destination.
- Parameters
destination – dict of configuration parameters for the destination (writer)
- set_source(source)
Set source.
- Parameters
source – dict of configuration parameters for data source (reader)
- property source
Dictionary of configuration parameters for the data source (reader)
- stop_workflow()
Close workflow. This includes calling close() method on reader (source) and writer (destination)
- abstract workflow(dataframe)
The pipeline function performs the data enrichment on the data. Subclasses must define this function. This function returns a GPU dataframe with enriched data.
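Since workflow() is abstract, a subclass supplies the enrichment logic. A minimal hypothetical sketch (the column names and configs are placeholders):
>>> from clx.workflow.workflow import Workflow
>>> class MyWorkflow(Workflow):
...     def workflow(self, dataframe):
...         # toy enrichment: add a length column for the raw field
...         dataframe["raw_len"] = dataframe["raw"].str.len()
...         return dataframe
>>> wf = MyWorkflow("my-workflow", source=source_config, destination=dest_config)
>>> wf.run_workflow()  # source_config/dest_config are placeholder reader/writer configs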
- class clx.workflow.splunk_alert_workflow.SplunkAlertWorkflow(name, source=None, destination=None, interval='day', threshold=2.5, window=7, raw_data_col_name='_raw')
- Attributes
interval
Interval by which the z score will be calculated; can be set to day or hour.
raw_data_col_name
Dataframe column name containing raw splunk alert data
threshold
Threshold by which to flag z score.
window
Window by which to calculate rolling z score
Methods
workflow(dataframe) – The pipeline function that performs the data enrichment on the data.
- property interval
Interval by which the z score will be calculated; can be set to day or hour.
- property raw_data_col_name
Dataframe column name containing raw splunk alert data
- property threshold
Threshold by which to flag z score. Scores greater than threshold or less than -threshold will be flagged.
- property window
Window by which to calculate rolling z score
- workflow(dataframe)
The pipeline function performs the data enrichment on the data. Subclasses must define this function. This function returns a GPU dataframe with enriched data.
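A constructor sketch using the documented defaults (the workflow name is illustrative):
>>> from clx.workflow.splunk_alert_workflow import SplunkAlertWorkflow
>>> saw = SplunkAlertWorkflow("splunk_alert_wf", interval="day", threshold=2.5, window=7)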
I/O
- class clx.io.reader.kafka_reader.KafkaReader(batch_size, consumer, time_window=30)
Reads from Kafka based on config object.
- Parameters
batch_size – batch size
consumer – Kafka consumer
time_window – Max window of time that queued events will wait to be pushed to workflow
- Attributes
- consumer
- has_data
- time_window
Methods
close() – Close Kafka reader.
fetch_data() – Fetch data from Kafka based on provided config object.
- close()
Close Kafka reader
- fetch_data()
Fetch data from Kafka based on provided config object
- class clx.io.reader.dask_fs_reader.DaskFileSystemReader(config)
Uses Dask to read from file system based on config object.
- Parameters
config – dictionary object of config values for type, input_format, input_path, and dask reader optional keyword args
Methods
close() – Close dask reader.
fetch_data() – Fetch data using dask based on provided config object.
- close()
Close dask reader
- fetch_data()
Fetch data using dask based on provided config object
- class clx.io.reader.fs_reader.FileSystemReader(config)
Uses cudf to read from file system based on config object.
- Parameters
config – dictionary object of config values for type, input_format, input_path (or output_path), and cudf reader optional keyword args
Methods
close() – Close cudf reader.
fetch_data() – Fetch data using cudf based on provided config object.
- close()
Close cudf reader
- fetch_data()
Fetch data using cudf based on provided config object
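A hedged configuration sketch; the config keys follow the parameter description above, while the "type" value and file path are illustrative:
>>> from clx.io.reader.fs_reader import FileSystemReader
>>> config = {
...     "type": "fs",
...     "input_format": "csv",
...     "input_path": "/path/to/input.csv",
... }
>>> reader = FileSystemReader(config)
>>> df = reader.fetch_data()
>>> reader.close()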
- class clx.io.writer.kafka_writer.KafkaWriter(kafka_topic, batch_size, delimiter, producer)
Publish to Kafka topic based on config object.
- Parameters
kafka_topic – Kafka topic
batch_size – batch size
delimiter – delimiter
producer – producer
- Attributes
- delimiter
- producer
Methods
close() – Close Kafka writer.
write_data(df) – Publish messages to Kafka topic.
- close()
Close Kafka writer
- write_data(df)
Publish messages to Kafka topic.
- Parameters
df – dataframe to publish
- class clx.io.writer.fs_writer.FileSystemWriter(config)
Uses cudf to write to file system based on config object.
- Parameters
config – dictionary object of config values for type, output_format, output_path, and cudf writer optional keyword args
Methods
close() – Close cudf writer.
write_data(df) – Write data to file system using cudf based on provided config object.
- close()
Close cudf writer
- write_data(df)
Write data to file system using cudf based on provided config object
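A matching writer sketch under the same assumptions (keys per the parameter description above; the path is illustrative):
>>> from clx.io.writer.fs_writer import FileSystemWriter
>>> config = {
...     "type": "fs",
...     "output_format": "csv",
...     "output_path": "/path/to/output.csv",
... }
>>> writer = FileSystemWriter(config)
>>> writer.write_data(df)  # df is a cudf.DataFrame produced elsewhere
>>> writer.close()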