Io Datasources#
- group io_datasources
-
class datasource#
- #include <datasource.hpp>
Interface class for providing input data to the readers.
Subclassed by cudf::io::external::kafka::kafka_consumer
Public Functions
-
inline virtual ~datasource()#
Base class destructor.
-
virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#
Returns a buffer with a subset of data from the source.
- Parameters:
offset – [in] Bytes from the start
size – [in] Bytes to read
- Returns:
The data buffer (can be smaller than size)
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#
Reads a selected range into a preallocated buffer.
- Parameters:
offset – [in] Bytes from the start
size – [in] Bytes to read
dst – [in] Address of the existing host memory
- Returns:
The number of bytes read (can be smaller than size)
-
inline virtual bool supports_device_read() const#
Whether or not this source supports reading directly into device memory.
If this function returns true, the datasource will receive calls to device_read() instead of host_read() when the reader processes the data on the device. Most readers will still make host_read() calls, for the parts of input that are processed on the host (e.g. metadata).
Data source implementations that don’t support direct device reads don’t need to override this function. The implementations that do should override it to return true.
- Returns:
bool Whether this source supports device_read() calls
-
inline virtual bool is_device_read_preferred(size_t size) const#
Estimates whether a direct device read would be more optimal for the given size.
- Parameters:
size – Number of bytes to read
- Returns:
whether the device read is expected to be more performant for the given size
-
inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#
Returns a device buffer with a subset of data from the source.
For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.
- Throws:
cudf::logic_error – when the object does not support direct device reads, i.e. supports_device_read returns false.
- Parameters:
offset – Number of bytes from the start
size – Number of bytes to read
stream – CUDA stream to use
- Returns:
The data buffer in the device memory
-
inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
Reads a selected range into a preallocated device buffer.
For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.
- Throws:
cudf::logic_error – when the object does not support direct device reads, i.e. supports_device_read returns false.
- Parameters:
offset – Number of bytes from the start
size – Number of bytes to read
dst – Address of the existing device memory
stream – CUDA stream to use
- Returns:
The number of bytes read (can be smaller than size)
-
inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
Asynchronously reads a selected range into a preallocated device buffer.
Returns a future value that contains the number of bytes read. Calling the get() method of the return value synchronizes this function.
For optimal performance, should only be called when is_device_read_preferred returns true. Data source implementations that don’t support direct device reads don’t need to override this function.
- Throws:
cudf::logic_error – when the object does not support direct device reads, i.e. supports_device_read returns false.
- Parameters:
offset – Number of bytes from the start
size – Number of bytes to read
dst – Address of the existing device memory
stream – CUDA stream to use
- Returns:
The number of bytes read as a future value (can be smaller than size)
-
virtual size_t size() const = 0#
Returns the size of the data in the source.
- Returns:
The size of the source data in bytes
-
inline virtual bool is_empty() const#
Returns whether the source is empty, i.e. contains no data.
- Returns:
True if the source contains no data, False otherwise
Public Static Functions
-
static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t size = 0)#
Creates a source from a file path.
- Parameters:
filepath – [in] Path to the file to use
offset – [in] Bytes from the start of the file (the default is zero)
size – [in] Bytes from the offset; use zero for entire file (the default is zero)
- Returns:
Constructed datasource object
-
static std::unique_ptr<datasource> create(host_buffer const &buffer)#
Creates a source from a host memory buffer.
Deprecated since 23.04.
- Parameters:
buffer – [in] Host buffer object
- Returns:
Constructed datasource object
-
static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#
Creates a source from a host memory buffer.
- Parameters:
buffer – [in] Host buffer object
- Returns:
Constructed datasource object
-
static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#
Creates a source from a device memory buffer.
- Parameters:
buffer – Device buffer object
- Returns:
Constructed datasource object
-
static std::unique_ptr<datasource> create(datasource *source)#
Creates a source from a user-implemented datasource object.
- Parameters:
source – [in] Non-owning pointer to the datasource object
- Returns:
Constructed datasource object
-
template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)#
Creates a vector of datasources, one per element in the input vector.
- Parameters:
args – [in] vector of parameters
- Returns:
Constructed vector of datasource objects
-
class buffer#
- #include <datasource.hpp>
Interface class for buffers that the datasource returns to the caller.
Provides a basic interface to return the data address and size.
Subclassed by cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container >
Public Functions
-
virtual size_t size() const = 0#
Returns the buffer size in bytes.
- Returns:
Buffer size in bytes
-
virtual uint8_t const *data() const = 0#
Returns the address of the data in the buffer.
- Returns:
Address of the data in the buffer
-
inline virtual ~buffer()#
Base class destructor.
Public Static Functions
-
template<typename Container>
static inline std::unique_ptr<buffer> create(Container &&data_owner)#
Factory to construct a datasource buffer object from a container.
- Template Parameters:
Container – Type of the container to construct the buffer from
- Parameters:
data_owner – The container to construct the buffer from (ownership is transferred)
- Returns:
Constructed buffer object
-
virtual size_t size() const = 0#
-
class non_owning_buffer : public cudf::io::datasource::buffer#
- #include <datasource.hpp>
Implementation of a non-owning buffer, where the datasource holds the buffer until destruction.
Public Functions
-
inline non_owning_buffer(uint8_t const *data, size_t size)#
Construct a new non owning buffer object.
- Parameters:
data – The data buffer
size – The size of the data buffer
-
inline virtual size_t size() const override#
Returns the size of the buffer.
- Returns:
The size of the buffer in bytes
-
inline virtual uint8_t const *data() const override#
Returns the pointer to the buffer.
- Returns:
Pointer to the buffer
-
inline non_owning_buffer(uint8_t const *data, size_t size)#
-
template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer#
- #include <datasource.hpp>
Derived implementation of buffer that owns the data.
Can use different container types to hold the data buffer.
- Template Parameters:
Container – Type of the container object that owns the data
Public Functions
-
inline owning_buffer(Container &&data_owner)#
Moves the input container into the newly created object.
- Parameters:
data_owner – The container to construct the buffer from (ownership is transferred)
-
inline owning_buffer(Container &&data_owner, uint8_t const *data_ptr, size_t size)#
Moves the input container into the newly created object, and exposes a subspan of the buffer.
- Parameters:
data_owner – The container to construct the buffer from (ownership is transferred)
data_ptr – Pointer to the start of the subspan
size – The size of the subspan
-
inline virtual size_t size() const override#
Returns the size of the buffer.
- Returns:
The size of the buffer in bytes
-
inline virtual uint8_t const *data() const override#
Returns the pointer to the data in the buffer.
- Returns:
Pointer to the data in the buffer
-
inline virtual ~datasource()#
-
class kafka_consumer : public cudf::io::datasource#
- #include <kafka_consumer.hpp>
libcudf datasource for Apache Kafka
Public Functions
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
Creates an instance of the Kafka consumer object that is in a semi-ready state.
A consumer in a semi-ready state does not have all the parameters required to interact successfully with the Kafka broker as a consumer. However, Kafka metadata operations are still possible in the semi-ready state. This is useful for clients that plan to use only those metadata operations, and for cases where the partition and topic assignment is not known ahead of time and needs to be delayed as late as possible. Documentation for librdkafka configurations can be found at edenhill/librdkafka.
- Parameters:
configs – key/value pairs of librdkafka configurations that will be passed to the librdkafka client
python_callable – python_callable_type pointer to a Python functools.partial object
callable_wrapper – kafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#
Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at edenhill/librdkafka.
- Parameters:
configs – key/value pairs of librdkafka configurations that will be passed to the librdkafka client
python_callable – python_callable_type pointer to a Python functools.partial object
callable_wrapper – kafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable. This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka.
topic_name – name of the Kafka topic to consume from
partition – partition index to consume from, between 0 and TOPIC_NUM_PARTITIONS - 1 inclusive
start_offset – seek position for the specified TOPPAR (Topic/Partition combo)
end_offset – position in the specified TOPPAR to read to
batch_timeout – maximum (millisecond) read time allowed. If end_offset is not reached before batch_timeout, a smaller subset will be returned
delimiter – optional delimiter to insert into the output between kafka messages, Ex: “\n”
-
virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#
Returns a buffer with a subset of data from Kafka Topic.
- Parameters:
offset – [in] Bytes from the start
size – [in] Bytes to read
- Returns:
The data buffer
-
virtual size_t size() const override#
Returns the size of the data in Kafka buffer.
- Returns:
size_t The size of the source data in bytes
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#
Reads a selected range into a preallocated buffer.
- Parameters:
offset – [in] Bytes from the start
size – [in] Bytes to read
dst – [in] Address of the existing host memory
- Returns:
The number of bytes read (can be smaller than size)
-
void commit_offset(std::string const &topic, int partition, int64_t offset)#
Commits an offset to a specified Kafka Topic/Partition instance.
- Throws:
cudf::logic_error – on failure to commit the partition offset
- Parameters:
topic – [in] Name of the Kafka topic that the offset should be set for
partition – [in] Partition on the specified topic that should be used
offset – [in] Offset that should be set for the topic/partition pair
-
std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#
Retrieve the watermark offset values for a topic/partition.
- Parameters:
topic – [in] Name of the Kafka topic that the watermark should be retrieved for
partition – [in] Partition on the specified topic which should be used
timeout – [in] Max milliseconds to wait on a response from the Kafka broker
cached – [in] If True uses the last retrieved value from the Kafka broker, if False the latest value will be retrieved from the Kafka broker by making a network request.
- Returns:
The watermark offset value for the specified topic/partition
-
std::map<std::string, std::string> current_configs()#
Retrieve the current Kafka client configurations.
- Returns:
Map<string, string> of key/value pairs of the current client configurations
-
int64_t get_committed_offset(std::string const &topic, int partition)#
Get the latest offset that was successfully committed to the Kafka broker.
- Parameters:
topic – [in] Topic name for the topic/partition pair
partition – [in] Partition number of the topic/partition pair
- Returns:
Latest offset for the specified topic/partition pair
-
std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#
Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved.
- Parameters:
specific_topic – [in] The name of the topic for which to retrieve partitions. If empty then the partitions for all topics will be retrieved.
- Returns:
Map of Kafka topic names with their corresponding list of topic partition values.
-
void close(int timeout)#
Close the underlying socket connection to Kafka and clean up system resources.
- Throws:
cudf::logic_error – on failure to close the connection
- Parameters:
timeout – Max milliseconds to wait on a response
-
void unsubscribe()#
Stop all active consumption and remove consumer subscriptions to topic/partition instances.
- Throws:
cudf::logic_error – on failure to unsubscribe from the active partition assignments.
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
-
class datasource#