API#
ucxx
- create_listener(): Create and start a listener to accept incoming connections
- create_endpoint(): Create a new endpoint to a server
- get_address(): Get the address associated with a network interface.
- get_config(): Returns all UCX configuration options as a dict.
- get_ucp_worker(): Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- get_ucx_version(): Return the version of the underlying UCX installation
- init(): Initiate UCX.
- progress(): Try to progress the communication layer
- reset(): Resets the UCX library by shutting down all of UCX.
Endpoint
- Endpoint: An endpoint represents a connection to a peer
- abort(): Close the communication immediately and abruptly.
- close(): Close the endpoint cleanly.
- closed: Is this endpoint closed?
- close_after_n_recv(): Close the endpoint after n received messages.
- get_ucp_endpoint(): Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.
- get_ucp_worker(): Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- recv(): Receive from connected peer into buffer.
- send(): Send buffer to connected peer.
- uid: The unique ID of the underlying UCX endpoint
Listener
- Listener: A handle to the listening service started by create_listener()
- close(): Close the listener
- closed: Is the listener closed?
- port: The listening network port
- ucxx.create_listener(callback_func, port=None, endpoint_error_handling=True, connect_timeout=5.0)#
Create and start a listener to accept incoming connections
callback_func is the function or coroutine that takes one argument – the Endpoint connected to the client.
Note that listening stops when the returned Listener goes out of scope, so remember to keep a reference to the object.
Parameters#
- callback_func: function or coroutine
A callback function that gets invoked when an incoming connection is accepted
- port: int, optional
An unused port number for listening, or 0 to let UCX assign an unused port.
- endpoint_error_handling: boolean, optional
If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty but prevents the process from terminating unexpectedly, which can happen when error handling is disabled. If False, endpoint error handling is disabled.
- connect_timeout: float
Timeout in seconds for exchanging peer info. In some cases, exchanging peer information may hang indefinitely; a timeout prevents that. If the chosen value is too high, the operation may be stuck for too long rather than quickly raising a TimeoutError that the application can recover from. Under high load, however, a higher timeout may help prevent the peer info exchange from failing too fast.
Returns#
- Listener
The new listener. When this object is deleted, the listening stops
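A minimal sketch of how a listener callback might be wired up, assuming both peers agree on a fixed message size. `MSG_SIZE`, `echo_handler`, and `serve` are hypothetical names chosen for illustration and are not part of ucxx. `ucxx` is imported lazily inside `serve` so that the handler itself can be exercised without a UCX installation.

```python
import asyncio

MSG_SIZE = 16  # hypothetical fixed message size agreed on by both peers


async def echo_handler(ep):
    """Callback sketch: receive one fixed-size message and send it back."""
    buf = bytearray(MSG_SIZE)
    await ep.recv(buf)   # fill the preallocated buffer
    await ep.send(buf)   # echo the same bytes to the peer
    await ep.close()


async def serve(port=0):
    """Start a listener and keep a reference to it while it is open."""
    import ucxx  # lazy import: only needed when actually serving

    # Keep the returned Listener alive; listening stops once it is deleted.
    listener = ucxx.create_listener(echo_handler, port)
    print("listening on port", listener.port)
    while not listener.closed:
        await asyncio.sleep(0.1)
```

Holding `listener` in a local variable for the lifetime of `serve` is what keeps the listening service alive in this sketch.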
- async ucxx.create_endpoint(ip_address, port, endpoint_error_handling=True, connect_timeout=5.0)#
Create a new endpoint to a server
Parameters#
- ip_address: str
IP address of the server the endpoint should connect to
- port: int
Port of the server the endpoint should connect to
- endpoint_error_handling: boolean, optional
If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty but prevents the process from terminating unexpectedly, which can happen when error handling is disabled. If False, endpoint error handling is disabled.
- connect_timeout: float
Timeout in seconds for exchanging peer info. In some cases, exchanging peer information may hang indefinitely; a timeout prevents that. If the chosen value is too high, the operation may be stuck for too long rather than quickly raising a TimeoutError that the application can recover from. Under high load, however, a higher timeout may help prevent the peer info exchange from failing too fast.
Returns#
- Endpoint
The new endpoint
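The connect_timeout description mentions recovering from a TimeoutError in the application. One way this might look is a small retry wrapper, sketched below. `connect_with_retry` and `open_endpoint` are hypothetical helpers, the retry count and backoff are arbitrary, and catching both `TimeoutError` and `asyncio.TimeoutError` is an assumption made to cover Python versions where the two differ.

```python
import asyncio


async def connect_with_retry(connect, attempts=3, backoff=0.5):
    """Await `connect()` until it succeeds or `attempts` timeouts have occurred."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except (TimeoutError, asyncio.TimeoutError):
            if attempt == attempts:
                raise  # out of retries: let the caller handle the timeout
            await asyncio.sleep(backoff * attempt)  # linear backoff between tries


async def open_endpoint(ip_address, port, connect_timeout=5.0):
    """Create an endpoint, retrying when the peer info exchange times out."""
    import ucxx  # lazy import: only needed when actually connecting

    return await connect_with_retry(
        lambda: ucxx.create_endpoint(
            ip_address, port, connect_timeout=connect_timeout
        )
    )
```

A short timeout combined with a few retries keeps each attempt responsive while still tolerating a busy server.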
- ucxx.get_address(ifname=None, use_ipv6=False)#
Get the address associated with a network interface.
Parameters#
- ifname: str
The network interface name to find the address for. If None, it uses the value of the environment variable UCXPY_IFNAME, and if UCXPY_IFNAME is not set it defaults to “ib0”. An OSError is raised for invalid interfaces.
- use_ipv6: bool
Whether to get IPv6 addresses instead of the IPv4 default. NOTE: Requires the psutil package.
Raises#
- OSError
If no device was found with the specified ifname, or no suitable devices were found when ifname=None.
Returns#
- address: str
The inet addr associated with an interface.
Examples#
>>> get_address()
'10.33.225.160'
>>> get_address(ifname='lo')
'127.0.0.1'
>>> get_address(ifname='lo', use_ipv6=True)
'::1'
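The interface name lookup order described for ifname can be sketched as below. This only mirrors the parameter description (explicit argument, then UCXPY_IFNAME, then "ib0") and is not the library's implementation; `resolve_ifname` is a hypothetical helper, and the `environ` argument exists only to make the sketch easy to try out.

```python
import os


def resolve_ifname(ifname=None, environ=None):
    """Pick the interface name following the documented fallback order."""
    environ = os.environ if environ is None else environ
    if ifname is not None:
        return ifname  # an explicit argument always wins
    # Otherwise use UCXPY_IFNAME, falling back to "ib0" when it is unset.
    return environ.get("UCXPY_IFNAME", "ib0")
```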
- ucxx.get_config()#
Returns all UCX configuration options as a dict.
If UCX is uninitialized, the options returned are the options used if UCX were to be initialized now. Notice, this function doesn’t initialize UCX.
Returns#
- dict
The current UCX configuration options
- ucxx.get_ucp_worker()#
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- ucxx.get_ucx_version()#
Return the version of the underlying UCX installation
Notice, this function doesn’t initialize UCX.
Returns#
- tuple
The version as a tuple e.g. (1, 7, 0)
- ucxx.init(options={}, env_takes_precedence=False, progress_mode=None, enable_delayed_submission=None, enable_python_future=None, connect_timeout=None)#
Initiate UCX.
Usually this is done automatically at the first API call, but this function makes it possible to set UCX options programmatically. Alternatively, UCX options can be specified through environment variables.
Parameters#
- options: dict, optional
UCX options sent to the underlying UCX library
- env_takes_precedence: bool, optional
Whether environment variables take precedence over the options specified here.
- progress_mode: string, optional
If None, the ‘thread’ UCX progress mode is used unless the environment variable UCXPY_PROGRESS_MODE is defined. Otherwise the options are ‘blocking’, ‘polling’, ‘thread’.
- enable_delayed_submission: boolean, optional
If None, delayed submission is disabled unless UCXPY_ENABLE_DELAYED_SUBMISSION is defined with a value other than 0.
- enable_python_future: boolean, optional
If None, request notification via Python futures is disabled unless UCXPY_ENABLE_PYTHON_FUTURE is defined with a value other than 0.
- connect_timeout: float, optional
The timeout in seconds for exchanging endpoint information upon endpoint establishment. If None, use the value from UCXPY_CONNECT_TIMEOUT if defined, otherwise fall back to the default of 5 seconds.
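The fallback rules for arguments left as None can be sketched as follows. This mirrors the parameter descriptions only and is not ucxx's actual implementation. `resolve_flag` and `resolve_connect_timeout` are hypothetical helper names, and comparing the variable against the string "0" is an assumption about what "a value other than 0" means.

```python
import os


def resolve_flag(value, env_name, environ=None):
    """Boolean option: explicit value wins, else enabled if env var is set and not "0"."""
    environ = os.environ if environ is None else environ
    if value is not None:
        return bool(value)
    return env_name in environ and environ[env_name] != "0"


def resolve_connect_timeout(value, environ=None, default=5.0):
    """Timeout: explicit value wins, else UCXPY_CONNECT_TIMEOUT, else the default."""
    environ = os.environ if environ is None else environ
    if value is not None:
        return float(value)
    return float(environ.get("UCXPY_CONNECT_TIMEOUT", default))
```

For example, `resolve_flag(None, "UCXPY_ENABLE_PYTHON_FUTURE")` would report whether Python futures end up enabled under these assumed rules for the current environment.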
- ucxx.progress()#
Try to progress the communication layer
Warning: it is illegal to call this from a callback function, such as the one given to create_listener.
- ucxx.reset()#
Resets the UCX library by shutting down all of UCX.
The library is initiated again at the next API call.
Endpoint#
- class ucxx._lib_async.Endpoint(endpoint, ctx, tags=None)#
An endpoint represents a connection to a peer
Please use create_listener() and create_endpoint() to create an Endpoint.
- abort(period=10000000000, max_attempts=1)#
Close the communication immediately and abruptly. Useful in destructors or generators’ finally blocks.
Despite the attempt to close communication immediately, in some circumstances, notably when the parent worker is running a progress thread, a maximum timeout may be specified for which the close operation will wait. This can be particularly important for cases where the progress thread might be attempting to acquire the GIL while the current thread owns that resource.
Note that this function doesn’t signal the connected peer to close. To do that, use Endpoint.close().
Parameters#
- period: int
Maximum period to wait (in ns) for internal endpoint operations to complete. Usually two operations (pre and post) are involved, so the maximum perceived timeout is twice this value.
- max_attempts: int
maximum number of attempts to close endpoint, only applicable if worker is running a progress thread and period > 0.
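As an illustration of the "generators' finally blocks" use case, the sketch below wraps a receive loop in an async generator and calls abort() during cleanup. `recv_messages` is a hypothetical helper, and fixed-size messages are an assumption made to keep the example short.

```python
async def recv_messages(ep, nbytes, count):
    """Yield `count` messages of `nbytes` bytes each, aborting the endpoint at the end."""
    try:
        for _ in range(count):
            buf = bytearray(nbytes)
            await ep.recv(buf)
            yield bytes(buf)
    finally:
        # abort() is documented as a plain (non-async) method, so it can be
        # called here without awaiting, including during generator cleanup.
        ep.abort()
```

With the default period of 10000000000 ns (10 s), the description above suggests a perceived worst case of roughly 20 s for the two internal operations.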
- async am_recv()#
Receive from connected peer via active messages.
- async am_send(buffer)#
Send buffer to connected peer via active messages.
Parameters#
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to send. Raise ValueError if buffer is smaller than nbytes.
- async close(period=10000000000, max_attempts=1)#
Close the endpoint cleanly. This will attempt to flush outgoing buffers before actually closing the underlying UCX endpoint.
A maximum timeout and number of attempts may be specified to prevent the underlying Endpoint object from failing to acquire the GIL, see abort() for details.
Parameters#
- period: int
Maximum period to wait (in ns) for internal endpoint operations to complete. Usually two operations (pre and post) are involved, so the maximum perceived timeout is twice this value.
- max_attempts: int
maximum number of attempts to close endpoint, only applicable if worker is running a progress thread and period > 0.
- close_after_n_recv(n, count_from_ep_creation=False)#
Close the endpoint after n received messages.
Parameters#
- n: int
Number of messages to receive before closing the endpoint.
- count_from_ep_creation: bool, optional
Whether to count n from this function call (default) or from the creation of the endpoint.
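The difference between the two counting modes can be illustrated with a little arithmetic. This is a sketch of the semantics as described above, not the library's code; `messages_until_close` is a hypothetical helper, and raising ValueError when n is already exceeded is an assumption.

```python
def messages_until_close(n, received_so_far, count_from_ep_creation=False):
    """Return how many more messages would arrive before the endpoint closes."""
    if count_from_ep_creation:
        # Messages received since the endpoint was created already count.
        remaining = n - received_so_far
    else:
        # Counting starts at the time of the call (the default).
        remaining = n
    if remaining < 0:
        raise ValueError("n is smaller than the number of messages already received")
    return remaining
```

For an endpoint that has already received 3 messages, n=5 leaves 5 more messages by default and 2 more with count_from_ep_creation=True.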
- property closed#
Is this endpoint closed?
- get_ucp_endpoint()#
Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.
- get_ucp_worker()#
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- get_ucxx_endpoint()#
Returns the underlying UCXX endpoint pointer (ucxx::Endpoint*) as a Python integer.
- get_ucxx_worker()#
Returns the underlying UCXX worker pointer (ucxx::Worker*) as a Python integer.
- async recv(buffer, tag=None, force_tag=False)#
Receive from connected peer into buffer.
Parameters#
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to receive into. Raise ValueError if buffer is smaller than nbytes or read-only.
- tag: hashable, optional
Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async recv_multi(tag=None, force_tag=False)#
Receive multiple buffers from connected peer. Unlike recv(), this method takes no destination buffer.
Parameters#
- tag: hashable, optional
Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async recv_obj(tag=None, allocator=<class 'bytearray'>)#
Receive from connected peer that calls send_obj().
As opposed to recv(), this function returns the received object. Data is received into a buffer allocated by allocator.
The transfer includes an extra message containing the size of obj, which increases the overhead slightly.
Parameters#
- tag: hashable, optional
Set a tag that must match the received message. Note that UCX-Py currently doesn’t support an “any tag”, so tag=None only matches a send that also sets tag=None.
- allocator: callable, optional
Function to allocate the received object. The function should take the number of bytes to allocate as input and return a new buffer of that size as output.
Example#
>>> pickle.loads(await ep.recv_obj())
- async recv_with_handle(buffer, probe_result)#
Receive tag message using message handle obtained from tag_probe.
This is more efficient than regular recv as it doesn’t need to go through the message matching queue again.
Parameters#
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to receive into. Raise ValueError if buffer is smaller than nbytes or read-only.
- probe_result: TagProbeResult
The probe result obtained from tag_probe with remove=True.
Returns#
The received data
- async send(buffer, tag=None, force_tag=False)#
Send buffer to connected peer.
Parameters#
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to send. Raise ValueError if buffer is smaller than nbytes.
- tag: hashable, optional
Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async send_multi(buffers, tag=None, force_tag=False)#
Send multiple buffers to connected peer.
Parameters#
- buffers: objects exposing the buffer protocol or array/cuda interface
The buffers to send. Raise ValueError if a buffer is smaller than nbytes.
- tag: hashable, optional
Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async send_obj(obj, tag=None)#
Send obj to connected peer that calls recv_obj().
The transfer includes an extra message containing the size of obj, which increases the overhead slightly.
Parameters#
- obj: exposing the buffer protocol or array/cuda interface
The object to send.
- tag: hashable, optional
Set a tag that the receiver must match.
Example#
>>> await ep.send_obj(pickle.dumps([1,2,3]))
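The "extra message containing the size of obj" can be pictured as a size-prefixed exchange built on send() and recv(). The sketch below is an illustration of that idea, not how send_obj() and recv_obj() are implemented. `send_with_size` and `recv_with_size` are hypothetical helpers, and the 8-byte little-endian size header (`SIZE_FMT`) is an assumption.

```python
import struct

SIZE_FMT = "<Q"  # assumed header layout: one unsigned 64-bit little-endian integer


async def send_with_size(ep, obj, tag=None):
    """Send the payload size first, then the payload itself."""
    view = memoryview(obj).cast("B")  # flat byte view of the object
    await ep.send(struct.pack(SIZE_FMT, view.nbytes), tag=tag)
    await ep.send(view, tag=tag)


async def recv_with_size(ep, tag=None, allocator=bytearray):
    """Receive the size header, allocate a buffer of that size, then receive into it."""
    header = bytearray(struct.calcsize(SIZE_FMT))
    await ep.recv(header, tag=tag)
    (nbytes,) = struct.unpack(SIZE_FMT, header)
    buf = allocator(nbytes)  # allocator takes a byte count, as described for recv_obj()
    await ep.recv(buf, tag=tag)
    return buf
```

The extra header message is the overhead mentioned above; in exchange, the receiver does not need to know the payload size in advance.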
- set_close_callback(callback_func, cb_args=None, cb_kwargs=None)#
Register a user callback function to be called on Endpoint’s closing.
Allows the user to register a callback function to be called when the Endpoint’s error callback is called, or during its finalizer if the error callback is never called.
Once the callback is called, it’s not possible to send any more messages. However, receiving messages may still be possible, as UCP may still have incoming messages in transit.
Parameters#
- callback_func: callable
The callback function to be called when the Endpoint’s error callback is called, otherwise called on its finalizer.
- cb_args: tuple or None
The arguments to be passed to the callback function as a tuple, or None (default).
- cb_kwargs: dict or None
The keyword arguments to be passed to the callback function as a dict, or None (default).
Example#
>>> ep.set_close_callback(lambda: print("Executing close callback"))
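A sketch of passing arguments to the close callback through cb_args and cb_kwargs, assuming the callback is invoked as `callback_func(*cb_args, **cb_kwargs)` as the parameter descriptions suggest. `watch_endpoint`, `on_close`, and the `registry` keyword are hypothetical names used only for illustration.

```python
def watch_endpoint(ep, name, closed_names):
    """Record `name` in `closed_names` once the endpoint closes."""

    def on_close(ep_name, registry):
        registry.append(ep_name)

    # Positional arguments go in a tuple, keyword arguments in a dict.
    ep.set_close_callback(
        on_close, cb_args=(name,), cb_kwargs={"registry": closed_names}
    )
```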
- tag_probe(tag=None, force_tag=False, remove=False)#
Probe for tag messages without receiving them.
This method checks if a message with the specified tag is available without actually receiving it. This is useful for non-blocking message checking.
Parameters#
- tag: hashable, optional
Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- remove: bool
If true, remove the message from the queue and return a message handle for efficient reception. If false, leave the message in the queue.
Returns#
- TagProbeResult
A result object containing:
- matched: bool indicating if a message was found
- sender_tag: int sender tag (when matched=True)
- length: int message length in bytes (when matched=True)
- handle: int message handle for efficient reception (when matched=True and remove=True)
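Combining tag_probe() with recv_with_handle() might look like the sketch below, which receives a message only if one is already waiting. `recv_if_available` is a hypothetical helper. It relies only on the `matched` and `length` fields listed above and on calling tag_probe() with remove=True so the result carries a message handle.

```python
async def recv_if_available(ep, tag=None):
    """Return the pending message for `tag` as a bytearray, or None if none is waiting."""
    probe = ep.tag_probe(tag=tag, remove=True)
    if not probe.matched:
        return None  # nothing to receive right now
    buf = bytearray(probe.length)  # size the buffer from the probe result
    await ep.recv_with_handle(buf, probe)
    return buf
```

Because the probe reports the message length, the receiver can allocate an exactly sized buffer without a separate size exchange.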
- property ucp_endpoint#
The underlying UCP endpoint handle (ucp_ep_h) as a Python integer.
- property ucp_worker#
The underlying UCP worker handle (ucp_worker_h) as a Python integer.
- property ucxx_endpoint#
The underlying UCXX endpoint pointer (ucxx::Endpoint*) as a Python integer.
- property ucxx_worker#
The underlying UCXX worker pointer (ucxx::Worker*) as a Python integer.
- property uid#
The unique ID of the underlying UCX endpoint
Listener#
- class ucxx._lib_async.Listener(listener, ident, active_clients)#
A handle to the listening service started by create_listener()
The listening continues as long as this object exists and close() has not been called. Please use create_listener() to create a Listener.
- close()#
Close the listener
- property closed#
Is the listener closed?
- property ip#
The listening network IP address
- property port#
The listening network port