API

CuFile

class kvikio.cufile.CuFile(file: Path | str, flags: str = 'r')

File handle for GPUDirect Storage (GDS)

__init__(file: Path | str, flags: str = 'r')

Open and register file for GDS IO operations

CuFile opens the file twice and maintains two file descriptors. One file is opened with the specified flags and the other file is opened with the flags plus the O_DIRECT flag.

Parameters:
file: pathlib.Path or str

Path-like object giving the pathname (absolute or relative to the current working directory) of the file to be opened and registered.

flags: str, optional

  • “r” -> open for reading (default)
  • “w” -> open for writing, truncating the file first
  • “a” -> open for writing, appending to the end of the file if it exists
  • “+” -> open for updating (reading and writing)
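The two-descriptor scheme described above can be pictured with plain open(2) flags. The following is a minimal sketch and not KvikIO's implementation: the helper names `mode_to_os_flags` and `with_o_direct` are hypothetical, and the mapping from mode strings to flags is an assumption modelled on the usual fopen-style modes.

```python
import os


def mode_to_os_flags(mode: str) -> int:
    """Translate a mode string such as "r" or "w+" into open(2) flags (hypothetical helper)."""
    updating = "+" in mode
    base = mode.replace("+", "")
    if base == "r":
        return os.O_RDWR if updating else os.O_RDONLY
    write_access = os.O_RDWR if updating else os.O_WRONLY
    if base == "w":
        return write_access | os.O_CREAT | os.O_TRUNC
    if base == "a":
        return write_access | os.O_CREAT | os.O_APPEND
    raise ValueError(f"unsupported mode: {mode!r}")


def with_o_direct(flags: int) -> int:
    """Flags for the second descriptor: the same flags plus O_DIRECT."""
    # O_DIRECT is Linux-specific; add nothing where the constant is missing.
    return flags | getattr(os, "O_DIRECT", 0)


regular_flags = mode_to_os_flags("w+")
direct_flags = with_o_direct(regular_flags)
```

The two resulting flag sets correspond to the two descriptors mentioned above: one opened with the requested flags and one that additionally carries O_DIRECT.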

close() -> None

Deregister the file and close the file

property closed: bool

fileno() -> int

Get the file descriptor of the open file

open_flags() -> int

Get the flags of the file descriptor (see open(2))

pread(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) -> IOFuture

Reads specified bytes from the file into device or host memory in parallel

pread reads size bytes from the file, starting at file_offset, into buf. The API works correctly for unaligned offsets and any data size, although the performance might not match the performance of aligned reads. See additional details in the notes below.

pread is non-blocking and returns an IOFuture that can be waited upon. It partitions the operation into tasks of size task_size for execution in the default thread pool.

Parameters:
buf: buffer-like or array-like

Device or host buffer to read into.

size: int, optional

Size in bytes to read.

file_offset: int, optional

Offset in the file to read from.

task_size: int, default=kvikio.defaults.task_size()

Size of each task in bytes.

Returns:
IOFuture

Future that on completion returns the size of bytes that were successfully read.
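The partitioning described above can be imitated on host memory with the standard library. This is a sketch under stated assumptions, not KvikIO's implementation: `split_into_tasks` and `parallel_pread` are hypothetical names, `os.pread` stands in for the actual IO backend, and a `concurrent.futures` future stands in for IOFuture.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def split_into_tasks(size: int, file_offset: int, task_size: int):
    """Yield (offset, nbytes) pairs covering [file_offset, file_offset + size)."""
    end = file_offset + size
    for start in range(file_offset, end, task_size):
        yield start, min(task_size, end - start)


def parallel_pread(fd, buf, size, file_offset, task_size, pool):
    """Read size bytes into buf, one pool task per chunk; returns a future."""
    view = memoryview(buf)

    def read_chunk(offset: int, nbytes: int) -> int:
        data = os.pread(fd, nbytes, offset)
        start = offset - file_offset  # position of this chunk inside buf
        view[start:start + len(data)] = data
        return len(data)

    chunk_futures = [
        pool.submit(read_chunk, offset, nbytes)
        for offset, nbytes in split_into_tasks(size, file_offset, task_size)
    ]
    # A final task sums the per-chunk byte counts so the caller waits on one future.
    return pool.submit(lambda: sum(f.result() for f in chunk_futures))


payload = bytes(range(256)) * 40  # 10240 bytes of test data
with tempfile.NamedTemporaryFile() as tmp:
    tmp.write(payload)
    tmp.flush()
    out = bytearray(10000)
    with ThreadPoolExecutor(max_workers=4) as pool:
        future = parallel_pread(tmp.fileno(), out, size=10000, file_offset=100,
                                task_size=4096, pool=pool)
        nbytes_read = future.result()  # blocks until every chunk has been read
```

Because the summing task is submitted last, every chunk task has already been picked up by a worker before it runs, so the sketch does not deadlock even with a single worker.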

Notes

KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4kB, so all reads should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.
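The rounding advice in the note can be written out as a few lines of arithmetic. This is a minimal sketch: `align_read` is a hypothetical helper, not part of KvikIO, and it takes the 4096-byte page size quoted above as its default.

```python
PAGE_SIZE = 4096  # GPU page size stated in the note above


def align_read(file_offset: int, size: int, page: int = PAGE_SIZE):
    """Return (aligned_offset, aligned_size, skip) for a page-aligned read.

    skip is the number of leading bytes to discard from the aligned read.
    """
    aligned_offset = (file_offset // page) * page          # round down
    skip = file_offset - aligned_offset
    aligned_end = -(-(file_offset + size) // page) * page  # round up
    return aligned_offset, aligned_end - aligned_offset, skip


aligned_offset, aligned_size, skip = align_read(file_offset=5000, size=100)
```

The wanted bytes are then found at positions skip through skip + size of the data read with the aligned values. Note that the destination buffer must hold aligned_size bytes, and that the rounded-up range may extend past the end of the file.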

pwrite(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) -> IOFuture

Writes specified bytes from device or host memory into the file in parallel

pwrite writes size bytes from buf to the file, starting at file_offset. The API works correctly for unaligned offsets and any data size, although the performance might not match the performance of aligned writes. See additional details in the notes below.

pwrite is non-blocking and returns an IOFuture that can be waited upon. It partitions the operation into tasks of size task_size for execution in the default thread pool.

Parameters:
buf: buffer-like or array-like

Device or host buffer to write from.

size: int, optional

Size in bytes to write.

file_offset: int, optional

Offset in the file to write at.

task_size: int, default=kvikio.defaults.task_size()

Size of each task in bytes.

Returns:
IOFuture

Future that on completion returns the size of bytes that were successfully written.

Notes

KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4kB, so all writes should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.
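To make the aligned and unaligned parts concrete, the sketch below divides a byte range into an unaligned head, a page-aligned body, and an unaligned tail. It is an illustration only: `split_aligned` is a hypothetical helper, and how KvikIO actually splits a request is not specified here.

```python
def split_aligned(offset: int, size: int, page: int = 4096):
    """Split [offset, offset + size) into (head, body, tail) as (start, nbytes) pairs.

    Only body starts and ends on a page boundary; head and tail may be empty.
    """
    end = offset + size
    body_start = min(-(-offset // page) * page, end)  # round up, clamped to end
    body_end = max((end // page) * page, body_start)  # round down, clamped
    head = (offset, body_start - offset)
    body = (body_start, body_end - body_start)
    tail = (body_end, end - body_end)
    return head, body, tail


head, body, tail = split_aligned(offset=100, size=10000)
```

For this request only the middle 4096 bytes form an aligned body, which is why choosing a file_offset and size that are multiples of 4096 leaves nothing in the head or tail.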

read(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) -> int

Reads specified bytes from the file into the device memory in parallel

This is a blocking version of .pread.

Parameters:
buf: buffer-like or array-like

Device buffer to read into.

size: int, optional

Size in bytes to read.

file_offset: int, optional

Offset in the file to read from.

task_size: int, default=kvikio.defaults.task_size()

Size of each task in bytes.

Returns:
int

The size of bytes that were successfully read.

Notes

KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4kB, so all reads should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.

write(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) -> int

Writes specified bytes from the device memory into the file in parallel

This is a blocking version of .pwrite.

Parameters:
buf: buffer-like or array-like

Device buffer to write from.

size: int, optional

Size in bytes to write.

file_offset: int, optional

Offset in the file to write at.

task_size: int, default=kvikio.defaults.task_size()

Size of each task in bytes.

Returns:
int

The size of bytes that were successfully written.

Notes

KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4kB, so all writes should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.

raw_read_async(buf, stream, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) -> IOFutureStream

Reads specified bytes from the file into the device memory asynchronously

This is an async version of .raw_read that doesn’t use threads and does not support host memory.

Parameters:
buf: buffer-like or array-like

Device buffer to read into.

stream: cuda.Stream

CUDA stream to perform the read operation asynchronously.

size: int, optional

Size in bytes to read.

file_offset: int, optional

Offset in the file to read from.

Returns:
IOFutureStream

Future whose check_bytes_done() method returns the size of bytes that were successfully read. The instance must be kept alive until all data has been read from disk. One way to do this is to call IOFutureStream.check_bytes_done(), which synchronizes the associated stream and returns the number of bytes read.

raw_write_async(buf, stream, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) -> IOFutureStream

Writes specified bytes from the device memory into the file asynchronously

This is an async version of .raw_write that doesn’t use threads and does not support host memory.

Parameters:
buf: buffer-like or array-like

Device buffer to write from.

stream: cuda.Stream

CUDA stream to perform the write operation asynchronously.

size: int, optional

Size in bytes to write.

file_offset: int, optional

Offset in the file to write at.

Returns:
IOFutureStream

Future whose check_bytes_done() method returns the size of bytes that were successfully written. The instance must be kept alive until all data has been written to disk. One way to do this is to call IOFutureStream.check_bytes_done(), which synchronizes the associated stream and returns the number of bytes written.

raw_read(buf, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) -> int

Reads specified bytes from the file into the device memory

This is a low-level version of .read that doesn’t use threads and does not support host memory.

Parameters:
buf: buffer-like or array-like

Device buffer to read into.

size: int, optional

Size in bytes to read.

file_offset: int, optional

Offset in the file to read from.

dev_offset: int, optional

Offset in buf to read into.

Returns:
int

The size of bytes that were successfully read.

Notes

KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4kB, so all reads should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.
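The roles of file_offset and dev_offset are easy to mix up, so here is a host-memory analogy. It is a sketch under assumptions: `raw_read_host` is a hypothetical stand-in that uses `os.pread` and a bytearray, whereas raw_read itself only accepts device memory.

```python
import os
import tempfile


def raw_read_host(fd: int, buf: bytearray, size: int,
                  file_offset: int = 0, dev_offset: int = 0) -> int:
    """Read size bytes starting at file_offset and store them at buf[dev_offset:]."""
    data = os.pread(fd, size, file_offset)
    # A memoryview slice cannot grow the buffer, so an overflow raises ValueError.
    memoryview(buf)[dev_offset:dev_offset + len(data)] = data
    return len(data)


with tempfile.NamedTemporaryFile() as tmp:
    tmp.write(b"0123456789")
    tmp.flush()
    buf = bytearray(b"." * 8)
    nbytes = raw_read_host(tmp.fileno(), buf, size=4, file_offset=3, dev_offset=2)
```

Here file_offset selects where in the file the data comes from, and dev_offset selects where in the buffer it lands.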

raw_write(buf, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) -> int

Writes specified bytes from the device memory into the file

This is a low-level version of .write that doesn’t use threads and does not support host memory.

Parameters:
buf: buffer-like or array-like

Device buffer to write from.

size: int, optional

Size in bytes to write.

file_offset: int, optional

Offset in the file to write at.

dev_offset: int, optional

Offset in the buf to write from.

Returns:
int

The size of bytes that were successfully written.

Notes

KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4kB, so all writes should be at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is likely desirable to round down to the nearest multiple of 4096 and discard any undesired bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, this is less critical.

class kvikio.cufile.IOFuture(handle)

Future for CuFile IO

This class shouldn’t be used directly. Instead, non-blocking IO operations such as CuFile.pread and CuFile.pwrite return an instance of this class. Use .get() to wait on the completion of the IO operation and retrieve the result.

__init__(handle)

get() -> int

Retrieve the result of the IO operation that created this future

This call blocks until the IO operation finishes.

Returns:
int

The size of bytes that were read or written successfully.

done() -> bool

Return True if the future is done.

Returns:
bool

Whether the future is done or not
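The get() and done() pair behaves much like the futures in Python's standard library. The sketch below uses `concurrent.futures` as an analogy only: `fake_io` is a hypothetical placeholder for an IO operation, and `result()` plays the role that get() plays for IOFuture.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def fake_io() -> int:
    """Pretend IO task: waits until released, then reports 4096 bytes."""
    release.wait(timeout=5)
    return 4096


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fake_io)
    done_before = future.done()  # the task is still waiting on the event
    release.set()
    nbytes = future.result()     # blocks until the task finishes, like get()
    done_after = future.done()
```

Polling done() lets a caller overlap other work with the IO, while get() is the point at which the caller commits to waiting.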

Zarr

class kvikio.zarr.GDSStore(path, normalize_keys=False, dimension_separator=None, *, compressor_config_overwrite: Mapping | None = None, decompressor_config_overwrite: Mapping | None = None)

GPUDirect Storage (GDS) class using directories and files.

This class works like zarr.storage.DirectoryStore but implements getitems() in order to support direct reading into device memory. It uses KvikIO for reads and writes, which in turn will use GDS when applicable.

Parameters:
path: string

Location of directory to use as the root of the storage hierarchy.

normalize_keys: bool, optional

If True, all store keys will be normalized to use lower case characters (e.g. ‘foo’ and ‘FOO’ will be treated as equivalent). This can be useful to avoid potential discrepancies between case-sensitive and case-insensitive file systems. Default value is False.

dimension_separator: {‘.’, ‘/’}, optional

Separator placed between the dimensions of a chunk.

compressor_config_overwrite

If not None, use this Mapping to specify what is written to the Zarr metadata file on disk (.zarray). Normally, Zarr writes the configuration[1] given by the compressor argument to the .zarray file. Use this argument to overwrite the normal configuration and use the specified Mapping instead.

decompressor_config_overwrite

If not None, use this Mapping to specify what compressor configuration[1] is used for decompressing no matter the configuration found in the Zarr metadata on disk (the .zarray file).

[1] https://github.com/zarr-developers/numcodecs/blob/cb155432/numcodecs/abc.py#L79

Notes

Atomic writes are used, which means that data are first written to a temporary file, then moved into place when the write is successfully completed. Files are only held open while they are being read or written and are closed immediately afterwards, so there is no need to manually close any files.

Safe to write in multiple threads or processes.
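The write-then-move pattern described in the note can be sketched with the standard library. This is an assumption-laden illustration, not GDSStore's code: `atomic_write` is a hypothetical helper, and ordinary file IO stands in for KvikIO.

```python
import os
import tempfile


def atomic_write(path: str, data: bytes) -> None:
    """Write data to a temporary file, then move it into place at path."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temporary file lives in the target directory so the final move
    # stays on one file system.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        os.replace(tmp_path, path)  # readers see the old file or the new one
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as root:
    target = os.path.join(root, "0.0")
    atomic_write(target, b"chunk")
    with open(target, "rb") as f:
        content = f.read()
    leftovers = os.listdir(root)
```

The file is only held open inside `atomic_write`, which mirrors the statement above that files are closed immediately after being read or written.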

default_meta_array = array(0.0078125)

__init__(path, normalize_keys=False, dimension_separator=None, *, compressor_config_overwrite: Mapping | None = None, decompressor_config_overwrite: Mapping | None = None) -> None

getitems(keys: Sequence[str], *, contexts: Mapping[str, Mapping] = {}) -> Mapping[str, Any]

Retrieve data from multiple keys.

Parameters:
keys: Iterable[str]

The keys to retrieve

contexts: Mapping[str, Context]

A mapping of keys to their context. Each context is a mapping of store-specific information. If the “meta_array” key exists, GDSStore uses its value as the output array; otherwise GDSStore.default_meta_array is used.

Returns:
Mapping

A collection mapping the input keys to their results.

RemoteFile

class kvikio.remote_file.RemoteFile(handle)

File handle of a remote file.

__init__(handle)

Create a remote file from a Cython handle.

This constructor should not be called directly. Instead, use a factory method like RemoteFile.open_http().

Parameters:
handle: kvikio._lib.remote_handle.RemoteFile

The Cython handle

classmethod open_http(url: str, nbytes: int | None = None) -> RemoteFile

Open an HTTP file.

Parameters:
url

URL to the remote file.

nbytes

The size of the file. If None, KvikIO will ask the server for the file size.

classmethod open_s3(bucket_name: str, object_name: str, nbytes: int | None = None) -> RemoteFile

Open an AWS S3 file from a bucket name and object name.

Please make sure to set the AWS environment variables:
  • AWS_DEFAULT_REGION

  • AWS_ACCESS_KEY_ID

  • AWS_SECRET_ACCESS_KEY

Additionally, to overwrite the AWS endpoint, set AWS_ENDPOINT_URL. See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>
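Since a missing variable only shows up when the file is opened, it can help to check the environment first. The sketch below is a small pre-flight check: `missing_aws_vars` is a hypothetical helper, not part of KvikIO, and it only tests that the three variables listed above are set and non-empty.

```python
import os

REQUIRED_AWS_VARS = (
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_aws_vars(env=None):
    """Return the names of required AWS variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


missing = missing_aws_vars()
if missing:
    print("Set these variables before opening an S3 file:", ", ".join(missing))
```

AWS_ENDPOINT_URL is deliberately left out of the check because it is optional.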

Parameters:
bucket_name

The bucket name of the file.

object_name

The object name of the file.

nbytes

The size of the file. If None, KvikIO will ask the server for the file size.

classmethod open_s3_url(url: str, nbytes: int | None = None) -> RemoteFile

Open an AWS S3 file from a URL.

The url can take two forms: an HTTP URL or an S3 URL.

Please make sure to set the AWS environment variables:
  • AWS_DEFAULT_REGION

  • AWS_ACCESS_KEY_ID

  • AWS_SECRET_ACCESS_KEY

Additionally, if url is an S3 URL, it is possible to overwrite the AWS endpoint by setting AWS_ENDPOINT_URL. See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>

Parameters:
url

Either an HTTP URL or an S3 URL.

nbytes

The size of the file. If None, KvikIO will ask the server for the file size.

close() -> None

Close the file

nbytes() -> int

Get the file size.

This call is very fast because no communication is needed.

Returns:
The number of bytes.

read(buf, size: int | None = None, file_offset: int = 0) -> int

Read from remote source into buffer (host or device memory) in parallel.

Parameters:
buf: buffer-like or array-like

Device or host buffer to read into.

size

Size in bytes to read.

file_offset

Offset in the file to read from.

Returns:
The size of bytes that were successfully read.

pread(buf, size: int | None = None, file_offset: int = 0) -> IOFuture

Read from remote source into buffer (host or device memory) in parallel.

Parameters:
buf: buffer-like or array-like

Device or host buffer to read into.

size

Size in bytes to read.

file_offset

Offset in the file to read from.

Returns:
Future that on completion returns the size of bytes that were successfully read.

Defaults

kvikio.defaults.compat_mode() -> CompatMode

Check if KvikIO is running in compatibility mode.

Notice, this is not the same as the compatibility mode in cuFile. That is, cuFile can run in compatibility mode while KvikIO is not.

When KvikIO is running in compatibility mode, it doesn’t load libcufile.so. Instead, reads and writes are done using POSIX.

Set the environment variable KVIKIO_COMPAT_MODE to enable/disable compatibility mode. By default, compatibility mode is enabled:

  • when libcufile cannot be found

  • when running in Windows Subsystem for Linux (WSL)

  • when /run/udev isn’t readable, which typically happens when running inside a docker image not launched with --volume /run/udev:/run/udev:ro
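The three default conditions above can be read as a simple boolean rule. The sketch below is hypothetical throughout: `default_compat_mode` is not a KvikIO function, and the probes it uses when no argument is given (looking up the library with `ctypes.util.find_library`, checking the kernel release string for WSL, testing read access to /run/udev) are assumptions about how such checks could be done, not a description of KvikIO's own detection.

```python
import ctypes.util
import os
import platform


def default_compat_mode(libcufile_found=None, is_wsl=None, udev_readable=None) -> bool:
    """Return True if compatibility mode would be the default under the listed rules."""
    if libcufile_found is None:
        # Assumed probe: ask the system linker configuration for libcufile.
        libcufile_found = ctypes.util.find_library("cufile") is not None
    if is_wsl is None:
        # Assumed probe: WSL kernels usually report "microsoft" in the release string.
        is_wsl = "microsoft" in platform.release().lower()
    if udev_readable is None:
        udev_readable = os.access("/run/udev", os.R_OK)
    return (not libcufile_found) or is_wsl or (not udev_readable)


# Explicit arguments make the rule easy to inspect without probing the machine.
inside_plain_container = default_compat_mode(
    libcufile_found=True, is_wsl=False, udev_readable=False
)
```

Any single condition is enough to select compatibility mode, which is why a container started without the /run/udev volume falls back to POSIX IO even when libcufile is installed.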

Returns:
CompatMode

The compatibility mode that KvikIO is running in.

kvikio.defaults.compat_mode_reset(compatmode: CompatMode) -> None

Reset the compatibility mode.

Use this function to enable/disable compatibility mode explicitly.

Parameters:
compatmode: kvikio.CompatMode

Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try OFF first, and upon failure, fall back to ON.
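The AUTO behaviour described above amounts to a try-then-fall-back rule, sketched here with stand-in types. Everything in the block is hypothetical: `Mode` imitates kvikio.CompatMode, `resolve_mode` is not a KvikIO function, and `try_gds` represents whatever initialization would fail when GDS is unavailable.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for kvikio.CompatMode."""
    OFF = 0
    ON = 1
    AUTO = 2


def resolve_mode(requested: Mode, try_gds) -> Mode:
    """Resolve AUTO to a concrete mode; try_gds is a probe that raises on failure."""
    if requested is not Mode.AUTO:
        return requested  # ON and OFF are taken as given
    try:
        try_gds()         # try OFF first
        return Mode.OFF
    except Exception:
        return Mode.ON    # fall back to compatibility mode


def unavailable():
    """Example probe for a machine without working GDS."""
    raise RuntimeError("GDS not available")


resolved = resolve_mode(Mode.AUTO, unavailable)
```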

kvikio.defaults.get_num_threads() -> int

Get the number of threads of the thread pool.

Set the default value using num_threads_reset() or by setting the KVIKIO_NTHREADS environment variable. If not set, the default value is 1.

Returns:
nthreads: int

The number of threads in the current thread pool.
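The environment-variable default can be expressed in a couple of lines. This is a sketch of the documented rule only: `default_num_threads` is a hypothetical helper, and how KvikIO parses or validates the variable internally is not covered here.

```python
import os


def default_num_threads(env=None) -> int:
    """Return the thread count from KVIKIO_NTHREADS, or 1 if it is not set."""
    env = os.environ if env is None else env
    return int(env.get("KVIKIO_NTHREADS", "1"))


nthreads = default_num_threads({"KVIKIO_NTHREADS": "8"})
```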

kvikio.defaults.num_threads_reset(nthreads: int) -> None

Reset the number of threads in the default thread pool.

Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.

Parameters:
nthreads: int

The number of threads to use. The default value can be specified by setting the KVIKIO_NTHREADS environment variable. If not set, the default value is 1.