kafka_consumer.hpp
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION.
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #pragma once
6 
#include "kafka_callback.hpp"

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
18 
19 namespace cudf {
20 namespace io {
21 namespace external {
22 namespace kafka {
23 
30  public:
50  kafka_consumer(std::map<std::string, std::string> configs,
51  python_callable_type python_callable,
52  kafka_oauth_callback_wrapper_type callable_wrapper);
53 
74  kafka_consumer(std::map<std::string, std::string> configs,
75  python_callable_type python_callable,
76  kafka_oauth_callback_wrapper_type callable_wrapper,
77  std::string const& topic_name,
78  int partition,
79  int64_t start_offset,
80  int64_t end_offset,
81  int batch_timeout,
82  std::string const& delimiter);
83 
92  std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override;
93 
99  size_t size() const override;
100 
110  size_t host_read(size_t offset, size_t size, uint8_t* dst) override;
111 
/**
 * @brief Commits an offset to a specified Kafka topic/partition instance.
 *
 * @param topic Topic whose offset is committed
 * @param partition Partition of `topic`
 * @param offset Offset value to commit
 */
void commit_offset(const std::string& topic, int partition, int64_t offset);
123 
/**
 * @brief Retrieve the watermark offset values for a topic/partition.
 *
 * @param topic Topic to query
 * @param partition Partition of `topic`
 * @param timeout Query timeout (unit not visible here — presumably ms, confirm)
 * @param cached Whether cached values may be used (semantics not visible here — confirm)
 * @return Map from a string key to an offset value (key names defined in the implementation)
 */
std::map<std::string, int64_t> get_watermark_offset(const std::string& topic,
                                                    int partition,
                                                    int timeout,
                                                    bool cached);
139 
/**
 * @brief Retrieve the current Kafka client configurations.
 *
 * @return Configuration entries as key/value string pairs
 */
std::map<std::string, std::string> current_configs();
146 
/**
 * @brief Get the latest offset that was successfully committed to the Kafka broker.
 *
 * @param topic Topic to query
 * @param partition Partition of `topic`
 * @return The committed offset
 */
int64_t get_committed_offset(const std::string& topic, int partition);
156 
/**
 * @brief Query the Kafka broker for the list of topic partitions for a topic.
 *
 * If no topic is specified, partitions for all topics are presumably returned — confirm against
 * the implementation.
 *
 * @param specific_topic Topic to query
 * @return Map from topic name to a list of `int32_t` partition values
 */
std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic);
167 
/**
 * @brief Close the underlying socket connection to Kafka and clean up system resources.
 *
 * @param timeout Close timeout (unit not visible here — presumably ms, confirm)
 */
void close(int timeout);
175 
/**
 * @brief Stop all active consumption and remove consumer subscriptions to
 *        topic/partition instances.
 */
void unsubscribe();
183 
184  virtual ~kafka_consumer() {};
185 
186  private:
187  std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
188  std::unique_ptr<RdKafka::KafkaConsumer> consumer;
189 
190  std::map<std::string, std::string> configs;
191  python_callable_type python_callable_;
192  kafka_oauth_callback_wrapper_type callable_wrapper_;
193 
194  std::string topic_name;
195  int partition;
196  int64_t start_offset;
197  int64_t end_offset;
198  int batch_timeout;
199  int default_timeout = 10000; // milliseconds
200  std::string delimiter;
201 
202  std::string buffer;
203 
204  private:
205  RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const& topic,
206  int partition,
207  int64_t offset);
208 
/**
 * @brief Returns the current time as an `int64_t`.
 *
 * NOTE(review): unit and epoch are not visible here — presumably milliseconds; confirm.
 */
int64_t now();
213 
/**
 * @brief Consumes data into the internal `buffer` member (inferred from the name — TODO
 *        confirm against the definition).
 */
void consume_to_buffer();
215 };
216 
217 } // namespace kafka
218 } // namespace external
219 } // namespace io
220 } // namespace cudf
Interface class for providing input data to the readers.
Definition: datasource.hpp:31
libcudf datasource for Apache Kafka
void unsubscribe()
Stop all active consumption and remove consumer subscriptions to topic/partition instances.
std::unique_ptr< cudf::io::datasource::buffer > host_read(size_t offset, size_t size) override
Returns a buffer with a subset of data from Kafka Topic.
std::map< std::string, std::string > current_configs()
Retrieve the current Kafka client configurations.
std::map< std::string, std::vector< int32_t > > list_topics(std::string specific_topic)
Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then the partitions for all Topics in the broker are retrieved.
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
Creates an instance of the Kafka consumer object that is in a semi-ready state.
size_t host_read(size_t offset, size_t size, uint8_t *dst) override
Reads a selected range into a preallocated buffer.
void commit_offset(std::string const &topic, int partition, int64_t offset)
Commits an offset to a specified Kafka Topic/Partition instance.
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
void close(int timeout)
Close the underlying socket connection to Kafka and clean up system resources.
size_t size() const override
Returns the size of the data in Kafka buffer.
int64_t get_committed_offset(std::string const &topic, int partition)
Get the latest offset that was successfully committed to the Kafka broker.
std::map< std::string, int64_t > get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)
Retrieve the watermark offset values for a topic/partition.
cuDF interfaces
Definition: host_udf.hpp:26