defaults.hpp
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION.
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #pragma once
7 
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <initializer_list>
#include <sstream>
#include <stdexcept>
#include <string>
#include <string_view>
#include <tuple>
#include <type_traits>
#include <vector>

#include <kvikio/compat_mode.hpp>
#include <kvikio/error.hpp>
#include <kvikio/http_status_codes.hpp>
#include <kvikio/shim/cufile.hpp>
#include <kvikio/threadpool_wrapper.hpp>
21 
25 namespace kvikio {
26 
// Opaque declarations of the remote-IO selector enums. Fixing the underlying type makes them
// complete types, so they can be used by value in the declarations below without pulling in
// their enumerator lists, which are defined elsewhere. The underlying type must match the one
// used at the point of definition.
enum class RemoteIOBackend : std::uint8_t;
enum class RemoteReactorDispatch : std::uint8_t;
30 
31 template <typename T>
32 T getenv_or(std::string_view env_var_name, T default_val)
33 {
34  auto const* env_val = std::getenv(env_var_name.data());
35  if (env_val == nullptr) { return default_val; }
36 
37  std::stringstream sstream(env_val);
38  T converted_val;
39  sstream >> converted_val;
40 
41  if constexpr (!std::is_same_v<T, std::string>) {
42  KVIKIO_EXPECT(!sstream.fail(),
43  "unknown config value " + std::string{env_var_name} + "=" + std::string{env_val},
44  std::invalid_argument);
45  }
46 
47  return converted_val;
48 }
49 
50 template <>
51 bool getenv_or(std::string_view env_var_name, bool default_val);
52 
53 template <>
54 CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val);
55 
56 template <>
57 std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> default_val);
58 
59 template <>
60 RemoteIOBackend getenv_or(std::string_view env_var_name, RemoteIOBackend default_val);
61 
62 template <>
63 RemoteReactorDispatch getenv_or(std::string_view env_var_name, RemoteReactorDispatch default_val);
64 
86 template <typename T>
87 std::tuple<std::string_view, T, bool> getenv_or(
88  std::initializer_list<std::string_view> env_var_names, T default_val)
89 {
90  KVIKIO_EXPECT(env_var_names.size() > 0,
91  "`env_var_names` must contain at least one environment variable name.",
92  std::invalid_argument);
93  std::string_view env_name_target;
94  std::string_view env_val_target;
95 
96  for (auto const& env_var_name : env_var_names) {
97  auto const* env_val = std::getenv(env_var_name.data());
98  if (env_val == nullptr) { continue; }
99 
100  if (!env_name_target.empty() && env_val_target != env_val) {
101  std::stringstream ss;
102  ss << "Environment variable " << env_var_name << " (" << env_val
103  << ") has already been set by its alias " << env_name_target << " (" << env_val_target
104  << ") with a different value.";
105  KVIKIO_FAIL(ss.str(), std::invalid_argument);
106  }
107 
108  env_name_target = env_var_name;
109  env_val_target = env_val;
110  }
111 
112  if (env_name_target.empty()) { return {env_name_target, default_val, false}; }
113 
114  auto res = getenv_or<T>(env_name_target, default_val);
115  return {env_name_target, res, true};
116 }
117 
122 class defaults {
123  private:
124  ThreadPool _thread_pool{get_num_threads_from_env()};
125  CompatMode _compat_mode;
126  std::size_t _task_size;
127  std::size_t _gds_threshold;
128  std::size_t _bounce_buffer_size;
129  std::size_t _http_max_attempts;
130  long _http_timeout;
131  std::vector<int> _http_status_codes;
132  bool _auto_direct_io_read;
133  bool _auto_direct_io_read_overread;
134  bool _auto_direct_io_write;
135  bool _thread_pool_per_block_device;
136  RemoteIOBackend _remote_io_backend;
137  unsigned int _remote_io_num_reactors;
138  RemoteReactorDispatch _remote_io_reactor_dispatch;
139  std::size_t _remote_io_max_concurrent_requests;
140 
141  static unsigned int get_num_threads_from_env();
142 
143  defaults();
144 
145  KVIKIO_EXPORT static defaults* instance();
146 
147  public:
166  [[nodiscard]] static CompatMode compat_mode();
167 
177 
188 
205 
221 
231  [[nodiscard]] static ThreadPool& thread_pool();
232 
241  [[nodiscard]] static unsigned int thread_pool_nthreads();
242 
251  static void set_thread_pool_nthreads(unsigned int nthreads);
252 
258  [[nodiscard]] static unsigned int num_threads();
259 
265  static void set_num_threads(unsigned int nthreads);
266 
275  [[nodiscard]] static std::size_t task_size();
276 
286  static void set_task_size(std::size_t nbytes);
287 
299  [[nodiscard]] static std::size_t gds_threshold();
300 
305  static void set_gds_threshold(std::size_t nbytes);
306 
315  [[nodiscard]] static std::size_t bounce_buffer_size();
316 
322  static void set_bounce_buffer_size(std::size_t nbytes);
323 
333  [[nodiscard]] static std::size_t http_max_attempts();
334 
340  static void set_http_max_attempts(std::size_t attempts);
341 
350  [[nodiscard]] static long http_timeout();
351 
357  static void set_http_timeout(long timeout_seconds);
358 
373  [[nodiscard]] static std::vector<int> const& http_status_codes();
374 
380  static void set_http_status_codes(std::vector<int> status_codes);
381 
389  static bool auto_direct_io_read();
390 
398  static void set_auto_direct_io_read(bool flag);
399 
413 
420  static void set_auto_direct_io_read_overread(bool flag);
421 
429  static bool auto_direct_io_write();
430 
438  static void set_auto_direct_io_write(bool flag);
439 
449 
460  static void set_thread_pool_per_block_device(bool flag);
461 
471  [[nodiscard]] static RemoteIOBackend remote_io_backend();
472 
481  [[nodiscard]] static unsigned int remote_io_num_reactors();
482 
495 
514  [[nodiscard]] static std::size_t remote_io_max_concurrent_requests();
515 };
516 
517 } // namespace kvikio
Singleton class of default values used throughout KvikIO.
Definition: defaults.hpp:122
static CompatMode infer_compat_mode_if_auto(CompatMode compat_mode) noexcept
Infer the AUTO compatibility mode from the system runtime.
static void set_auto_direct_io_read_overread(bool flag)
Enable or disable Direct I/O over-read alignment for device reads.
static void set_thread_pool_per_block_device(bool flag)
Enable or disable per-block-device thread pools.
static std::size_t task_size()
Get the default task size used for parallel IO operations.
static std::vector< int > const & http_status_codes()
The list of HTTP status codes to retry.
static void set_task_size(std::size_t nbytes)
Set the default task size used for parallel IO operations.
static bool auto_direct_io_read()
Check if Direct I/O is enabled for POSIX reads.
static void set_num_threads(unsigned int nthreads)
Alias of set_thread_pool_nthreads
static bool is_compat_mode_preferred()
Whether the global compatibility mode from class defaults is expected to be ON.
static void set_http_status_codes(std::vector< int > status_codes)
Set the list of HTTP status codes to retry.
static RemoteIOBackend remote_io_backend()
The remote I/O backend selected.
static void set_auto_direct_io_read(bool flag)
Enable or disable Direct I/O for POSIX reads.
static bool thread_pool_per_block_device()
Check if per-block-device thread pools are enabled.
static std::size_t remote_io_max_concurrent_requests()
Maximum number of concurrent in-flight requests across all reactor threads under the MULTI_POLL remote I/O backend.
static void set_compat_mode(CompatMode compat_mode)
Set the value of kvikio::defaults::compat_mode().
static void set_gds_threshold(std::size_t nbytes)
Set the default GDS threshold, which is the minimum size to use GDS (in bytes).
static unsigned int num_threads()
Alias of thread_pool_nthreads
static void set_http_timeout(long timeout_seconds)
Reset the http timeout.
static bool is_compat_mode_preferred(CompatMode compat_mode) noexcept
Given a requested compatibility mode, whether it is expected to reduce to ON.
static bool auto_direct_io_read_overread()
Check if Direct I/O over-read alignment is enabled for device reads.
static void set_thread_pool_nthreads(unsigned int nthreads)
Set the number of threads in the default thread pool. Waits for all currently running tasks to be completed.
static std::size_t http_max_attempts()
Get the maximum number of attempts per remote IO read.
static long http_timeout()
The maximum time, in seconds, the transfer is allowed to complete.
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
static void set_auto_direct_io_write(bool flag)
Enable or disable Direct I/O for POSIX writes.
static bool auto_direct_io_write()
Check if Direct I/O is enabled for POSIX writes.
static unsigned int thread_pool_nthreads()
Get the number of threads in the default thread pool.
static std::size_t bounce_buffer_size()
Get the size of the bounce buffer used to stage data in host memory.
static ThreadPool & thread_pool()
Get the default thread pool.
static unsigned int remote_io_num_reactors()
Number of reactor threads used by the MULTI_POLL remote I/O backend.
static void set_http_max_attempts(std::size_t attempts)
Set the maximum number of attempts per remote IO read.
static RemoteReactorDispatch remote_io_reactor_dispatch()
How sub-ranges of one pread() are distributed across reactor threads under the MULTI_POLL remote I/O backend.
static void set_bounce_buffer_size(std::size_t nbytes)
Set the size of the bounce buffer used to stage data in host memory.
static CompatMode compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
Definition: error.hpp:147
#define KVIKIO_FAIL(...)
Indicates that an erroneous code path has been taken.
Definition: error.hpp:187
KvikIO namespace.
Definition: batch.hpp:16
BS::thread_pool ThreadPool
Thread pool type used for parallel I/O operations.
RemoteReactorDispatch
How sub-ranges of a single pread() are distributed across reactor threads when the MULTI_POLL backend...
CompatMode
I/O compatibility mode.
Definition: compat_mode.hpp:15
RemoteIOBackend
Selects the remote I/O backend.