# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.
from __future__ import annotations
import enum
import functools
import urllib.parse
from typing import Optional
from kvikio.cufile import IOFuture
class RemoteEndpointType(enum.Enum):
"""
Types of remote file endpoints supported by KvikIO.
This enum defines the different protocols and services that can be used
to access remote files. It is used to specify or detect the type of
remote endpoint when opening files.
Attributes
----------
AUTO : int
Automatically detect the endpoint type from the URL. KvikIO will
attempt to infer the appropriate protocol based on the URL format.
S3 : int
AWS S3 endpoint using credentials-based authentication. Requires
AWS environment variables (such as AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
AWS_DEFAULT_REGION) to be set.
    S3_PUBLIC : int
AWS S3 endpoint for publicly accessible objects. No credentials required as the
objects have public read permissions enabled. Used for open datasets and public
buckets.
S3_PRESIGNED_URL : int
AWS S3 endpoint using a presigned URL. No credentials required as
authentication is embedded in the URL with time-limited access.
WEBHDFS : int
Apache Hadoop WebHDFS (Web-based Hadoop Distributed File System)
endpoint for accessing files stored in HDFS over HTTP/HTTPS.
HTTP : int
Generic HTTP or HTTPS endpoint for accessing files from web servers.
This is used for standard web resources that do not fit the other
specific categories.
See Also
--------
RemoteFile.open : Factory method that uses this enum to specify endpoint types.
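
    Examples
    --------
    A minimal sketch of forcing a specific endpoint type when opening a remote
    file; the URL shown here is hypothetical:

    .. code-block::

       handle = RemoteFile.open(
           "https://example.com/data.bin",
           remote_endpoint_type=RemoteEndpointType.HTTP,
       )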
"""
AUTO = 0
S3 = 1
S3_PUBLIC = 2
S3_PRESIGNED_URL = 3
WEBHDFS = 4
HTTP = 5
@staticmethod
def _map_to_internal(remote_endpoint_type: RemoteEndpointType):
return _get_remote_module().RemoteEndpointType[remote_endpoint_type.name]
@functools.cache
def is_remote_file_available() -> bool:
"""Check if the remote module is available"""
try:
import kvikio._lib.remote_handle # noqa: F401
except ImportError:
return False
else:
return True
@functools.cache
def _get_remote_module():
"""Get the remote module or raise an error"""
if not is_remote_file_available():
raise RuntimeError(
"RemoteFile not available, please build KvikIO "
"with libcurl (-DKvikIO_REMOTE_SUPPORT=ON)"
)
import kvikio._lib.remote_handle
return kvikio._lib.remote_handle
class RemoteFile:
"""File handle of a remote file."""
def __init__(self, handle):
"""Create a remote file from a Cython handle.

        This constructor should not be called directly. Instead, use a factory
        method such as `RemoteFile.open_http()`.
Parameters
----------
handle : kvikio._lib.remote_handle.RemoteFile
The Cython handle
"""
assert isinstance(handle, _get_remote_module().RemoteFile)
self._handle = handle
@classmethod
def open_http(
cls,
url: str,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""Open a HTTP/HTTPS file.
Parameters
----------
url
URL to the remote file.
nbytes
The size of the file. If None, KvikIO will ask the server
for the file size.
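
        Examples
        --------
        A minimal sketch of reading an HTTP file into a host buffer; the URL is a
        placeholder and NumPy is used only for illustration:

        .. code-block::

           import numpy as np

           with RemoteFile.open_http("https://example.com/data.bin") as f:
               buf = np.empty(f.nbytes(), dtype=np.uint8)
               nread = f.read(buf)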
"""
return cls(_get_remote_module().RemoteFile.open_http(url, nbytes))
@classmethod
def open_s3(
cls,
bucket_name: str,
object_name: str,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""Open a AWS S3 file from a bucket name and object name.
Please make sure to set the AWS environment variables:
- `AWS_DEFAULT_REGION`
- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_SESSION_TOKEN` (when using temporary credentials)
        Additionally, to override the AWS endpoint, set `AWS_ENDPOINT_URL`.
See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>
Parameters
----------
bucket_name
The bucket name of the file.
object_name
The object name of the file.
nbytes
The size of the file. If None, KvikIO will ask the server
for the file size.
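
        Examples
        --------
        A minimal sketch, assuming the AWS environment variables above are set; the
        bucket and object names are placeholders:

        .. code-block::

           with RemoteFile.open_s3("my-bucket", "path/to/object.bin") as f:
               print(f.nbytes())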
"""
return cls(
_get_remote_module().RemoteFile.open_s3(bucket_name, object_name, nbytes)
)
@classmethod
def open_s3_url(
cls,
url: str,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""Open a AWS S3 file from an URL.
The `url` can take two forms:
- A full http url such as "http://127.0.0.1/my/file", or
- A S3 url such as "s3://<bucket>/<object>".
Please make sure to set the AWS environment variables:
- `AWS_DEFAULT_REGION`
- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_SESSION_TOKEN` (when using temporary credentials)
        Additionally, if `url` is an S3 URL, it is possible to override the AWS endpoint
by setting `AWS_ENDPOINT_URL`.
See <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html>
Parameters
----------
url
            Either an HTTP URL or an S3 URL.
nbytes
The size of the file. If None, KvikIO will ask the server
for the file size.
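
        Examples
        --------
        A minimal sketch showing both accepted URL forms; the bucket, object, and
        host are placeholders:

        .. code-block::

           f1 = RemoteFile.open_s3_url("s3://my-bucket/my-object")
           f2 = RemoteFile.open_s3_url("http://127.0.0.1/my/file")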
"""
parsed_result = urllib.parse.urlparse(url.lower())
if parsed_result.scheme in ("http", "https"):
return cls(
_get_remote_module().RemoteFile.open_s3_from_http_url(url, nbytes)
)
if parsed_result.scheme == "s3":
return cls(_get_remote_module().RemoteFile.open_s3_from_s3_url(url, nbytes))
raise ValueError(f"Unsupported protocol: {url}")
@classmethod
def open_s3_public(cls, url: str, nbytes: Optional[int] = None) -> RemoteFile:
"""Open a publicly accessible AWS S3 file.
Parameters
----------
url
URL to the remote file.
nbytes
The size of the file. If None, KvikIO will ask the server
for the file size.
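
        Examples
        --------
        A minimal sketch for an object with public read access; the URL is a
        placeholder:

        .. code-block::

           with RemoteFile.open_s3_public(
               "https://my-public-bucket.s3.amazonaws.com/data.bin"
           ) as f:
               print(f.nbytes())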
"""
return cls(_get_remote_module().RemoteFile.open_s3_public(url, nbytes))
@classmethod
def open_s3_presigned_url(
cls,
presigned_url: str,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""Open a AWS S3 file from a presigned URL.
Parameters
----------
presigned_url
Presigned URL to the remote file.
nbytes
The size of the file. If None, KvikIO will ask the server
for the file size.
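
        Examples
        --------
        A minimal sketch, assuming ``presigned_url`` was generated elsewhere (for
        example, by an S3 SDK) and has not expired:

        .. code-block::

           with RemoteFile.open_s3_presigned_url(presigned_url) as f:
               print(f.nbytes())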
"""
return cls(
_get_remote_module().RemoteFile.open_s3_presigned_url(presigned_url, nbytes)
)
@classmethod
def open_webhdfs(
cls,
url: str,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""Open a file on Apache Hadoop Distributed File System (HDFS) using WebHDFS.

        If KvikIO is run within a Docker container, the ``--network host`` argument
        needs to be passed to the ``docker run`` command.
Parameters
----------
url
URL to the remote file.
nbytes
The size of the file. If None, KvikIO will ask the server for the file
size.
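
        Examples
        --------
        A minimal sketch; the host, port, and file path are placeholders and assume
        the usual ``/webhdfs/v1/<path>`` REST layout:

        .. code-block::

           with RemoteFile.open_webhdfs(
               "http://namenode:9870/webhdfs/v1/data/file.bin"
           ) as f:
               print(f.nbytes())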
"""
return cls(_get_remote_module().RemoteFile.open_webhdfs(url, nbytes))
@classmethod
def open(
cls,
url: str,
remote_endpoint_type: RemoteEndpointType = RemoteEndpointType.AUTO,
allow_list: Optional[list] = None,
nbytes: Optional[int] = None,
) -> RemoteFile:
"""
Create a remote file handle from a URL.
        This method creates a RemoteFile for reading data from various remote
        endpoints, including HTTP/HTTPS servers, AWS S3 buckets, publicly accessible
        S3 objects, S3 presigned URLs, and WebHDFS. The endpoint type can be
        automatically detected from the URL or specified explicitly.
Parameters
----------
url : str
The URL of the remote file. Supported formats include:
- S3 with credentials
- S3 for public access
- S3 presigned URL
- WebHDFS
- HTTP/HTTPS
remote_endpoint_type : RemoteEndpointType, optional
The type of remote endpoint. Default is :class:`RemoteEndpointType.AUTO`
which automatically detects the endpoint type from the URL. Can be
explicitly set to :class:`RemoteEndpointType.S3`,
:class:`RemoteEndpointType.S3_PUBLIC`,
:class:`RemoteEndpointType.S3_PRESIGNED_URL`,
:class:`RemoteEndpointType.WEBHDFS`, or :class:`RemoteEndpointType.HTTP`
to force a specific endpoint type.
allow_list : list of RemoteEndpointType, optional
List of allowed endpoint types. If provided:
- If remote_endpoint_type is :class:`RemoteEndpointType.AUTO`, types are
tried in the exact order specified until a match is found.
            - If an explicit endpoint type is specified, it must be in this list;
              otherwise an exception is raised.
If not provided, defaults to all supported types in this order:
:class:`RemoteEndpointType.S3`,
:class:`RemoteEndpointType.S3_PUBLIC`,
:class:`RemoteEndpointType.S3_PRESIGNED_URL`,
:class:`RemoteEndpointType.WEBHDFS`, and :class:`RemoteEndpointType.HTTP`.
nbytes : int, optional
File size in bytes. If not provided, the function sends an additional
request to the server to query the file size.
Returns
-------
RemoteFile
A RemoteFile object that can be used to read data from the remote file.
Raises
------
RuntimeError
            - If the URL is malformed or missing required components.
            - If :class:`RemoteEndpointType.AUTO` is used and the URL does not match
              any supported endpoint type.
            - If the specified endpoint type is not in `allow_list`.
            - If the URL is invalid for the specified endpoint type.
            - If KvikIO is unable to connect to the remote server or determine the
              file size (when `nbytes` is not provided).
Examples
--------
        - Auto-detect endpoint type from URL:

          .. code-block::

             handle = RemoteFile.open(
                 "https://bucket.s3.amazonaws.com/object?X-Amz-Algorithm=AWS4-HMAC-SHA256"
                 "&X-Amz-Credential=...&X-Amz-Signature=..."
             )

        - Open S3 file with explicit endpoint type:

          .. code-block::

             handle = RemoteFile.open(
                 "https://my-bucket.s3.us-east-1.amazonaws.com/data.bin",
                 remote_endpoint_type=RemoteEndpointType.S3,
             )

        - Restrict endpoint type candidates:

          .. code-block::

             handle = RemoteFile.open(
                 user_provided_url,
                 remote_endpoint_type=RemoteEndpointType.AUTO,
                 allow_list=[
                     RemoteEndpointType.HTTP,
                     RemoteEndpointType.S3_PRESIGNED_URL,
                 ],
             )

        - Provide known file size to skip HEAD request:

          .. code-block::

             handle = RemoteFile.open(
                 "https://example.com/large-file.bin",
                 remote_endpoint_type=RemoteEndpointType.HTTP,
                 nbytes=1024 * 1024 * 100,  # 100 MB
             )
"""
return cls(
_get_remote_module().RemoteFile.open(
url,
RemoteEndpointType._map_to_internal(remote_endpoint_type),
allow_list,
nbytes,
)
)
[docs]
def close(self) -> None:
"""Close the file"""
pass
def __enter__(self) -> RemoteFile:
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
def __str__(self) -> str:
return str(self._handle)
def remote_endpoint_type(self) -> RemoteEndpointType:
"""Get the type of the remote file.
Returns
-------
The type of the remote file.
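
        Examples
        --------
        A minimal sketch; the URL is a placeholder:

        .. code-block::

           with RemoteFile.open_http("https://example.com/data.bin") as f:
               endpoint_type = f.remote_endpoint_type()  # RemoteEndpointType.HTTP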
"""
return RemoteEndpointType[self._handle.remote_endpoint_type().name]
def nbytes(self) -> int:
"""Get the file size.

        Note that this is very fast; no server communication is needed.
Returns
-------
The number of bytes.
"""
return self._handle.nbytes()
def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
----------
buf : buffer-like or array-like
Device or host buffer to read into.
size
Size in bytes to read.
file_offset
Offset in the file to read from.
Returns
-------
        The number of bytes that were successfully read.
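
        Examples
        --------
        A minimal sketch of reading a whole file into a device buffer; the URL is a
        placeholder and CuPy is used only for illustration:

        .. code-block::

           import cupy as cp

           with RemoteFile.open_http("https://example.com/data.bin") as f:
               buf = cp.empty(f.nbytes(), dtype=cp.uint8)
               nread = f.read(buf)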
"""
return self.pread(buf, size, file_offset).get()
def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
----------
buf : buffer-like or array-like
Device or host buffer to read into.
size
Size in bytes to read.
file_offset
Offset in the file to read from.
Returns
-------
        Future that, on completion, returns the number of bytes that were
        successfully read.
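
        Examples
        --------
        A minimal sketch of overlapping a read with other work; the URL is a
        placeholder and NumPy is used only for illustration:

        .. code-block::

           import numpy as np

           with RemoteFile.open_http("https://example.com/data.bin") as f:
               buf = np.empty(f.nbytes(), dtype=np.uint8)
               future = f.pread(buf)
               # ... other work can proceed while the read runs ...
               nread = future.get()  # block until the read completes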
"""
return IOFuture(self._handle.pread(buf, size, file_offset))