libcudf datasource for Apache Kafka More...
#include <kafka_consumer.hpp>
Public Member Functions | |
kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper) | |
Creates an instance of the Kafka consumer object that is in a semi-ready state. More... | |
kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter) | |
Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. More... | |
std::unique_ptr< cudf::io::datasource::buffer > | host_read (size_t offset, size_t size) override |
Returns a buffer with a subset of data from Kafka Topic. More... | |
size_t | size () const override |
Returns the size of the data in Kafka buffer. More... | |
size_t | host_read (size_t offset, size_t size, uint8_t *dst) override |
Reads a selected range into a preallocated buffer. More... | |
void | commit_offset (std::string const &topic, int partition, int64_t offset) |
Commits an offset to a specified Kafka Topic/Partition instance. More... | |
std::map< std::string, int64_t > | get_watermark_offset (std::string const &topic, int partition, int timeout, bool cached) |
Retrieve the watermark offset values for a topic/partition. More... | |
std::map< std::string, std::string > | current_configs () |
Retrieve the current Kafka client configurations. More... | |
int64_t | get_committed_offset (std::string const &topic, int partition) |
Get the latest offset that was successfully committed to the Kafka broker. More... | |
std::map< std::string, std::vector< int32_t > > | list_topics (std::string specific_topic) |
Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved. More... | |
void | close (int timeout) |
Close the underlying socket connection to Kafka and clean up system resources. More... | |
void | unsubscribe () |
Stop all active consumption and remove consumer subscriptions to topic/partition instances. More... | |
Public Member Functions inherited from cudf::io::datasource | |
virtual | ~datasource () |
Base class destructor. | |
virtual bool | supports_device_read () const |
Whether or not this source supports reading directly into device memory. More... | |
virtual bool | is_device_read_preferred (size_t size) const |
Estimates whether a direct device read would be more optimal for the given size. More... | |
virtual std::unique_ptr< datasource::buffer > | device_read (size_t offset, size_t size, rmm::cuda_stream_view stream) |
Returns a device buffer with a subset of data from the source. More... | |
virtual size_t | device_read (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
Reads a selected range into a preallocated device buffer. More... | |
virtual std::future< size_t > | device_read_async (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
Asynchronously reads a selected range into a preallocated device buffer. More... | |
virtual bool | is_empty () const |
Returns whether the source contains any data. More... | |
Additional Inherited Members | |
Static Public Member Functions inherited from cudf::io::datasource | |
static std::unique_ptr< datasource > | create (std::string const &filepath, size_t offset=0, size_t size=0) |
Creates a source from a file path. More... | |
static std::unique_ptr< datasource > | create (host_buffer const &buffer) |
Creates a source from a host memory buffer. More... | |
static std::unique_ptr< datasource > | create (cudf::host_span< std::byte const > buffer) |
Creates a source from a host memory buffer. More... | |
static std::unique_ptr< datasource > | create (cudf::device_span< std::byte const > buffer) |
Creates a source from a device memory buffer. More... | |
static std::unique_ptr< datasource > | create (datasource *source) |
Creates a source from an user implemented datasource object. More... | |
template<typename T > | |
static std::vector< std::unique_ptr< datasource > > | create (std::vector< T > const &args) |
Creates a vector of datasources, one per element in the input vector. More... | |
libcudf datasource for Apache Kafka
Definition at line 40 of file kafka_consumer.hpp.
cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
python_callable_type | python_callable, | ||
kafka_oauth_callback_wrapper_type | callable_wrapper | ||
) |
Creates an instance of the Kafka consumer object that is in a semi-ready state.
A consumer in a semi-ready state does not have all required parameters to make successful consumer interactions with the Kafka broker. However in the semi-ready state Kafka metadata operations are still possible. This is useful for clients who plan to only use those metadata operations. This is useful when the need for delayed partition and topic assignment is not known ahead of time and needs to be delayed to as late as possible. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
configs | key/value pairs of librdkafka configurations that will be passed to the librdkafka client |
python_callable | python_callable_type pointer to a Python functools.partial object |
callable_wrapper | kafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable . This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka. |
cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
python_callable_type | python_callable, | ||
kafka_oauth_callback_wrapper_type | callable_wrapper, | ||
std::string const & | topic_name, | ||
int | partition, | ||
int64_t | start_offset, | ||
int64_t | end_offset, | ||
int | batch_timeout, | ||
std::string const & | delimiter | ||
) |
Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
configs | key/value pairs of librdkafka configurations that will be passed to the librdkafka client |
python_callable | python_callable_type pointer to a Python functools.partial object |
callable_wrapper | kafka_oauth_callback_wrapper_type Cython wrapper that will be used to invoke the python_callable . This wrapper serves the purpose of preventing us from having to link against the Python development library in libcudf_kafka. |
topic_name | name of the Kafka topic to consume from |
partition | partition index to consume from between 0 and TOPIC_NUM_PARTITIONS - 1 inclusive |
start_offset | seek position for the specified TOPPAR (Topic/Partition combo) |
end_offset | position in the specified TOPPAR to read to |
batch_timeout | maximum (millisecond) read time allowed. If end_offset is not reached before batch_timeout, a smaller subset will be returned |
delimiter | optional delimiter to insert into the output between kafka messages, Ex: "\n" |
void cudf::io::external::kafka::kafka_consumer::close | ( | int | timeout | ) |
Close the underlying socket connection to Kafka and clean up system resources.
cudf::logic_error | on failure to close the connection |
timeout | Max milliseconds to wait on a response |
void cudf::io::external::kafka::kafka_consumer::commit_offset | ( | std::string const & | topic, |
int | partition, | ||
int64_t | offset | ||
) |
Commits an offset to a specified Kafka Topic/Partition instance.
cudf::logic_error | on failure to commit the partition offset |
[in] | topic | Name of the Kafka topic that the offset should be set for |
[in] | partition | Partition on the specified topic that should be used |
[in] | offset | Offset that should be set for the topic/partition pair |
std::map<std::string, std::string> cudf::io::external::kafka::kafka_consumer::current_configs | ( | ) |
Retrieve the current Kafka client configurations.
int64_t cudf::io::external::kafka::kafka_consumer::get_committed_offset | ( | std::string const & | topic, |
int | partition | ||
) |
Get the latest offset that was successfully committed to the Kafka broker.
[in] | topic | Topic name for the topic/partition pair |
[in] | partition | Partition number of the topic/partition pair |
std::map<std::string, int64_t> cudf::io::external::kafka::kafka_consumer::get_watermark_offset | ( | std::string const & | topic, |
int | partition, | ||
int | timeout, | ||
bool | cached | ||
) |
Retrieve the watermark offset values for a topic/partition.
[in] | topic | Name of the Kafka topic that the watermark should be retrieved for |
[in] | partition | Partition on the specified topic which should be used |
[in] | timeout | Max milliseconds to wait on a response from the Kafka broker |
[in] | cached | If True uses the last retrieved value from the Kafka broker, if False the latest value will be retrieved from the Kafka broker by making a network request. |
|
overridevirtual |
Returns a buffer with a subset of data from Kafka Topic.
[in] | offset | Bytes from the start |
[in] | size | Bytes to read |
Implements cudf::io::datasource.
|
overridevirtual |
Reads a selected range into a preallocated buffer.
[in] | offset | Bytes from the start |
[in] | size | Bytes to read |
[in] | dst | Address of the existing host memory |
Implements cudf::io::datasource.
std::map<std::string, std::vector<int32_t> > cudf::io::external::kafka::kafka_consumer::list_topics | ( | std::string | specific_topic | ) |
Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker will be retrieved.
[in] | specific_topic | The name of the topic for which to retrieve partitions. If empty then the partitions for all topics will be retrieved. |
|
overridevirtual |
Returns the size of the data in Kafka buffer.
Implements cudf::io::datasource.
void cudf::io::external::kafka::kafka_consumer::unsubscribe | ( | ) |
Stop all active consumption and remove consumer subscriptions to topic/partition instances.
cudf::logic_error | on failure to unsubscribe from the active partition assignments. |