API

ucxx

create_listener(callback_func[, port, ...])

Create and start a listener to accept incoming connections

create_endpoint(ip_address, port[, ...])

Create a new endpoint to a server

get_address([ifname, use_ipv6])

Get the address associated with a network interface.

get_config()

Returns all UCX configuration options as a dict.

get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

get_ucx_version()

Return the version of the underlying UCX installation

init([options, env_takes_precedence, ...])

Initiate UCX.

progress()

Try to progress the communication layer

reset()

Resets the UCX library by shutting down all of UCX.

Endpoint

_lib_async.Endpoint(endpoint, ctx[, tags])

An endpoint represents a connection to a peer

_lib_async.Endpoint.abort([period, max_attempts])

Close the communication immediately and abruptly.

_lib_async.Endpoint.close([period, max_attempts])

Close the endpoint cleanly.

_lib_async.Endpoint.closed

Is this endpoint closed?

_lib_async.Endpoint.close_after_n_recv(n[, ...])

Close the endpoint after n received messages.

_lib_async.Endpoint.get_ucp_endpoint()

Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.

_lib_async.Endpoint.get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

_lib_async.Endpoint.recv(buffer[, tag, ...])

Receive from connected peer into buffer.

_lib_async.Endpoint.send(buffer[, tag, ...])

Send buffer to connected peer.

_lib_async.Endpoint.uid

The unique ID of the underlying UCX endpoint

Listener

_lib_async.Listener(listener, ident, ...)

A handle to the listening service started by create_listener()

_lib_async.Listener.close()

Close the listener.

_lib_async.Listener.closed

Is the listener closed?

_lib_async.Listener.port

The listening network port

ucxx.create_listener(callback_func, port=None, endpoint_error_handling=True, connect_timeout=5.0)

Create and start a listener to accept incoming connections.

callback_func is the function or coroutine that takes one argument: the Endpoint connected to the client.

Notice that listening stops when the returned Listener goes out of scope, so remember to keep a reference to the object.

Parameters

callback_func: function or coroutine

A callback function that gets invoked when an incoming connection is accepted.

port: int, optional

An unused port number for listening, or 0 to let UCX assign an unused port.

endpoint_error_handling: boolean, optional

If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty, but it prevents the process from terminating unexpectedly, which may happen when error handling is disabled. If False, endpoint error handling is disabled.

connect_timeout: float, optional

Timeout in seconds for exchanging peer info. In some cases, exchanging peer information may hang indefinitely; a timeout prevents that. If the chosen value is too high, the operation may stay stuck for too long instead of quickly raising a TimeoutError that the application could recover from. Under high load, however, a higher timeout may prevent the peer-info exchange from failing too early.

Returns

Listener

The new listener. When this object is deleted, listening stops.
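A minimal echo-server sketch, assuming clients talk to it with send_obj()/recv_obj(). The names echo_handler and serve and the lifetime parameter are hypothetical. The Listener is held in a local variable for the whole serving period, since listening stops once the object is garbage collected.

```python
import asyncio


async def echo_handler(ep):
    # Invoked by the listener with the Endpoint connected to the client.
    msg = await ep.recv_obj()  # receive one size-prefixed object
    await ep.send_obj(msg)     # send it back unchanged
    await ep.close()


async def serve(port=0, lifetime=10.0):
    # Imported lazily so echo_handler can be reused without UCX installed.
    import ucxx

    # Keep this reference alive: the listener stops when it is collected.
    listener = ucxx.create_listener(echo_handler, port=port)
    print(f"Listening on port {listener.port}")
    try:
        await asyncio.sleep(lifetime)  # serve for a fixed time (hypothetical policy)
    finally:
        listener.close()
```

Run it with asyncio.run(serve()); with port=0 the printed port is the one UCX assigned.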

async ucxx.create_endpoint(ip_address, port, endpoint_error_handling=True, connect_timeout=5.0)

Create a new endpoint to a server.

Parameters

ip_address: str

IP address of the server the endpoint should connect to.

port: int

Port number of the server the endpoint should connect to.

endpoint_error_handling: boolean, optional

If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty, but it prevents the process from terminating unexpectedly, which may happen when error handling is disabled. If False, endpoint error handling is disabled.

connect_timeout: float, optional

Timeout in seconds for exchanging peer info. In some cases, exchanging peer information may hang indefinitely; a timeout prevents that. If the chosen value is too high, the operation may stay stuck for too long instead of quickly raising a TimeoutError that the application could recover from. Under high load, however, a higher timeout may prevent the peer-info exchange from failing too early.

Returns

Endpoint

The new endpoint.
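The client side can be sketched as follows, assuming a server that answers each send_obj() with exactly one object (for example, an echo handler). The helper request, the coroutine main, and the address 127.0.0.1:13337 are hypothetical placeholders.

```python
async def request(ep, payload: bytes) -> bytes:
    # Send one object and wait for the peer's single reply.
    await ep.send_obj(payload)
    reply = await ep.recv_obj()  # a bytearray with the default allocator
    return bytes(reply)


async def main(host="127.0.0.1", port=13337):
    import ucxx

    # create_endpoint() is a coroutine; connect_timeout bounds the peer-info exchange.
    ep = await ucxx.create_endpoint(host, port, connect_timeout=10.0)
    try:
        print(await request(ep, b"ping"))
    finally:
        await ep.close()
```

Call asyncio.run(main(...)) with the host and port of a running server.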

ucxx.get_address(ifname=None, use_ipv6=False)

Get the address associated with a network interface.

Parameters

ifname: str, optional

The network interface name to find the address for. If None, it uses the value of the environment variable UCXPY_IFNAME, and if UCXPY_IFNAME is not set, it defaults to “ib0”. An OSError is raised for invalid interfaces.

use_ipv6: bool, optional

Whether to get IPv6 addresses instead of the IPv4 default. NOTE: Requires the psutil package.

Raises

OSError

If no device was found with the specified ifname, or no suitable devices were found when ifname=None.

Returns

address: str

The inet addr associated with an interface.

Examples

>>> get_address()
'10.33.225.160'
>>> get_address(ifname='lo')
'127.0.0.1'
>>> get_address(ifname='lo', use_ipv6=True)
'::1'

ucxx.get_config()

Returns all UCX configuration options as a dict.

If UCX is uninitialized, the returned options are the ones that would be used if UCX were initialized now. Note that this function doesn’t initialize UCX.

Returns

dict

The current UCX configuration options.

ucxx.get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

ucxx.get_ucx_version()

Return the version of the underlying UCX installation.

Note that this function doesn’t initialize UCX.

Returns

tuple

The version as a tuple, e.g. (1, 7, 0).
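Because the version is a plain tuple, a minimum-version check reduces to tuple comparison. A sketch: the helpers ucx_at_least and check_ucx and the minimum (1, 15, 0) are hypothetical and do not reflect any actual ucxx requirement.

```python
def ucx_at_least(version, minimum):
    """Return True if a (major, minor, patch) tuple is >= minimum."""
    # Tuples compare element by element, so (1, 15, 0) >= (1, 14, 1).
    return tuple(version) >= tuple(minimum)


def check_ucx(minimum=(1, 15, 0)):
    import ucxx

    version = ucxx.get_ucx_version()  # does not initialize UCX
    if not ucx_at_least(version, minimum):
        raise RuntimeError(f"UCX {version} is older than required {minimum}")
    return version
```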

ucxx.init(options={}, env_takes_precedence=False, progress_mode=None, enable_delayed_submission=None, enable_python_future=None, connect_timeout=None)

Initiate UCX.

Usually this is done automatically at the first API call, but this function makes it possible to set UCX options programmatically. Alternatively, UCX options can be specified through environment variables.

Parameters

options: dict, optional

UCX options sent to the underlying UCX library.

env_takes_precedence: bool, optional

Whether environment variables take precedence over the options specified here.

progress_mode: string, optional

If None, the ‘thread’ UCX progress mode is used unless the environment variable UCXPY_PROGRESS_MODE is defined. Otherwise the options are ‘blocking’, ‘polling’, and ‘thread’.

enable_delayed_submission: boolean, optional

If None, delayed submission is disabled unless UCXPY_ENABLE_DELAYED_SUBMISSION is defined with a value other than 0.

enable_python_future: boolean, optional

If None, request notification via Python futures is disabled unless UCXPY_ENABLE_PYTHON_FUTURE is defined with a value other than 0.

connect_timeout: float, optional

The timeout in seconds for exchanging endpoint information upon endpoint establishment. If None, use the value from UCXPY_CONNECT_TIMEOUT if defined, otherwise fall back to the default of 5 seconds.
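The documented fallback order for connect_timeout (explicit argument, then UCXPY_CONNECT_TIMEOUT, then 5 seconds) is illustrated below. resolve_connect_timeout and init_ucxx are hypothetical helpers that mirror the text above, not ucxx's own implementation; init_ucxx only passes parameters documented for ucxx.init().

```python
import os


def resolve_connect_timeout(connect_timeout=None, environ=os.environ):
    # 1. An explicit argument wins.
    if connect_timeout is not None:
        return float(connect_timeout)
    # 2. Otherwise use UCXPY_CONNECT_TIMEOUT if it is defined.
    if "UCXPY_CONNECT_TIMEOUT" in environ:
        return float(environ["UCXPY_CONNECT_TIMEOUT"])
    # 3. Fall back to the documented default of 5 seconds.
    return 5.0


def init_ucxx(options=None):
    import ucxx

    # Call before other ucxx functions so these settings are the ones used;
    # "thread" is also what progress_mode=None resolves to by default.
    ucxx.init(options=options or {}, progress_mode="thread", connect_timeout=10.0)
```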

ucxx.progress()

Try to progress the communication layer.

Warning: it is illegal to call this from a callback function, such as the callback function given to create_listener.

ucxx.reset()

Resets the UCX library by shutting down all of UCX.

The library is initiated again at the next API call.

Endpoint

class ucxx._lib_async.Endpoint(endpoint, ctx, tags=None)

An endpoint represents a connection to a peer.

Please use create_listener() and create_endpoint() to create an Endpoint.

abort(period=10000000000, max_attempts=1)

Close the communication immediately and abruptly. Useful in destructors or generators’ finally blocks.

Despite the attempt to close communication immediately, in some circumstances, notably when the parent worker is running a progress thread, a maximum timeout may be specified for which the close operation will wait. This can be particularly important when the progress thread might be attempting to acquire the GIL while the current thread owns that resource.

Note that this function doesn’t signal the connected peer to close. To do that, use Endpoint.close().

Parameters

period: int

Maximum period to wait (in ns) for internal endpoint operations to complete. Usually two operations (pre and post) are involved, so the maximum perceived timeout should be multiplied by two. The default of 10000000000 ns is 10 seconds.

max_attempts: int

Maximum number of attempts to close the endpoint, only applicable if the worker is running a progress thread and period > 0.

async am_recv()

Receive from connected peer via active messages.

async am_send(buffer)

Send buffer to connected peer via active messages.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to send. Raises ValueError if buffer is smaller than nbytes.
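Unlike recv(), am_recv() takes no buffer argument, so the receiver does not need to know the message size in advance. A sketch of a request/reply over active messages, assuming the peer answers with one active message and that am_recv() returns a host buffer supporting the buffer protocol (both assumptions); am_echo_client is a hypothetical name.

```python
async def am_echo_client(ep, payload):
    # am_send() accepts any object exposing the buffer protocol.
    await ep.am_send(payload)
    # Assumption: one reply arrives and is returned as a host-memory buffer.
    reply = await ep.am_recv()
    return bytes(memoryview(reply))
```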

async close(period=10000000000, max_attempts=1)

Close the endpoint cleanly. This will attempt to flush outgoing buffers before actually closing the underlying UCX endpoint.

A maximum timeout and number of attempts may be specified to prevent the underlying Endpoint object from failing to acquire the GIL; see abort() for details.

Parameters

period: int

Maximum period to wait (in ns) for internal endpoint operations to complete. Usually two operations (pre and post) are involved, so the maximum perceived timeout should be multiplied by two.

max_attempts: int

Maximum number of attempts to close the endpoint, only applicable if the worker is running a progress thread and period > 0.
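Since period is given in nanoseconds, converting from seconds avoids off-by-10⁹ mistakes. The sketch below tries a clean close() first and falls back to abort() if close() raises; the fallback policy and the names seconds_to_ns and shutdown are hypothetical. Note that abort() is synchronous and does not notify the peer.

```python
NS_PER_S = 1_000_000_000


def seconds_to_ns(seconds):
    # close()/abort() take `period` in nanoseconds; the default 10**10 ns is 10 s.
    return int(seconds * NS_PER_S)


async def shutdown(ep, timeout_s=2.0, max_attempts=3):
    """Close cleanly if possible, otherwise abort (hypothetical policy)."""
    if ep.closed:
        return
    period = seconds_to_ns(timeout_s)
    try:
        # Flushes outgoing buffers and signals the peer.
        await ep.close(period=period, max_attempts=max_attempts)
    except Exception:
        # Tears down only the local side; the peer is not signalled.
        ep.abort(period=period, max_attempts=max_attempts)
```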

close_after_n_recv(n, count_from_ep_creation=False)

Close the endpoint after n received messages.

Parameters

n: int

Number of messages to receive before closing the endpoint.

count_from_ep_creation: bool, optional

Whether to count n from this function call (default) or from the creation of the endpoint.
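A sketch of a handler that expects exactly two messages per connection and lets the endpoint close itself after the second one. The two-message protocol (a header followed by a body, both sent with send_obj()) and the name handle_two_messages are hypothetical.

```python
async def handle_two_messages(ep):
    # Close automatically after two receives, counted from this call
    # (count_from_ep_creation=False is the default).
    ep.close_after_n_recv(2)
    header = await ep.recv_obj()
    body = await ep.recv_obj()
    return header, body
```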

property closed

Is this endpoint closed?

get_ucp_endpoint()

Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.

get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

get_ucxx_endpoint()

Returns the underlying UCXX endpoint pointer (ucxx::Endpoint*) as a Python integer.

get_ucxx_worker()

Returns the underlying UCXX worker pointer (ucxx::Worker*) as a Python integer.

async recv(buffer, tag=None, force_tag=False)

Receive from connected peer into buffer.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to receive into. Raises ValueError if buffer is smaller than nbytes or read-only.

tag: hashable, optional

Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.

async recv_multi(tag=None, force_tag=False)

Receive multiple buffers from connected peer, as sent by send_multi().

Parameters

tag: hashable, optional

Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.

async recv_obj(tag=None, allocator=<class 'bytearray'>)

Receive from connected peer that calls send_obj().

As opposed to recv(), this function returns the received object. Data is received into a buffer allocated by allocator.

The transfer includes an extra message containing the size of obj, which increases the overhead slightly.

Parameters

tag: hashable, optional

Set a tag that must match the received message. Notice that UCX-Py currently doesn’t support an “any tag”, thus tag=None only matches a send that also sets tag=None.

allocator: callable, optional

Function to allocate the received object. The function should take the number of bytes to allocate as input and return a new buffer of that size as output.

Example

>>> pickle.loads(await ep.recv_obj())

async recv_with_handle(buffer, probe_result)

Receive a tag message using the message handle obtained from tag_probe.

This is more efficient than a regular recv, as it doesn’t need to go through the message matching queue again.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to receive into. Raises ValueError if buffer is smaller than nbytes or read-only.

probe_result: TagProbeResult

The probe result obtained from tag_probe with remove=True.

Returns

The received data.

async send(buffer, tag=None, force_tag=False)

Send buffer to connected peer.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to send. Raises ValueError if buffer is smaller than nbytes.

tag: hashable, optional

Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
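recv() needs a writable buffer large enough for the incoming message, so a common pattern is to send a fixed-size length header first. A sketch of that pattern for a list of float64 values, with both sides using the same tag (hashed with the internal Endpoint tag, since force_tag stays False). The framing and the names send_array and recv_array are hypothetical, not a ucxx wire format.

```python
import struct


async def send_array(ep, values, tag=None):
    # Header: element count as a little-endian uint64, then the raw doubles.
    await ep.send(struct.pack("<Q", len(values)), tag=tag)
    await ep.send(struct.pack(f"<{len(values)}d", *values), tag=tag)


async def recv_array(ep, tag=None):
    header = bytearray(8)  # writable and exactly the header size
    await ep.recv(header, tag=tag)
    (count,) = struct.unpack("<Q", header)
    payload = bytearray(8 * count)  # sized from the header
    await ep.recv(payload, tag=tag)
    return list(struct.unpack(f"<{count}d", payload))
```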

async send_multi(buffers, tag=None, force_tag=False)

Send multiple buffers to connected peer, to be received with recv_multi().

Parameters

buffers: sequence of objects exposing the buffer protocol or array/cuda interface

The buffers to send. Raises ValueError if a buffer is smaller than nbytes.

tag: hashable, optional

Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
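send_multi() and recv_multi() let a small metadata header travel together with a raw payload without pre-allocating receive buffers. A sketch, assuming recv_multi() returns the received buffers in the order they were passed to send_multi() (an assumption). The helpers send_frames and recv_frames are hypothetical; only unpickle data from trusted peers.

```python
import pickle


async def send_frames(ep, meta, payload, tag=None):
    # Frame 0: pickled metadata; frame 1: raw payload bytes.
    await ep.send_multi([pickle.dumps(meta), payload], tag=tag)


async def recv_frames(ep, tag=None):
    # recv_multi() takes no buffer argument; it returns the received frames.
    header, payload = await ep.recv_multi(tag=tag)
    return pickle.loads(bytes(header)), bytes(payload)
```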

async send_obj(obj, tag=None)

Send obj to connected peer that calls recv_obj().

The transfer includes an extra message containing the size of obj, which increases the overhead slightly.

Parameters

obj: exposing the buffer protocol or array/cuda interface

The object to send.

tag: hashable, optional

Set a tag that the receiver must match.

Example

>>> await ep.send_obj(pickle.dumps([1, 2, 3]))

set_close_callback(callback_func, cb_args=None, cb_kwargs=None)

Register a user callback function to be called on Endpoint’s closing.

Allows the user to register a callback function to be called when the Endpoint’s error callback is called, or during its finalizer if the error callback is never called.

Once the callback is called, it’s not possible to send any more messages. However, receiving messages may still be possible, as UCP may still have incoming messages in transit.

Parameters

callback_func: callable

The callback function to be called when the Endpoint’s error callback is called, otherwise called on its finalizer.

cb_args: tuple or None

The arguments to be passed to the callback function as a tuple, or None (default).

cb_kwargs: dict or None

The keyword arguments to be passed to the callback function as a dict, or None (default).

Example

>>> ep.set_close_callback(lambda: print("Executing close callback"))
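A close callback is a natural place to update application bookkeeping, since no further sends are possible once it runs. A sketch that passes arguments through cb_args; the registry set, the logger name, and the helpers on_endpoint_closed and track_endpoint are hypothetical.

```python
import logging

log = logging.getLogger("ucxx-app")  # hypothetical logger name


def on_endpoint_closed(peer_name, registry):
    # Sending is no longer possible here; only update local state.
    registry.discard(peer_name)
    log.info("endpoint to %s closed", peer_name)


def track_endpoint(ep, peer_name, registry):
    registry.add(peer_name)
    # cb_args are passed positionally to the callback when it fires.
    ep.set_close_callback(on_endpoint_closed, cb_args=(peer_name, registry))
```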

tag_probe(tag=None, force_tag=False, remove=False)

Probe for tag messages without receiving them.

This method checks if a message with the specified tag is available without actually receiving it. This is useful for non-blocking message checking.

Parameters

tag: hashable, optional

Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.

remove: bool

If true, remove the message from the queue and return a message handle for efficient reception. If false, leave the message in the queue.

Returns

TagProbeResult

A result object containing:

- matched: bool indicating whether a message was found
- sender_tag: int sender tag (when matched=True)
- length: int message length in bytes (when matched=True)
- handle: int message handle for efficient reception (when matched=True and remove=True)
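Probing with remove=True gives the message length up front, so the receive buffer can be sized exactly before calling recv_with_handle(). A sketch; recv_probed is a hypothetical name, and it assumes the worker is being progressed (for example by the default progress thread) so pending messages are visible to the probe.

```python
async def recv_probed(ep, tag=None):
    """Receive one tag message of unknown size, or return None (sketch)."""
    # remove=True dequeues the message and returns a handle for recv_with_handle().
    result = ep.tag_probe(tag=tag, remove=True)
    if not result.matched:
        return None  # nothing pending yet; the caller may retry later
    buffer = bytearray(result.length)  # sized from the probe result
    await ep.recv_with_handle(buffer, result)
    return bytes(buffer)
```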

property ucp_endpoint

The underlying UCP endpoint handle (ucp_ep_h) as a Python integer.

property ucp_worker

The underlying UCP worker handle (ucp_worker_h) as a Python integer.

property ucxx_endpoint

The underlying UCXX endpoint pointer (ucxx::Endpoint*) as a Python integer.

property ucxx_worker

The underlying UCXX worker pointer (ucxx::Worker*) as a Python integer.

property uid

The unique ID of the underlying UCX endpoint.

Listener

class ucxx._lib_async.Listener(listener, ident, active_clients)

A handle to the listening service started by create_listener().

Listening continues as long as this object exists and .close() has not been called. Please use create_listener() to create a Listener.

close()

Close the listener.

property closed

Is the listener closed?

property ip

The listening network IP address.

property port

The listening network port.