Io Datasources#

group io_datasources
class datasource#
#include <datasource.hpp>

Interface class for providing input data to the readers.

Subclassed by cudf::io::external::kafka::kafka_consumer

Public Functions

inline virtual ~datasource()#

Base class destructor.

virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#

Returns a buffer with a subset of data from the source.

Parameters:
  • offset[in] Bytes from the start

  • size[in] Bytes to read

Returns:

The data buffer (can be smaller than size)

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#

Reads a selected range into a preallocated buffer.

Parameters:
  • offset[in] Bytes from the start

  • size[in] Bytes to read

  • dst[in] Address of the existing host memory

Returns:

The number of bytes read (can be smaller than size)

inline virtual bool supports_device_read() const#

Whether or not this source supports reading directly into device memory.

If this function returns true, the datasource will receive calls to device_read() instead of host_read() when the reader processes the data on the device. Most readers will still make host_read() calls, for the parts of input that are processed on the host (e.g. metadata).

Data source implementations that don’t support direct device reads don’t need to override this function. The implementations that do should override it to return false.

Returns:

bool Whether this source supports device_read() calls

inline virtual bool is_device_read_preferred(size_t size) const#

Estimates whether a direct device read would be more optimal for the given size.

Parameters:

size – Number of bytes to read

Returns:

whether the device read is expected to be more performant for the given size

inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#

Returns a device buffer with a subset of data from the source.

For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.

Throws:

cudf::logic_error – the object does not support direct device reads, i.e. supports_device_read returns false.

Parameters:
  • offset – Number of bytes from the start

  • size – Number of bytes to read

  • stream – CUDA stream to use

Returns:

The data buffer in the device memory

inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

Reads a selected range into a preallocated device buffer.

For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.

Throws:

cudf::logic_error – when the object does not support direct device reads, i.e. supports_device_read returns false.

Parameters:
  • offset – Number of bytes from the start

  • size – Number of bytes to read

  • dst – Address of the existing device memory

  • stream – CUDA stream to use

Returns:

The number of bytes read (can be smaller than size)

inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

Asynchronously reads a selected range into a preallocated device buffer.

Returns a future value that contains the number of bytes read. Calling get() method of the return value synchronizes this function.

For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.

Throws:

cudf::logic_error – when the object does not support direct device reads, i.e. supports_device_read returns false.

Parameters:
  • offset – Number of bytes from the start

  • size – Number of bytes to read

  • dst – Address of the existing device memory

  • stream – CUDA stream to use

Returns:

The number of bytes read as a future value (can be smaller than size)

virtual size_t size() const = 0#

Returns the size of the data in the source.

Returns:

The size of the source data in bytes

inline virtual bool is_empty() const#

Returns whether the source contains any data.

Returns:

True if there is data, False otherwise

Public Static Functions

static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t size = 0)#

Creates a source from a file path.

Parameters:
  • filepath[in] Path to the file to use

  • offset[in] Bytes from the start of the file (the default is zero)

  • size[in] Bytes from the offset; use zero for entire file (the default is zero)

Returns:

Constructed datasource object

static std::unique_ptr<datasource> create(host_buffer const &buffer)#

Creates a source from a host memory buffer.

@deprecated Since 23.04#

Parameters:

buffer[in] Host buffer object

Returns:

Constructed datasource object

static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#

Creates a source from a host memory buffer.

Parameters:

buffer[in] Host buffer object

Returns:

Constructed datasource object

static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#

Creates a source from a device memory buffer.

Parameters:

buffer – Device buffer object

Returns:

Constructed datasource object

static std::unique_ptr<datasource> create(datasource *source)#

Creates a source from an user implemented datasource object.

Parameters:

source[in] Non-owning pointer to the datasource object

Returns:

Constructed datasource object

template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)#

Creates a vector of datasources, one per element in the input vector.

Parameters:

args[in] vector of parameters

Returns:

Constructed vector of datasource objects

class buffer#
#include <datasource.hpp>

Interface class for buffers that the datasource returns to the caller.

Provides a basic interface to return the data address and size.

Subclassed by cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container >

Public Functions

virtual size_t size() const = 0#

Returns the buffer size in bytes.

Returns:

Buffer size in bytes

virtual uint8_t const *data() const = 0#

Returns the address of the data in the buffer.

Returns:

Address of the data in the buffer

inline virtual ~buffer()#

Base class destructor.

Public Static Functions

template<typename Container>
static inline std::unique_ptr<buffer> create(Container &&data_owner)#

Factory to construct a datasource buffer object from a container.

Template Parameters:

Container – Type of the container to construct the buffer from

Parameters:

data_owner – The container to construct the buffer from (ownership is transferred)

Returns:

Constructed buffer object

class non_owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

Implementation for non owning buffer where datasource holds buffer until destruction.

Public Functions

inline non_owning_buffer(uint8_t const *data, size_t size)#

Construct a new non owning buffer object.

Parameters:
  • data – The data buffer

  • size – The size of the data buffer

inline virtual size_t size() const override#

Returns the size of the buffer.

Returns:

The size of the buffer in bytes

inline virtual uint8_t const *data() const override#

Returns the pointer to the buffer.

Returns:

Pointer to the buffer

template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

Derived implementation of buffer that owns the data.

Can use different container types to hold the data buffer.

Template Parameters:

Container – Type of the container object that owns the data

Public Functions

inline owning_buffer(Container &&data_owner)#

Moves the input container into the newly created object.

Parameters:

data_owner – The container to construct the buffer from (ownership is transferred)

inline owning_buffer(Container &&data_owner, uint8_t const *data_ptr, size_t size)#

Moves the input container into the newly created object, and exposes a subspan of the buffer.

Parameters:
  • data_owner – The container to construct the buffer from (ownership is transferred)

  • data_ptr – Pointer to the start of the subspan

  • size – The size of the subspan

inline virtual size_t size() const override#

Returns the size of the buffer.

Returns:

The size of the buffer in bytes

inline virtual uint8_t const *data() const override#

Returns the pointer to the data in the buffer.

Returns:

Pointer to the data in the buffer

class kafka_consumer : public cudf::io::datasource#
#include <kafka_consumer.hpp>

libcudf datasource for Apache Kafka

Public Functions

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#

Creates an instance of the Kafka consumer object that is in a semi-ready state.

A consumer in a semi-ready state does not have all required parameters to make successful consumer interactions with the Kafka broker. However in the semi-ready state Kafka metadata operations are still possible. This is useful for clients who plan to only use those metadata operations. This is useful when the need for delayed partition and topic assignment is not known ahead of time and needs to be delayed to as late as possible. Documentation for librdkafka configurations can be found at edenhill/librdkafka

Parameters:
  • configs – key/value pairs of librdkafka configurations that will be passed to the librdkafka client

  • python_callablepython_callable_type pointer to a Python functools.partial object

  • callable_wrapperkafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#

Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at edenhill/librdkafka.

Parameters:
  • configs – key/value pairs of librdkafka configurations that will be passed to the librdkafka client

  • python_callablepython_callable_type pointer to a Python functools.partial object

  • callable_wrapperkafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.

  • topic_name – name of the Kafka topic to consume from

  • partition – partition index to consume from between 0 and TOPIC_NUM_PARTITIONS - 1 inclusive

  • start_offset – seek position for the specified TOPPAR (Topic/Partition combo)

  • end_offset – position in the specified TOPPAR to read to

  • batch_timeout – maximum (millisecond) read time allowed. If end_offset is not reached before batch_timeout, a smaller subset will be returned

  • delimiter – optional delimiter to insert into the output between kafka messages, Ex: “\n”

virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#

Returns a buffer with a subset of data from Kafka Topic.

Parameters:
  • offset[in] Bytes from the start

  • size[in] Bytes to read

Returns:

The data buffer

virtual size_t size() const override#

Returns the size of the data in Kafka buffer.

Returns:

size_t The size of the source data in bytes

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#

Reads a selected range into a preallocated buffer.

Parameters:
  • offset[in] Bytes from the start

  • size[in] Bytes to read

  • dst[in] Address of the existing host memory

Returns:

The number of bytes read (can be smaller than size)

void commit_offset(std::string const &topic, int partition, int64_t offset)#

Commits an offset to a specified Kafka Topic/Partition instance.

Throws:

cudf::logic_error – on failure to commit the partition offset

Parameters:
  • topic[in] Name of the Kafka topic that the offset should be set for

  • partition[in] Partition on the specified topic that should be used

  • offset[in] Offset that should be set for the topic/partition pair

std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#

Retrieve the watermark offset values for a topic/partition.

Parameters:
  • topic[in] Name of the Kafka topic that the watermark should be retrieved for

  • partition[in] Partition on the specified topic which should be used

  • timeout[in] Max milliseconds to wait on a response from the Kafka broker

  • cached[in] If True uses the last retrieved value from the Kafka broker, if False the latest value will be retrieved from the Kafka broker by making a network request.

Returns:

The watermark offset value for the specified topic/partition

std::map<std::string, std::string> current_configs()#

Retrieve the current Kafka client configurations.

Returns:

Map<string, string> of key/value pairs of the current client configurations

int64_t get_committed_offset(std::string const &topic, int partition)#

Get the latest offset that was successfully committed to the Kafka broker.

Parameters:
  • topic[in] Topic name for the topic/partition pair

  • partition[in] Partition number of the topic/partition pair

Returns:

Latest offset for the specified topic/partition pair

std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#

Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved.

Parameters:

specific_topic[in] The name of the topic for which to retrieve partitions. If empty then the partitions for all topics will be retrieved.

Returns:

Map of Kafka topic names with their corresponding list of topic partition values.

void close(int timeout)#

Close the underlying socket connection to Kafka and clean up system resources.

Throws:

cudf::logic_error – on failure to close the connection

Parameters:

timeout – Max milliseconds to wait on a response

void unsubscribe()#

Stop all active consumption and remove consumer subscriptions to topic/partition instances.

Throws:

cudf::logic_error – on failure to unsubscribe from the active partition assignments.