API
CuFile
- class kvikio.cufile.CuFile(file: Path | str, flags: str = 'r')
File handle for GPUDirect Storage (GDS)
- __init__(file: Path | str, flags: str = 'r')
Open and register a file for GDS IO operations.
CuFile opens the file twice and maintains two file descriptors: one opened with the specified flags and one opened with the same flags plus O_DIRECT.
- Parameters:
- file: pathlib.Path or str
Path-like object giving the pathname (absolute or relative to the current working directory) of the file to be opened and registered.
- flags: str, optional
"r": open for reading (default)
"w": open for writing, truncating the file first
"a": open for writing, appending to the end of the file if it exists
"+": open for updating (reading and writing)
- property closed: bool
- pread(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) → IOFuture
Reads specified bytes from the file into device or host memory in parallel.
pread reads size bytes from the file, starting at file_offset, into buf. The API works correctly for unaligned offsets and any data size, although the performance might not match that of aligned reads. See the notes below for details.
pread is non-blocking and returns an IOFuture that can be waited upon. It partitions the operation into tasks of size task_size for execution in the default thread pool.
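To make the task partitioning concrete, here is a CPU-only sketch of the same idea: the request is split into task_size chunks, each chunk is read by a worker thread, and a future-like object sums the bytes on get(). It assumes a regular file read with os.pread (Unix only) and a concurrent.futures.ThreadPoolExecutor standing in for KvikIO's default thread pool; ChunkedFuture and parallel_pread are hypothetical names, and this is not KvikIO's implementation.

```python
import os
import tempfile
from concurrent.futures import Future, ThreadPoolExecutor


class ChunkedFuture:
    """Hypothetical stand-in for IOFuture: get() waits for every task."""

    def __init__(self, futures: list[Future]) -> None:
        self._futures = futures

    def get(self) -> int:
        # Block until all tasks finish and return the total bytes read.
        return sum(f.result() for f in self._futures)


def _read_task(fd: int, view: memoryview, file_offset: int) -> int:
    # Read len(view) bytes at file_offset into this slice of the buffer.
    data = os.pread(fd, len(view), file_offset)
    view[: len(data)] = data
    return len(data)


def parallel_pread(fd: int, buf: bytearray, size: int, file_offset: int,
                   task_size: int, pool: ThreadPoolExecutor) -> ChunkedFuture:
    """Split [file_offset, file_offset + size) into task_size chunks."""
    view = memoryview(buf)
    futures = []
    for start in range(0, size, task_size):
        nbytes = min(task_size, size - start)
        chunk = view[start:start + nbytes]  # each task fills its own slice
        futures.append(pool.submit(_read_task, fd, chunk, file_offset + start))
    return ChunkedFuture(futures)


payload = bytes(range(256)) * 40  # 10240 bytes of sample data
with tempfile.TemporaryFile() as f:
    f.write(payload)
    f.flush()
    buf = bytearray(5000)
    with ThreadPoolExecutor(max_workers=4) as pool:
        nread = parallel_pread(f.fileno(), buf, len(buf), 100, 1024, pool).get()
print(nread)  # 5000
```

Because each task writes into a disjoint slice of the same buffer, no locking is needed; the only synchronization point is get(), mirroring how the returned IOFuture is waited upon.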
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to read into.
- size: int, optional
Size in bytes to read.
- file_offset: int, optional
Offset in the file to read from.
- task_size: int, default=kvikio.defaults.task_size()
Size of each task in bytes.
- Returns:
- IOFuture
Future that on completion returns the number of bytes that were successfully read.
Notes
KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4 KiB, so reads should start at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is usually best to round it down to the nearest multiple of 4096 and discard the unwanted leading bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, alignment is less critical.
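The rounding advice in this note can be expressed as a small calculation. aligned_window is a hypothetical helper, not part of KvikIO: it rounds file_offset down and the end of the request up to 4096-byte boundaries (the latter following the advice that size be a multiple of 4096) and reports how many leading bytes of the aligned read to discard.

```python
PAGE_SIZE = 4096  # GPU page size from the note above


def aligned_window(file_offset: int, size: int, page_size: int = PAGE_SIZE):
    """Return (aligned_offset, aligned_size, skip) covering the request.

    aligned_offset is file_offset rounded down to a page boundary,
    aligned_size is a whole number of pages, and skip is how many leading
    bytes of the aligned read must be discarded.
    """
    aligned_offset = (file_offset // page_size) * page_size
    end = file_offset + size
    aligned_end = -(-end // page_size) * page_size  # round the end up
    return aligned_offset, aligned_end - aligned_offset, file_offset - aligned_offset


offset, nbytes, skip = aligned_window(5000, 100)
print(offset, nbytes, skip)  # 4096 4096 904
# After reading nbytes at offset into `data`, the requested bytes are
# data[skip:skip + 100].
```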
- pwrite(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) → IOFuture
Writes specified bytes from device or host memory into the file in parallel.
pwrite writes size bytes from buf to the file, starting at file_offset. The API works correctly for unaligned offsets and any data size, although the performance might not match that of aligned writes. See the notes below for details.
pwrite is non-blocking and returns an IOFuture that can be waited upon. It partitions the operation into tasks of size task_size for execution in the default thread pool.
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to write from.
- size: int, optional
Size in bytes to write.
- file_offset: int, optional
Offset in the file to write to.
- task_size: int, default=kvikio.defaults.task_size()
Size of each task in bytes.
- Returns:
- IOFuture
Future that on completion returns the number of bytes that were successfully written.
Notes
KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4 KiB, so writes should start at an offset that is a multiple of 4096 bytes, and it is optimal for size to be a multiple of 4096 bytes as well. When GDS isn’t used, alignment is less critical.
- read(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) → int
Reads specified bytes from the file into device or host memory in parallel.
This is a blocking version of .pread.
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to read into.
- size: int, optional
Size in bytes to read.
- file_offset: int, optional
Offset in the file to read from.
- task_size: int, default=kvikio.defaults.task_size()
Size of each task in bytes.
- Returns:
- int
The number of bytes that were successfully read.
Notes
KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4 KiB, so reads should start at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is usually best to round it down to the nearest multiple of 4096 and discard the unwanted leading bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, alignment is less critical.
- write(buf, size: int | None = None, file_offset: int = 0, task_size: int | None = None) → int
Writes specified bytes from device or host memory into the file in parallel.
This is a blocking version of .pwrite.
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to write from.
- size: int, optional
Size in bytes to write.
- file_offset: int, optional
Offset in the file to write to.
- task_size: int, default=kvikio.defaults.task_size()
Size of each task in bytes.
- Returns:
- int
The number of bytes that were successfully written.
Notes
KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4 KiB, so writes should start at an offset that is a multiple of 4096 bytes, and it is optimal for size to be a multiple of 4096 bytes as well. When GDS isn’t used, alignment is less critical.
- raw_read_async(buf, stream, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) → IOFutureStream
Reads specified bytes from the file into the device memory asynchronously
This is an async version of .raw_read that doesn’t use threads and does not support host memory.
- Parameters:
- buf: buffer-like or array-like
Device buffer to read into.
- stream: cuda.Stream
CUDA stream to perform the read operation asynchronously.
- size: int, optional
Size in bytes to read.
- file_offset: int, optional
Offset in the file to read from.
- dev_offset: int, optional
Offset in buf to read into.
- Returns:
- IOFutureStream
Future whose .check_bytes_done() returns the number of bytes that were successfully read. The instance must be kept alive until all data has been read from disk. One way to ensure this is to call IOFutureStream.check_bytes_done(), which synchronizes the associated stream and returns the number of bytes read.
- raw_write_async(buf, stream, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) → IOFutureStream
Writes specified bytes from the device memory into the file asynchronously
This is an async version of .raw_write that doesn’t use threads and does not support host memory.
- Parameters:
- buf: buffer-like or array-like
Device buffer to write from.
- stream: cuda.Stream
CUDA stream to perform the write operation asynchronously.
- size: int, optional
Size in bytes to write.
- file_offset: int, optional
Offset in the file to write to.
- dev_offset: int, optional
Offset in buf to write from.
- Returns:
- IOFutureStream
Future whose .check_bytes_done() returns the number of bytes that were successfully written. The instance must be kept alive until all data has been written to disk. One way to ensure this is to call IOFutureStream.check_bytes_done(), which synchronizes the associated stream and returns the number of bytes written.
- raw_read(buf, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) → int
Reads specified bytes from the file into the device memory
This is a low-level version of .read that doesn’t use threads and does not support host memory.
- Parameters:
- buf: buffer-like or array-like
Device buffer to read into.
- size: int, optional
Size in bytes to read.
- file_offset: int, optional
Offset in the file to read from.
- dev_offset: int, optional
Offset in buf to read into.
- Returns:
- int
The number of bytes that were successfully read.
Notes
KvikIO can only make use of GDS for reads that are aligned to a page boundary. For unaligned reads, KvikIO has to split the reads into aligned and unaligned parts. The GPU page size is 4 KiB, so reads should start at an offset that is a multiple of 4096 bytes. If the desired file_offset is not a multiple of 4096, it is usually best to round it down to the nearest multiple of 4096 and discard the unwanted leading bytes from the resulting data. Similarly, it is optimal for size to be a multiple of 4096 bytes. When GDS isn’t used, alignment is less critical.
- raw_write(buf, size: int | None = None, file_offset: int = 0, dev_offset: int = 0) → int
Writes specified bytes from the device memory into the file
This is a low-level version of .write that doesn’t use threads and does not support host memory.
- Parameters:
- buf: buffer-like or array-like
Device buffer to write from.
- size: int, optional
Size in bytes to write.
- file_offset: int, optional
Offset in the file to write to.
- dev_offset: int, optional
Offset in the buf to write from.
- Returns:
- int
The number of bytes that were successfully written.
Notes
KvikIO can only make use of GDS for writes that are aligned to a page boundary. For unaligned writes, KvikIO has to split the writes into aligned and unaligned parts. The GPU page size is 4 KiB, so writes should start at an offset that is a multiple of 4096 bytes, and it is optimal for size to be a multiple of 4096 bytes as well. When GDS isn’t used, alignment is less critical.
- class kvikio.cufile.IOFuture(handle)
Future for CuFile IO
This class shouldn’t be used directly; instead, non-blocking IO operations such as CuFile.pread and CuFile.pwrite return an instance of this class. Use .get() to wait for the IO operation to complete and retrieve the result.
CuFile driver
- class kvikio.cufile_driver.ConfigContextManager(config: dict[str, str])
Context manager allowing the cuFile driver configurations to be set upon entering a with block, and automatically reset upon leaving the block.
- kvikio.cufile_driver.set(config: dict[str, Any], /) → ConfigContextManager
- kvikio.cufile_driver.set(key: str, value: Any, /) → ConfigContextManager
Set cuFile driver configurations.
Examples:
To set one or more properties
# Set the property globally.
kvikio.cufile_driver.set({"prop1": value1, "prop2": value2})
# Set the property with a context manager.
# The property automatically reverts to its old value
# after leaving the `with` block.
with kvikio.cufile_driver.set({"prop1": value1, "prop2": value2}):
    ...
To set a single property
# Set the property globally.
kvikio.cufile_driver.set("prop", value)
# Set the property with a context manager.
# The property automatically reverts to its old value
# after leaving the `with` block.
with kvikio.cufile_driver.set("prop", value):
    ...
- Parameters:
- config
The configurations. Can either be a single parameter (dict) consisting of one or more properties, or two parameters key (string) and value (Any) indicating a single property.
Valid configuration names are:
Read-only properties:
"is_gds_available"
"major_version"
"minor_version"
"allow_compat_mode"
"per_buffer_cache_size"
Settable properties:
"poll_mode"
"poll_thresh_size"
"max_device_cache_size"
"max_pinned_memory_size"
- Returns:
- ConfigContextManager
A context manager. If used in a with statement, the configuration will revert to its old value upon leaving the block.
- kvikio.cufile_driver.get(config_name: str) → Any
Get cuFile driver configurations.
- Parameters:
- config_name: str
The name of the configuration.
Valid configuration names are:
Read-only properties:
"is_gds_available"
"major_version"
"minor_version"
"allow_compat_mode"
"per_buffer_cache_size"
Settable properties:
"poll_mode"
"poll_thresh_size"
"max_device_cache_size"
"max_pinned_memory_size"
- Returns:
- Any
The value of the configuration.
- kvikio.cufile_driver.libcufile_version() → Tuple[int, int]
Get the libcufile version.
Returns (0, 0) for cuFile versions prior to v1.8.
- Returns:
- The version as a tuple (MAJOR, MINOR).
Notes
This is not the version of the CUDA toolkit. cuFile is part of the toolkit but follows its own versioning scheme.
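Since the version is a (MAJOR, MINOR) tuple, feature checks can rely on Python's element-wise tuple comparison. at_least is a hypothetical helper; with the real library, its first argument would be the value returned by kvikio.cufile_driver.libcufile_version().

```python
def at_least(version: tuple[int, int], minimum: tuple[int, int]) -> bool:
    """True if a (MAJOR, MINOR) version is at least `minimum`.

    Tuples compare element by element, so (1, 10) > (1, 9). The (0, 0)
    returned for cuFile releases before v1.8 compares lower than any real
    version, so the check conservatively treats those releases as too old.
    """
    return tuple(version) >= tuple(minimum)


print(at_least((1, 10), (1, 9)))  # True
print(at_least((0, 0), (1, 8)))   # False
```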
- kvikio.cufile_driver.driver_open() → None
Open the cuFile driver
cuFile accepts multiple calls to driver_open(). Only the first call opens the driver, but every call must have a matching call to driver_close().
Normally, it is not required to open and close the cuFile driver since it is done automatically.
- Raises:
- RuntimeError
If cuFile isn’t available.
- kvikio.cufile_driver.driver_close() → None
Close the cuFile driver
cuFile accepts multiple calls to driver_open(). Only the first call opens the driver, but every call must have a matching call to driver_close().
- Raises:
- RuntimeError
If cuFile isn’t available.
- kvikio.cufile_driver.initialize() → None
Open the cuFile driver and close it again at module exit
Normally, it is not required to open and close the cuFile driver since it is done automatically.
- Raises:
- RuntimeError
If cuFile isn’t available.
Notes
Registers an atexit handler that calls driver_close().
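The driver_open()/driver_close() contract behaves like a reference count. The following model is an illustration only (RefCountedDriver is hypothetical and does not touch cuFile): only the first open takes effect, only the last matching close takes effect, and initialize() opens once and registers the matching close with atexit.

```python
import atexit


class RefCountedDriver:
    """Hypothetical model of the driver_open()/driver_close() contract."""

    def __init__(self) -> None:
        self.open_calls = 0  # driver_open() calls not yet matched by a close
        self.is_open = False

    def driver_open(self) -> None:
        if self.open_calls == 0:
            self.is_open = True  # only the first call actually opens
        self.open_calls += 1

    def driver_close(self) -> None:
        if self.open_calls == 0:
            raise RuntimeError("driver_close() without a matching driver_open()")
        self.open_calls -= 1
        if self.open_calls == 0:
            self.is_open = False  # the last matching close actually closes

    def initialize(self) -> None:
        # Open now and schedule the matching close for interpreter exit.
        self.driver_open()
        atexit.register(self.driver_close)


driver = RefCountedDriver()
driver.driver_open()
driver.driver_open()
driver.driver_close()
print(driver.is_open)  # True: one open is still unmatched
driver.driver_close()
print(driver.is_open)  # False
```

In application code, the same pairing rule suggests calling kvikio.cufile_driver.driver_close() in a finally block after a successful driver_open().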
Zarr
- class kvikio.zarr.GDSStore(root: Path | str, *, read_only: bool = False)
- async get(key: str, prototype: BufferPrototype | None = None, byte_range: RangeByteRequest | OffsetByteRequest | SuffixByteRequest | None = None) → Buffer | None
Retrieve the value associated with a given key.
- Parameters:
- key: str
- prototype: BufferPrototype
The prototype of the output buffer. Stores may support a default buffer prototype.
- byte_range: ByteRequest, optional
ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.
- RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
- OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
- SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.
- Returns:
- Buffer
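The three byte_range forms can be summarized by mapping each request to a [start, stop) slice of the stored value. This sketch uses hypothetical stand-in dataclasses (Range, Offset, Suffix), not the zarr classes themselves; treating a start equal to the object length as "past the end" and clamping an oversized suffix to the whole object are assumptions made here.

```python
from dataclasses import dataclass
from typing import Optional, Union


# Hypothetical stand-ins for RangeByteRequest, OffsetByteRequest and
# SuffixByteRequest; they are not the zarr classes themselves.
@dataclass(frozen=True)
class Range:
    start: int
    end: int  # exclusive


@dataclass(frozen=True)
class Offset:
    offset: int


@dataclass(frozen=True)
class Suffix:
    suffix: int  # number of trailing bytes, not an offset


def resolve(req: Optional[Union[Range, Offset, Suffix]], length: int) -> tuple[int, int]:
    """Map a byte request onto a [start, stop) slice of a `length`-byte value."""
    if req is None:
        return 0, length  # no byte_range: the whole value
    if isinstance(req, Range):
        if req.end <= req.start or req.start >= length:
            raise ValueError("zero-length range or range starts past the end")
        return req.start, min(req.end, length)  # clamp to the remainder
    if isinstance(req, Offset):
        return req.offset, length  # like "bytes=<offset>-"
    if isinstance(req, Suffix):
        return max(length - req.suffix, 0), length  # like "bytes=-<suffix>"
    raise TypeError(f"unsupported request: {req!r}")


start, stop = resolve(Suffix(3), 10)
print(start, stop)  # 7 10
```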
RemoteFile
- class kvikio.remote_file.RemoteFile(handle)
File handle of a remote file.
- __init__(handle)
Create a remote file from a Cython handle.
This constructor should not be called directly; instead, use a factory method such as RemoteFile.open_http().
- Parameters:
- handle: kvikio._lib.remote_handle.RemoteFile
The Cython handle.
- classmethod open_http(url: str, nbytes: int | None = None) → RemoteFile
Open an HTTP file.
- Parameters:
- url
URL to the remote file.
- nbytes
The size of the file. If None, KvikIO will ask the server for the file size.
- classmethod open_s3(bucket_name: str, object_name: str, nbytes: int | None = None) → RemoteFile
Open an AWS S3 file from a bucket name and object name.
- Please make sure to set the AWS environment variables:
AWS_DEFAULT_REGION
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
Additionally, to override the AWS endpoint, set AWS_ENDPOINT_URL. See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>
- Parameters:
- bucket_name
The bucket name of the file.
- object_name
The object name of the file.
- nbytes
The size of the file. If None, KvikIO will ask the server for the file size.
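A pre-flight check of the environment variables listed above can make a missing credential easier to diagnose before calling open_s3(). missing_aws_vars is a hypothetical helper, and treating an empty value as unset is an assumption of this sketch; AWS_ENDPOINT_URL is optional and therefore not checked.

```python
import os
from collections.abc import Mapping

REQUIRED_AWS_VARS = ("AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Names of the required AWS variables that are unset or empty."""
    return [name for name in REQUIRED_AWS_VARS if not environ.get(name)]


print(missing_aws_vars({"AWS_DEFAULT_REGION": "us-east-1"}))
# ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']
```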
- classmethod open_s3_url(url: str, nbytes: int | None = None) → RemoteFile
Open an AWS S3 file from a URL.
- The url can take two forms:
A full HTTP URL such as “http://127.0.0.1/my/file”, or
An S3 URL such as “s3://<bucket>/<object>”.
- Please make sure to set the AWS environment variables:
AWS_DEFAULT_REGION
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
Additionally, if url is an S3 URL, it is possible to override the AWS endpoint by setting AWS_ENDPOINT_URL. See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>
- Parameters:
- url
Either an HTTP URL or an S3 URL.
- nbytes
The size of the file. If None, KvikIO will ask the server for the file size.
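The two URL forms can be told apart by their scheme. split_s3_url is a hypothetical helper, not KvikIO's own parser: it extracts the bucket and object name from an "s3://<bucket>/<object>" URL, which roughly corresponds to the bucket_name and object_name arguments of open_s3(), and returns None for other schemes such as a full HTTP URL. It does not handle object names containing "?" or "#", which urlparse treats as URL syntax.

```python
from typing import Optional
from urllib.parse import urlparse


def split_s3_url(url: str) -> Optional[tuple[str, str]]:
    """Return (bucket, object) for an "s3://<bucket>/<object>" URL.

    Returns None for any other scheme (for example a full http URL).
    """
    parsed = urlparse(url)
    if parsed.scheme != "s3":
        return None
    bucket = parsed.netloc
    obj = parsed.path.lstrip("/")
    if not bucket or not obj:
        raise ValueError(f"expected s3://<bucket>/<object>, got {url!r}")
    return bucket, obj


print(split_s3_url("s3://my-bucket/data/file.bin"))  # ('my-bucket', 'data/file.bin')
print(split_s3_url("http://127.0.0.1/my/file"))      # None
```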
- nbytes() → int
Get the file size.
This is very fast: no communication is needed.
- Returns:
- The number of bytes.
- read(buf, size: int | None = None, file_offset: int = 0) → int
Read from remote source into buffer (host or device memory) in parallel.
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to read into.
- size
Size in bytes to read.
- file_offset
Offset in the file to read from.
- Returns:
- The number of bytes that were successfully read.
- pread(buf, size: int | None = None, file_offset: int = 0) → IOFuture
Read from the remote source into a buffer (host or device memory) in parallel, without blocking.
- Parameters:
- buf: buffer-like or array-like
Device or host buffer to read into.
- size
Size in bytes to read.
- file_offset
Offset in the file to read from.
- Returns:
- Future that on completion returns the number of bytes that were successfully read.
Defaults
- class kvikio.defaults.ConfigContextManager(config: dict[str, str])
Context manager allowing the KvikIO configurations to be set upon entering a with block, and automatically reset upon leaving the block.
- kvikio.defaults.set(config: dict[str, Any], /) → ConfigContextManager
- kvikio.defaults.set(key: str, value: Any, /) → ConfigContextManager
Set KvikIO configurations.
Examples:
To set one or more properties
# Set the property globally.
kvikio.defaults.set({"prop1": value1, "prop2": value2})
# Set the property with a context manager.
# The property automatically reverts to its old value
# after leaving the `with` block.
with kvikio.defaults.set({"prop1": value1, "prop2": value2}):
    ...
To set a single property
# Set the property globally.
kvikio.defaults.set("prop", value)
# Set the property with a context manager.
# The property automatically reverts to its old value
# after leaving the `with` block.
with kvikio.defaults.set("prop", value):
    ...
- Parameters:
- config
The configurations. Can either be a single parameter (dict) consisting of one or more properties, or two parameters key (string) and value (Any) indicating a single property.
Valid configuration names are:
"compat_mode"
"num_threads"
"task_size"
"gds_threshold"
"bounce_buffer_size"
"http_max_attempts"
"http_status_codes"
"http_timeout"
- Returns:
- ConfigContextManager
A context manager. If used in a with statement, the configuration will revert to its old value upon leaving the block.
- kvikio.defaults.get(config_name: str) → Any
Get KvikIO configurations.
- Parameters:
- config_name: str
The name of the configuration.
Valid configuration names are:
"compat_mode"
"num_threads"
"task_size"
"gds_threshold"
"bounce_buffer_size"
"http_max_attempts"
"http_status_codes"
- Returns:
- Any
The value of the configuration.
- kvikio.defaults.compat_mode(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Check if KvikIO is running in compatibility mode.
Note that this is not the same as the compatibility mode in cuFile: cuFile can run in compatibility mode while KvikIO does not.
When KvikIO is running in compatibility mode, it doesn’t load libcufile.so. Instead, reads and writes are done using POSIX.
Set the environment variable KVIKIO_COMPAT_MODE to enable/disable compatibility mode. By default, compatibility mode is enabled:
when libcufile cannot be found
when running in Windows Subsystem for Linux (WSL)
when /run/udev isn’t readable, which typically happens when running inside a Docker container that was not launched with --volume /run/udev:/run/udev:ro
- Returns:
- bool
Whether KvikIO is running in compatibility mode or not.
- kvikio.defaults.num_threads(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the number of threads of the thread pool.
Set the default value using set("num_threads", value) or by setting the KVIKIO_NTHREADS environment variable. If not set, the default value is 1.
- Returns:
- nthreads: int
The number of threads in the current thread pool.
- kvikio.defaults.task_size(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the default task size used for parallel IO operations.
Set the default value using set("task_size", value) or by setting the KVIKIO_TASK_SIZE environment variable. If not set, the default value is 4 MiB.
- Returns:
- nbytes: int
The default task size in bytes.
- kvikio.defaults.gds_threshold(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the default GDS threshold, which is the minimum size to use GDS.
To improve the performance of small IO, .pread() and .pwrite() implement a shortcut that bypasses the thread pool and uses the POSIX backend directly.
Set the default value using set("gds_threshold", value) or by setting the KVIKIO_GDS_THRESHOLD environment variable. If not set, the default value is 1 MiB.
- Returns:
- nbytes: int
The default GDS threshold size in bytes.
- kvikio.defaults.bounce_buffer_size(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the size of the bounce buffer used to stage data in host memory.
Set the value using set("bounce_buffer_size", value) or by setting the KVIKIO_BOUNCE_BUFFER_SIZE environment variable. If not set, the value is 16 MiB.
- Returns:
- nbytes: int
The bounce buffer size in bytes.
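The documented fallbacks for the four settings above can be summarized as a lookup table. This sketch only mirrors those documented defaults: effective_settings is hypothetical, it assumes each environment variable holds a plain decimal integer (bytes for the sizes), and in KvikIO itself the variables are read by the library and the current values are queried with kvikio.defaults.get().

```python
import os
from collections.abc import Mapping

MiB = 1024 * 1024

# Environment variable -> documented fallback when the variable is unset.
_ENV_DEFAULTS = {
    "KVIKIO_NTHREADS": 1,
    "KVIKIO_TASK_SIZE": 4 * MiB,
    "KVIKIO_GDS_THRESHOLD": 1 * MiB,
    "KVIKIO_BOUNCE_BUFFER_SIZE": 16 * MiB,
}


def effective_settings(environ: Mapping[str, str] = os.environ) -> dict[str, int]:
    """Resolve each setting from the environment, else the documented default."""
    return {name: int(environ.get(name, default))
            for name, default in _ENV_DEFAULTS.items()}


print(effective_settings({"KVIKIO_NTHREADS": "8"})["KVIKIO_NTHREADS"])  # 8
```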
- kvikio.defaults.http_status_codes(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the list of HTTP status codes to retry.
Set the value using set("http_status_codes", value) or by setting the KVIKIO_HTTP_STATUS_CODES environment variable. If not set, the default value is [429, 500, 502, 503, 504].
- Returns:
- status_codes: list[int]
The HTTP status codes to retry.
- kvikio.defaults.http_max_attempts(*args, **kwargs)
Deprecated. Use kvikio.defaults.get() instead.
Get the maximum number of attempts per remote IO read.
A read is attempted at most http_max_attempts times, and is retried only when the response has one of the HTTP status codes in http_status_codes. Set the value using set("http_max_attempts", value) or by setting the KVIKIO_HTTP_MAX_ATTEMPTS environment variable. If not set, the value is 3.
- Returns:
- max_attempts: int
The maximum number of remote IO reads to attempt before raising an error.
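The interplay between http_max_attempts and http_status_codes can be sketched as a retry loop. read_with_retries and its request callable are hypothetical, the loop does not wait between attempts, and treating any status below 400 as success is an assumption; KvikIO's actual retry logic may differ, for example in back-off.

```python
from typing import Callable, Iterable, Tuple

DEFAULT_RETRY_CODES = (429, 500, 502, 503, 504)  # documented default


def read_with_retries(
    request: Callable[[], Tuple[int, bytes]],
    max_attempts: int = 3,
    retry_codes: Iterable[int] = DEFAULT_RETRY_CODES,
) -> bytes:
    """Call request() until it succeeds, retrying only on retry_codes.

    request is a hypothetical callable returning (status_code, body).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    codes = set(retry_codes)
    status = None
    for _ in range(max_attempts):
        status, body = request()
        if status < 400:  # assumption: treat 1xx-3xx as success
            return body
        if status not in codes:
            raise RuntimeError(f"HTTP {status} is not retryable")
        # No back-off here; a production client might sleep before retrying.
    raise RuntimeError(f"gave up after {max_attempts} attempts (last status {status})")


responses = iter([(503, b""), (429, b""), (200, b"payload")])
print(read_with_retries(lambda: next(responses)))  # b'payload'
```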
- kvikio.defaults.compat_mode_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the compatibility mode.
Use this function to enable/disable compatibility mode explicitly.
- Parameters:
- compatmode: kvikio.CompatMode
Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try OFF first, and upon failure, fall back to ON.
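The AUTO policy (try OFF first and fall back to ON upon failure) can be sketched as follows. Mode mirrors the three documented kvikio.CompatMode values but is a hypothetical stand-in, and cufile_usable is a placeholder for whatever probe decides that cuFile can be used (for example, that libcufile can be found).

```python
from enum import Enum
from typing import Callable


class Mode(Enum):
    """Hypothetical stand-in for kvikio.CompatMode."""

    OFF = "off"
    ON = "on"
    AUTO = "auto"


def resolve_mode(requested: Mode, cufile_usable: Callable[[], bool]) -> Mode:
    """Turn AUTO into a concrete mode: try OFF first, fall back to ON."""
    if requested is not Mode.AUTO:
        return requested  # explicit ON/OFF is used as-is, without probing
    return Mode.OFF if cufile_usable() else Mode.ON


print(resolve_mode(Mode.AUTO, lambda: False).name)  # ON
```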
- kvikio.defaults.set_compat_mode(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as compat_mode_reset().
- kvikio.defaults.num_threads_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the number of threads in the default thread pool.
Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
- Parameters:
- nthreads: int
The number of threads to use. The default value can be specified by setting the KVIKIO_NTHREADS environment variable. If not set, the default value is 1.
- kvikio.defaults.set_num_threads(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as num_threads_reset().
- kvikio.defaults.task_size_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the default task size used for parallel IO operations.
- Parameters:
- nbytes: int
The default task size in bytes.
- kvikio.defaults.set_task_size(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as task_size_reset().
- kvikio.defaults.gds_threshold_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the default GDS threshold, which is the minimum size to use GDS.
- Parameters:
- nbytes: int
The default GDS threshold size in bytes.
- kvikio.defaults.set_gds_threshold(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as gds_threshold_reset().
- kvikio.defaults.bounce_buffer_size_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the size of the bounce buffer used to stage data in host memory.
- Parameters:
- nbytes: int
The bounce buffer size in bytes.
- kvikio.defaults.set_bounce_buffer_size(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as bounce_buffer_size_reset().
- kvikio.defaults.http_status_codes_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the list of HTTP status codes to retry.
- Parameters:
- status_codes: list[int]
The HTTP status codes to retry.
- kvikio.defaults.set_http_status_codes(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as http_status_codes_reset().
- kvikio.defaults.http_max_attempts_reset(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Reset the maximum number of attempts per remote IO read.
- Parameters:
- attempts: int
The maximum number of attempts to try before raising an error.
- kvikio.defaults.set_http_max_attempts(*args, **kwargs)
Deprecated. Use kvikio.defaults.set() instead.
Same as http_max_attempts_reset().