API Reference

IP

Features

clx.features.binary(dataframe, entity_id, feature_id)

Create binary feature dataframe using provided dataset, entity, and feature.

Parameters
  • dataframe (cudf.DataFrame) – Input dataframe to create binary features

  • entity_id (str) – Entity ID. Must be a column within dataframe

  • feature_id (str) – Feature ID. Must be a column within dataframe

Returns

dataframe

Return type

cudf.DataFrame

Examples

>>> import cudf
>>> import clx.features
>>> df = cudf.DataFrame(
        {
            "time": [1, 2, 3],
            "user": ["u1", "u2", "u1",],
            "computer": ["c1", "c1", "c3"],
        }
    )
>>> output = clx.features.binary(df, "user", "computer")
>>> output
        c1  c3
    user
    u1      1.0     1.0
    u2      1.0     0.0
clx.features.frequency(dataframe, entity_id, feature_id)

Create frequency feature dataframe using provided dataset, entity, and feature.

Parameters
  • dataframe (cudf.DataFrame) – Input dataframe to create frequency features

  • entity_id (str) – Entity ID. Must be a column within dataframe

  • feature_id (str) – Feature ID. Must be a column within dataframe

Returns

dataframe

Return type

cudf.DataFrame

Examples

>>> import cudf
>>> import clx.features
>>> df = cudf.DataFrame(
        {
            "time": [1, 2, 3],
            "user": ["u1", "u2", "u1",],
            "computer": ["c1", "c1", "c3"],
        }
    )
>>> output = clx.features.frequency(df, "user", "computer")
>>> output
        c1  c3
    user
    u1      0.5     0.5
    u2      1.0     0.0
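
To make the difference between the two feature types concrete, here is a minimal pure-Python sketch that reproduces the example outputs above. It is not the CLX implementation (which operates on cudf dataframes). The helper names `count_features`, `binary_features`, and `frequency_features` are hypothetical, and the row-wise normalization is inferred from the example output.

```python
from collections import Counter, defaultdict

def count_features(rows, entity_id, feature_id):
    """Count how often each feature value occurs for each entity."""
    counts = defaultdict(Counter)
    for row in rows:
        counts[row[entity_id]][row[feature_id]] += 1
    return counts

def binary_features(rows, entity_id, feature_id):
    """1.0 if the entity was ever seen with the feature value, else 0.0."""
    counts = count_features(rows, entity_id, feature_id)
    features = sorted({f for c in counts.values() for f in c})
    return {e: {f: 1.0 if c[f] > 0 else 0.0 for f in features}
            for e, c in counts.items()}

def frequency_features(rows, entity_id, feature_id):
    """Share of the entity's rows that carry each feature value."""
    counts = count_features(rows, entity_id, feature_id)
    features = sorted({f for c in counts.values() for f in c})
    return {e: {f: c[f] / sum(c.values()) for f in features}
            for e, c in counts.items()}

# Same data as the examples above, as a list of records.
rows = [
    {"time": 1, "user": "u1", "computer": "c1"},
    {"time": 2, "user": "u2", "computer": "c1"},
    {"time": 3, "user": "u1", "computer": "c3"},
]
binary = binary_features(rows, "user", "computer")
frequency = frequency_features(rows, "user", "computer")
```

In this sketch each row of the frequency result sums to 1.0, which matches the example output for `u1` (0.5, 0.5) and `u2` (1.0, 0.0).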

Analytics

class clx.analytics.detector.Detector(lr=0.001)
Attributes
criterion
model
optimizer

Methods

leverage_model(model)

This function leverages model by setting parallelism parameters.

init_model

load_model

predict

save_model

train_model

leverage_model(model)

This function leverages model by setting parallelism parameters.

Parameters

model (RNNClassifier) – Model instance.

class clx.analytics.model.rnn_classifier.RNNClassifier(input_size, hidden_size, output_size, n_layers, bidirectional=True)

Methods

forward(input, seq_lengths)

Defines the computation performed at every call.

forward(input, seq_lengths)

Defines the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class clx.analytics.model.tabular_model.TabularModel(emb_szs, n_cont, out_sz, layers, drops, emb_drop, use_bn, is_reg, is_multi)

Basic model for tabular data

Methods

forward(x_cat, x_cont)

Defines the computation performed at every call.

forward(x_cat, x_cont)

Defines the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

clx.analytics.stats.rzscore(series, window)

Calculates rolling z-score

Parameters
  • series (cudf.Series) – Series for which to calculate rolling z-score

  • window (int) – Window size

Returns
cudf.Series

Series with rolling z-score values

Examples

>>> import clx.analytics.stats
>>> import cudf
>>> sequence = [3,4,5,6,1,10,34,2,1,11,45,34,2,9,19,43,24,13,23,10,98,84,10]
>>> series = cudf.Series(sequence)
>>> zscores_df = cudf.DataFrame()
>>> zscores_df['zscore'] = clx.analytics.stats.rzscore(series, 7)
>>> zscores_df
            zscore
0           null
1           null
2           null
3           null
4           null
5           null
6    2.374423424
7   -0.645941275
8   -0.683973734
9    0.158832461
10   1.847751909
11   0.880026019
12  -0.950835449
13  -0.360593742
14   0.111407599
15   1.228914145
16  -0.074966331
17  -0.570321249
18   0.327849973
19  -0.934372308
20   2.296828498
21   1.282966989
22  -0.795223674
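
The values in the example output are consistent with subtracting the rolling mean and dividing by the rolling population standard deviation (divisor N, not N-1), with the window ending at the current element. The following pure-Python sketch works under that assumption. The function name `rolling_zscore` is hypothetical, and this is not the cudf-based implementation.

```python
import math

def rolling_zscore(values, window):
    """Rolling z-score over a trailing window; None until the window is full."""
    scores = []
    for i in range(len(values)):
        if i + 1 < window:
            scores.append(None)  # not enough history yet
            continue
        chunk = values[i + 1 - window : i + 1]
        mean = sum(chunk) / window
        # Population standard deviation (assumption inferred from the example).
        std = math.sqrt(sum((v - mean) ** 2 for v in chunk) / window)
        scores.append((values[i] - mean) / std if std > 0 else None)
    return scores

sequence = [3, 4, 5, 6, 1, 10, 34, 2, 1, 11, 45, 34,
            2, 9, 19, 43, 24, 13, 23, 10, 98, 84, 10]
scores = rolling_zscore(sequence, 7)
```

With a window of 7, the first six entries are empty and the seventh is roughly 2.3744, in line with the table above.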

DNS Extractor

Exploratory Data Analysis

Heuristics

OSI (Open Source Integration)

class clx.osi.farsight.FarsightLookupClient(server, apikey, limit=None, http_proxy=None, https_proxy=None)

Wrapper class to query DNSDB records in various ways, for example by IP address or domain name.

Parameters
  • server – Farsight server

  • apikey – API key

  • limit – limit

  • http_proxy – HTTP proxy

  • https_proxy – HTTPS proxy

Methods

query_rdata_ip(rdata_ip[, before, after])

Query to find DNSDB records matching a specific IP address with given time range.

query_rdata_name(rdata_name[, rrtype, …])

Query matches only a single DNSDB record of given owner name and time ranges.

query_rrset(oname[, rrtype, bailiwick, …])

Batch version of querying DNSDB by given domain name and time ranges.

query_rdata_ip(rdata_ip, before=None, after=None)

Query to find DNSDB records matching a specific IP address within a given time range.

Parameters
  • rdata_ip (str) – A single IPv4 or IPv6 address, an address with a prefix length, or an address range. If a prefix is provided, the delimiter between the network address and the prefix length is a single comma (“,”) rather than the usual slash (“/”), to avoid clashing with the HTTP URI path name separator.

  • before (UNIX timestamp) – Output results seen before this time.

  • after (UNIX timestamp) – Output results seen after this time.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key", limit=1)
>>> client.query_rdata_ip("100.0.0.1")
{"status_code": 200,...}
>>> client.query_rdata_ip("100.0.0.1", before=1428433465, after=1538014110)
{"status_code": 200,...}
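
Because the prefix delimiter for rdata_ip is a comma rather than a slash, it can help to convert ordinary CIDR notation before calling the client. The helper below is a small sketch using only the standard library. The name `to_rdata_ip` is hypothetical and not part of CLX, and it handles only single addresses and CIDR prefixes, not address ranges.

```python
import ipaddress

def to_rdata_ip(value: str) -> str:
    """Convert 'addr/prefix' to the comma-delimited 'addr,prefix' form."""
    if "/" in value:
        # strict=False tolerates host bits being set in the input.
        network = ipaddress.ip_network(value, strict=False)
        return f"{network.network_address},{network.prefixlen}"
    # Raises ValueError if the value is not a valid single address.
    ipaddress.ip_address(value)
    return value

single = to_rdata_ip("100.0.0.1")
prefixed = to_rdata_ip("104.244.13.0/24")
prefixed_v6 = to_rdata_ip("2001:db8::/32")
```

The converted string could then be passed as the first argument of `query_rdata_ip`.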
query_rdata_name(rdata_name, rrtype=None, before=None, after=None)

Query matches only a single DNSDB record of the given owner name and time range.

Parameters
  • rdata_name (str) – DNS domain name.

  • rrtype (str) – The resource record type, given either as the standard DNS type mnemonic or as an RFC 3597 generic type, i.e. the string TYPE immediately followed by the decimal RRtype number.

  • before (UNIX timestamp) – Output results seen before this time.

  • after (UNIX timestamp) – Output results seen after this time.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key", limit=1)
>>> client.query_rdata_name("www.farsightsecurity.com")
{"status_code": 200,...}
>>> client.query_rdata_name("www.farsightsecurity.com", rrtype="PTR", before=1386638408, after=1561176503)
{"status_code": 200,...}
query_rrset(oname, rrtype=None, bailiwick=None, before=None, after=None)

Batch version of querying DNSDB by the given domain name and time range.

Parameters
  • oname (str) – DNS domain name.

  • rrtype (str) – The resource record type, given either as the standard DNS type mnemonic or as an RFC 3597 generic type, i.e. the string TYPE immediately followed by the decimal RRtype number.

  • bailiwick (str) – The “bailiwick” of an RRset in DNSDB observed via passive DNS replication is the closest enclosing zone delegated to a nameserver which served the RRset.

  • before (UNIX timestamp) – Output results seen before this time.

  • after (UNIX timestamp) – Output results seen after this time.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.farsight import FarsightLookupClient
>>> client = FarsightLookupClient("https://localhost", "your-api-key")
>>> client.query_rrset("www.dnsdb.info")
{"status_code": 200,...}
>>> client.query_rrset("www.dnsdb.info", rrtype="CNAME", bailiwick="dnsdb.info.", before=1374184718, after=1564909243,)
{"status_code": 200,...}
class clx.osi.virus_total.VirusTotalClient(api_key=None, proxies=None)

Wrapper class to query VirusTotal database.

Parameters
  • api_key – API key

  • proxies – proxies

Attributes
api_key
proxies
vt_endpoint_dict

Methods

domain_report(domain)

Retrieve report using domain.

file_report(*resource)

Retrieve file scan reports.

file_rescan(*resource)

Rescan the given files.

file_scan(file)

This function allows you to send a file for scanning with VirusTotal.

ipaddress_report(ip)

Retrieve report using IP address.

put_comment(resource, comment)

Post a comment for a file or URL.

scan_big_file(files)

Scan files larger than 32MB.

url_report(*resource)

Retrieve URL scan reports.

url_scan(*url)

Retrieve URL scan reports.

domain_report(domain)

Retrieve report using domain.

Parameters

domain (str) – A domain name

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.domain_report("027.ru")
{'status_code': 200, 'json_resp': {'BitDefender category': 'parked', 'undetected_downloaded_samples'...}}
file_report(*resource)

Retrieve file scan reports.

Parameters

*resource (str) – The MD5, SHA-1 or SHA-256 of a file for which you want to retrieve the most recent antivirus report. You may also specify a scan_id returned by the /file/scan endpoint.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_report(["99017f6eebbac24f351415dd410d522d"])
{'status_code': 200, 'json_resp': {'scans': {'Bkav': {'detected': True, 'version': '1.3.0.9899', 'result': 'W32.AIDetectVM.malware1'...}}
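
Since file_report accepts an MD5, SHA-1 or SHA-256 digest, a local file's digests can be computed with the standard library before querying. This is a minimal sketch. The helper name `file_digests` is hypothetical and not part of CLX.

```python
import hashlib

def file_digests(path, chunk_size=65536):
    """Return the MD5, SHA-1 and SHA-256 hex digests of a file."""
    hashers = {name: hashlib.new(name) for name in ("md5", "sha1", "sha256")}
    with open(path, "rb") as handle:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            for hasher in hashers.values():
                hasher.update(chunk)
    return {name: hasher.hexdigest() for name, hasher in hashers.items()}
```

Any one of the returned values, for example `file_digests("test.sh")["sha256"]`, could then serve as the resource argument.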
file_rescan(*resource)

Rescan the given files.

Parameters

*resource (str) – The MD5, SHA-1 or SHA-256 of the file you want to re-scan.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_rescan('70c0942965354dbb132c05458866b96709e37f44')
{'status_code': 200, 'json_resp': {'scan_id': ...}}
file_scan(file)

This function allows you to send a file for scanning with VirusTotal. Before submitting, consider retrieving the latest report on the file. The file size limit is 32MB; to submit files up to 200MB in size, use scan_big_file instead.

Parameters

file (str) – File to be scanned

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.file_scan('test.sh')
{'status_code': 200, 'json_resp': {'scan_id': '0204e88255a0bd7807547e9186621f0478a6bb2c43e795fb5e6934e5cda0e1f6-1605914572', 'sha1': '70c0942965354dbb132c05458866b96709e37f44'...}
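
The size limits described for file_scan suggest checking a file's size before choosing a method. The sketch below is only an illustration: the function name `choose_scan_method` is hypothetical, and reading "32MB" and "200MB" as binary megabytes is an assumption, not something the source states.

```python
import os

# Assumption: the documented limits are interpreted as MiB.
FILE_SCAN_LIMIT = 32 * 1024 * 1024
BIG_FILE_LIMIT = 200 * 1024 * 1024

def choose_scan_method(size_in_bytes):
    """Return the name of the client method suited to a file of this size."""
    if size_in_bytes <= FILE_SCAN_LIMIT:
        return "file_scan"
    if size_in_bytes <= BIG_FILE_LIMIT:
        return "scan_big_file"
    raise ValueError("file exceeds the 200MB submission limit")

def choose_scan_method_for_path(path):
    """Convenience wrapper that looks up the size of a file on disk."""
    return choose_scan_method(os.path.getsize(path))
```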
ipaddress_report(ip)

Retrieve report using IP address.

Parameters

ip (str) – An IP address

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.ipaddress_report("90.156.201.27")
{'status_code': 200, 'json_resp': {'asn': 25532, 'undetected_urls...}}
put_comment(resource, comment)

Post a comment for a file or URL.

Parameters

resource (str) – Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want to comment on.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.put_comment("75efd85cf6f8a962fe016787a7f57206ea9263086ee496fc62e3fc56734d4b53", "This is a test comment")
{'status_code': 200, 'json_resp': {'response_code': 0, 'verbose_msg': 'Duplicate comment'}}
scan_big_file(files)

Scan files larger than 32MB.

Parameters

files (str) – File to be scanned

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.scan_big_file('test.sh')
{'status_code': 200, 'json_resp': {'scan_id': '0204e88255a0bd7807547e9186621f0478a6bb2c43e795fb5e6934e5cda0e1f6-1605914572', 'sha1': '70c0942965354dbb132c05458866b96709e37f44'...}
url_report(*resource)

Retrieve URL scan reports.

Parameters

*resource (str) – The URL for which to retrieve the most recent report.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.url_report(["virustotal.com"])
{'status_code': 200, 'json_resp': {'scan_id': 'a354494a73382ea0b4bc47f4c9e8d6c578027cd4598196dc88f05a22b5817293-1605914280'...}
url_scan(*url)

Retrieve URL scan reports.

Parameters

*url (str) – A URL for which you want to retrieve the most recent report. You may also specify a scan_id (sha256-timestamp as returned by the URL submission API) to access a specific report.

Returns

Response

Return type

dict

Examples

>>> from clx.osi.virus_total import VirusTotalClient
>>> client = VirusTotalClient(api_key='your-api-key')
>>> client.url_scan(["virustotal.com"])
{'status_code': 200, 'json_resp': {'permalink': 'https://www.virustotal.com/gui/url/...}}
class clx.osi.whois.WhoIsLookupClient(sep=',', datetime_format='%m-%d-%Y %H:%M:%S')

Methods

whois(domains[, arr2str])

Function to access parsed WhoIs data for a given domain.

datetime_arr_keys = ['creation_date', 'updated_date', 'expiration_date']

Wrapper class to query WhoIs API.

Parameters
  • sep – Delimiter to concat nested list values from the Whois response.

  • datetime_format – Format to convert WhoIs response datetime object.

whois(domains, arr2str=True)

Function to access parsed WhoIs data for a given domain.

Parameters
  • domains (list) – Domains to perform whois lookup.

  • arr2str (boolean) – Convert WhoIs lookup response object to list of strings.

Returns

WhoIs information with respect to given domains.

Return type

list/obj

Examples

>>> from clx.osi.whois import WhoIsLookupClient
>>> domains = ["nvidia.com"]
>>> client = WhoIsLookupClient()
>>> client.whois(domains)
[{'domain_name': 'NVIDIA.COM', 'registrar': 'Safenames Ltd', 'whois_server': 'whois.safenames.net'...}]
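
The sep and datetime_format parameters describe how nested list values and datetime objects are turned into strings. The sketch below illustrates that idea on a made-up record. It is inferred from the parameter descriptions only and is not the client's actual code; the function name `flatten_record` and the sample data are hypothetical.

```python
from datetime import datetime

DATETIME_KEYS = ["creation_date", "updated_date", "expiration_date"]

def flatten_record(record, sep=",", datetime_format="%m-%d-%Y %H:%M:%S"):
    """Join list values with sep and render datetime fields as strings."""
    flat = {}
    for key, value in record.items():
        items = value if isinstance(value, list) else [value]
        if key in DATETIME_KEYS:
            items = [i.strftime(datetime_format) if isinstance(i, datetime) else i
                     for i in items]
        # Lists become one delimited string; scalars stay single values.
        flat[key] = sep.join(str(i) for i in items) if isinstance(value, list) else items[0]
    return flat

# Made-up record for illustration only.
sample = {
    "domain_name": ["EXAMPLE.COM", "example.com"],
    "creation_date": datetime(2020, 1, 2, 3, 4, 5),
}
flattened = flatten_record(sample)
```

With the default arguments, the list is joined with a comma and the date is rendered in month-day-year order.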
class clx.osi.slashnext.SlashNextClient(api_key, snx_ir_workspace, base_url='https://oti.slashnext.cloud/api')
Attributes
conn

Methods

api_quota()

Find information about your API quota, like current usage, quota left etc.

download_html(scanid)

Downloads a web page HTML against a previous URL scan request.

download_screenshot(scanid[, resolution])

Downloads a screenshot of a web page against a previous URL scan request.

download_text(scanid)

Downloads the text of a web page against a previous URL scan request.

host_report(host)

Queries the SlashNext cloud database and retrieves a detailed report.

host_reputation(host)

Queries the SlashNext cloud database and retrieves the reputation of a host.

host_urls(host[, limit])

Queries the SlashNext cloud database and retrieves a list of all URLs.

scan_report(scanid[, extended_info])

Retrieve URL scan results against a previous scan request.

url_scan(url[, extended_info])

Perform a real-time URL reputation scan with SlashNext cloud-based SEER threat detection engine.

url_scan_sync(url[, extended_info, timeout])

Perform a real-time URL scan with SlashNext cloud-based SEER threat detection engine in a blocking mode.

verify_connection()

Verify SlashNext cloud database connection.

api_quota()

Find information about your API quota, such as current usage and remaining quota.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.api_quota()
>>> type(response_list[0])
<class 'dict'>
download_html(scanid)

Downloads a web page HTML against a previous URL scan request.

Parameters

scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_html('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
download_screenshot(scanid, resolution='high')

Downloads a screenshot of a web page against a previous URL scan request.

Parameters
  • scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.

  • resolution (str) – Resolution of the web page screenshot. Can be “high” or “medium”. Default is “high”.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_screenshot('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
download_text(scanid)

Downloads the text of a web page against a previous URL scan request.

Parameters

scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.download_text('2-ba57-755a7458c8a3')
>>> type(response_list[0])
<class 'dict'>
host_report(host)

Queries the SlashNext cloud database and retrieves a detailed report.

Parameters

host (str) – The host to look up in the SlashNext Threat Intelligence database. Can be either a domain name or an IPv4 address.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_report('google.com')
>>> type(response_list[0])
<class 'dict'>
host_reputation(host)

Queries the SlashNext cloud database and retrieves the reputation of a host.

Parameters

host (str) – The host to look up in the SlashNext Threat Intelligence database. Can be either a domain name or an IPv4 address.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_reputation('google.com')
>>> type(response_list[0])
<class 'dict'>
host_urls(host, limit=10)

Queries the SlashNext cloud database and retrieves a list of all URLs.

Parameters
  • host (str) – The host to look up in the SlashNext Threat Intelligence database, for which to return a list of associated URLs. Can be either a domain name or an IPv4 address.

  • limit (int) – The maximum number of URL records to fetch. Default is 10.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.host_urls('google.com', limit=1)
>>> type(response_list[0])
<class 'dict'>
scan_report(scanid, extended_info=True)

Retrieve URL scan results against a previous scan request.

Parameters
  • scanid (str) – Scan ID of the scan for which to get the report. Can be retrieved from the “slashnext-url-scan” action or “slashnext-url-scan-sync” action.

  • extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.scan_report('2-ba57-755a7458c8a3', extended_info=False)
>>> type(response_list[0])
<class 'dict'>
url_scan(url, extended_info=True)

Perform a real-time URL reputation scan with SlashNext cloud-based SEER threat detection engine.

Parameters
  • url (str) – The URL that needs to be scanned.

  • extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.url_scan('http://ajeetenterprises.in/js/kbrad/drive/index.php', extended_info=False)
>>> type(response_list[0])
<class 'dict'>
url_scan_sync(url, extended_info=True, timeout=60)

Perform a real-time URL scan with SlashNext cloud-based SEER threat detection engine in a blocking mode.

Parameters
  • url (str) – The URL that needs to be scanned.

  • extended_info (boolean) – Whether to download forensics data, such as screenshot, HTML, and rendered text.

  • timeout (int) – A timeout value in seconds. If no timeout value is specified, the default of 60 seconds is used.

Returns

Query response as list.

Return type

list

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> response_list = slashnext.url_scan_sync('http://ajeetenterprises.in/js/kbrad/drive/index.php', extended_info=False, timeout=10)
>>> type(response_list[0])
<class 'dict'>
verify_connection()

Verify SlashNext cloud database connection.

Examples

>>> from clx.osi.slashnext import SlashNextClient
>>> api_key = 'slashnext_cloud_apikey'
>>> snx_ir_workspace_dir = 'snx_ir_workspace'
>>> slashnext = SlashNextClient(api_key, snx_ir_workspace_dir)
>>> slashnext.verify_connection()
Successfully connected to SlashNext cloud.
'success'

Parsers

Utils

class clx.utils.data.dataloader.DataLoader(dataset, batchsize=1000)

Wrapper class that returns dataframe partitions based on batchsize.

Attributes
dataset
dataset_len

Methods

get_chunks()

A generator function that yields each chunk of the original input dataframe based on batchsize.

get_chunks()

A generator function that yields each chunk of the original input dataframe based on batchsize.

Returns

Partitioned dataframe.

Return type

cudf.DataFrame
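
The chunking behaviour described here can be illustrated with a plain Python generator over a list. This is only a sketch of the general pattern, not the DataLoader implementation, which works on dataframe partitions. The standalone function below is hypothetical.

```python
def get_chunks(rows, batchsize=1000):
    """Yield consecutive slices of rows, each with at most batchsize items."""
    for start in range(0, len(rows), batchsize):
        yield rows[start : start + batchsize]

# Five items with a batch size of two give chunks of sizes 2, 2 and 1.
chunks = list(get_chunks(list(range(5)), batchsize=2))
```

Because it is a generator, each chunk is produced only when the caller asks for it, so the full set of partitions never has to be held at once.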

class clx.utils.data.dataset.Dataset(df)
Attributes
data

Returns dataframe

length

Returns dataframe length

property data

Returns dataframe

property length

Returns dataframe length

Workflow

I/O

class clx.io.writer.kafka_writer.KafkaWriter(kafka_topic, batch_size, delimiter, producer)

Publish to Kafka topic based on config object.

Parameters
  • kafka_topic – Kafka topic

  • batch_size – batch size

  • delimiter – delimiter

  • producer – producer

Attributes
delimiter
producer

Methods

close()

Close Kafka writer

write_data(df)

Publish messages to Kafka topic

close()

Close Kafka writer

write_data(df)

Publish messages to Kafka topic

Parameters

df – dataframe to publish