kafka_consumer.hpp
1 /*
2  * Copyright (c) 2020-2022, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include "kafka_callback.hpp"
19 
20 #include <cudf/io/datasource.hpp>
21 
22 #include <librdkafka/rdkafkacpp.h>
23 
24 #include <algorithm>
25 #include <chrono>
26 #include <map>
27 #include <memory>
28 #include <string>
29 
30 namespace cudf {
31 namespace io {
32 namespace external {
33 namespace kafka {
34 
41  public:
61  kafka_consumer(std::map<std::string, std::string> configs,
62  python_callable_type python_callable,
63  kafka_oauth_callback_wrapper_type callable_wrapper);
64 
85  kafka_consumer(std::map<std::string, std::string> configs,
86  python_callable_type python_callable,
87  kafka_oauth_callback_wrapper_type callable_wrapper,
88  std::string const& topic_name,
89  int partition,
90  int64_t start_offset,
91  int64_t end_offset,
92  int batch_timeout,
93  std::string const& delimiter);
94 
103  std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override;
104 
110  size_t size() const override;
111 
121  size_t host_read(size_t offset, size_t size, uint8_t* dst) override;
122 
133  void commit_offset(std::string const& topic, int partition, int64_t offset);
134 
146  std::map<std::string, int64_t> get_watermark_offset(std::string const& topic,
147  int partition,
148  int timeout,
149  bool cached);
150 
156  std::map<std::string, std::string> current_configs();
157 
166  int64_t get_committed_offset(std::string const& topic, int partition);
167 
177  std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic);
178 
185  void close(int timeout);
186 
193  void unsubscribe();
194 
195  virtual ~kafka_consumer(){};
196 
197  private:
198  std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
199  std::unique_ptr<RdKafka::KafkaConsumer> consumer;
200 
201  std::map<std::string, std::string> configs;
202  python_callable_type python_callable_;
203  kafka_oauth_callback_wrapper_type callable_wrapper_;
204 
205  std::string topic_name;
206  int partition;
207  int64_t start_offset;
208  int64_t end_offset;
209  int batch_timeout;
210  int default_timeout = 10000; // milliseconds
211  std::string delimiter;
212 
213  std::string buffer;
214 
215  private:
216  RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const& topic,
217  int partition,
218  int64_t offset);
219 
223  int64_t now();
224 
225  void consume_to_buffer();
226 };
227 
228 } // namespace kafka
229 } // namespace external
230 } // namespace io
231 } // namespace cudf
Interface class for providing input data to the readers.
Definition: datasource.hpp:41
libcudf datasource for Apache Kafka
void unsubscribe()
Stop all active consumption and remove consumer subscriptions to topic/partition instances.
std::unique_ptr< cudf::io::datasource::buffer > host_read(size_t offset, size_t size) override
Returns a buffer with a subset of data from Kafka Topic.
std::map< std::string, std::string > current_configs()
Retrieve the current Kafka client configurations.
std::map< std::string, std::vector< int32_t > > list_topics(std::string specific_topic)
Query the Kafka broker for the list of Topic partitions for a Topic. If no topic is specified then th...
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
Creates an instance of the Kafka consumer object that is in a semi-ready state.
size_t host_read(size_t offset, size_t size, uint8_t *dst) override
Reads a selected range into a preallocated buffer.
void commit_offset(std::string const &topic, int partition, int64_t offset)
Commits an offset to a specified Kafka Topic/Partition instance.
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be found at http...
void close(int timeout)
Close the underlying socket connection to Kafka and clean up system resources.
size_t size() const override
Returns the size of the data in Kafka buffer.
int64_t get_committed_offset(std::string const &topic, int partition)
Get the latest offset that was successfully committed to the Kafka broker.
std::map< std::string, int64_t > get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)
Retrieve the watermark offset values for a topic/partition.
cuDF interfaces
Definition: aggregation.hpp:34