# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.
import contextlib
from ._lib import libkvikio # type: ignore
[docs]
def compat_mode() -> bool:
"""Check if KvikIO is running in compatibility mode.
Notice, this is not the same as the compatibility mode in cuFile. That is,
cuFile can run in compatibility mode while KvikIO is not.
When KvikIO is running in compatibility mode, it doesn't load `libcufile.so`.
Instead, reads and writes are done using POSIX.
Set the environment variable `KVIKIO_COMPAT_MODE` to enable/disable compatibility
mode. By default, compatibility mode is enabled:
- when `libcufile` cannot be found
- when running in Windows Subsystem for Linux (WSL)
- when `/run/udev` isn't readable, which typically happens when running inside
a docker image not launched with `--volume /run/udev:/run/udev:ro`
Return
------
bool
Whether KvikIO is running in compatibility mode or not.
"""
return libkvikio.compat_mode()
[docs]
def compat_mode_reset(enable: bool) -> None:
"""Reset the compatibility mode.
Use this function to enable/disable compatibility mode explicitly.
Parameters
----------
enable : bool
Set to True to enable and False to disable compatibility mode
"""
libkvikio.compat_mode_reset(enable)
@contextlib.contextmanager
def set_compat_mode(enable: bool):
"""Context for resetting the compatibility mode.
Parameters
----------
enable : bool
Set to True to enable and False to disable compatibility mode
"""
num_threads_reset(get_num_threads()) # Sync all running threads
old_value = compat_mode()
try:
compat_mode_reset(enable)
yield
finally:
compat_mode_reset(old_value)
[docs]
def get_num_threads() -> int:
"""Get the number of threads of the thread pool.
Set the default value using `num_threads_reset()` or by setting the
`KVIKIO_NTHREADS` environment variable. If not set, the default value is 1.
Return
------
nthreads: int
The number of threads in the current thread pool.
"""
return libkvikio.thread_pool_nthreads()
[docs]
def num_threads_reset(nthreads: int) -> None:
"""Reset the number of threads in the default thread pool.
Waits for all currently running tasks to be completed, then destroys all threads
in the pool and creates a new thread pool with the new number of threads. Any
tasks that were waiting in the queue before the pool was reset will then be
executed by the new threads. If the pool was paused before resetting it, the new
pool will be paused as well.
Parameters
----------
nthreads : int
The number of threads to use. The default value can be specified by setting
the `KVIKIO_NTHREADS` environment variable. If not set, the default value
is 1.
"""
libkvikio.thread_pool_nthreads_reset(nthreads)
@contextlib.contextmanager
def set_num_threads(nthreads: int):
"""Context for resetting the number of threads in the default thread pool.
Parameters
----------
nthreads : int
The number of threads to use.
"""
old_value = get_num_threads()
try:
num_threads_reset(nthreads)
yield
finally:
num_threads_reset(old_value)
def task_size() -> int:
"""Get the default task size used for parallel IO operations.
Set the default value using `task_size_reset()` or by setting
the `KVIKIO_TASK_SIZE` environment variable. If not set,
the default value is 4 MiB.
Return
------
nbytes: int
The default task size in bytes.
"""
return libkvikio.task_size()
def task_size_reset(nbytes: int) -> None:
"""Reset the default task size used for parallel IO operations.
Parameters
----------
nbytes : int
The default task size in bytes.
"""
libkvikio.task_size_reset(nbytes)
@contextlib.contextmanager
def set_task_size(nbytes: int):
"""Context for resetting the task size used for parallel IO operations.
Parameters
----------
nbytes : int
The default task size in bytes.
"""
old_value = task_size()
try:
task_size_reset(nbytes)
yield
finally:
task_size_reset(old_value)
def gds_threshold() -> int:
"""Get the default GDS threshold, which is the minimum size to use GDS.
In order to improve performance of small IO, `.pread()` and `.pwrite()`
implements a shortcut that circumvent the threadpool and use the POSIX
backend directly.
Set the default value using `gds_threshold_reset()` or by setting the
`KVIKIO_TASK_SIZE` environment variable. If not set, the default value
is 1 MiB.
Return
------
nbytes : int
The default GDS threshold size in bytes.
"""
return libkvikio.gds_threshold()
def gds_threshold_reset(nbytes: int) -> None:
"""Reset the default GDS threshold, which is the minimum size to use GDS.
Parameters
----------
nbytes : int
The default GDS threshold size in bytes.
"""
libkvikio.gds_threshold_reset(nbytes)
@contextlib.contextmanager
def set_gds_threshold(nbytes: int):
"""Context for resetting the default GDS threshold.
Parameters
----------
nbytes : int
The default GDS threshold size in bytes.
"""
old_value = gds_threshold()
try:
gds_threshold_reset(nbytes)
yield
finally:
gds_threshold_reset(old_value)