Asynchronous resource management

For applications that are limited by host-device bandwidth, it is often necessary to perform copies, device computation, and even host computation or networking concurrently. The katsdpsigproc.resource module contains utilities that simplify resource management and prevent race conditions when used in conjunction with asyncio. For more detailed background, refer to this PyConZA talk (or read the slides).

Firstly, there is an asyncio-friendly way to wait for events (async_wait_for_events()) and a utility class to bound the amount of in-flight work (JobQueue). The latter addresses a slightly different problem from aiojobs: aiojobs limits the amount of work executing, but does not have a way to limit the pending work. A JobQueue allows the caller to block until the backlog has fallen to a specified level.
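The idea of bounding the backlog can be sketched with plain asyncio. The class below is a hypothetical stand-in written for illustration (the names BoundedBacklog, fake_job and produce are assumptions, and this is not the library's JobQueue implementation): the producer waits for the oldest jobs to finish whenever too many are outstanding.

```python
import asyncio
from collections import deque


class BoundedBacklog:
    """Hypothetical illustration of limiting pending work (not the real JobQueue)."""

    def __init__(self):
        self._jobs = deque()

    def add(self, coro):
        # Start the job immediately and remember it in submission order.
        self._jobs.append(asyncio.ensure_future(coro))

    async def finish(self, max_remaining=0):
        # Wait for the oldest jobs until at most max_remaining are outstanding.
        while len(self._jobs) > max_remaining:
            await self._jobs.popleft()

    def __len__(self):
        return len(self._jobs)


async def fake_job(i, log):
    # Stand-in for a copy or kernel launch that takes some time.
    await asyncio.sleep(0.01)
    log.append(i)


async def produce(n_jobs=6, max_backlog=2):
    backlog = BoundedBacklog()
    log = []
    peak = 0
    for i in range(n_jobs):
        # Block the producer until there is room for one more job.
        await backlog.finish(max_remaining=max_backlog - 1)
        backlog.add(fake_job(i, log))
        peak = max(peak, len(backlog))
    await backlog.finish()  # Drain everything before returning
    return log, peak


log, peak = asyncio.run(produce())
```

Here the producer never has more than max_backlog jobs outstanding, no matter how quickly it could generate new ones.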

A Resource is a lock with extra features. For convenience it holds a reference to some object, so that you don’t need to carry a lock and the locked object around separately. In the simplest case, acquiring and releasing the lock is done like this:

acq = resource.acquire()
with acq as value:   # value is the object passed to Resource() constructor
    await acq.wait_events()
    ...              # Make use of the resource
    acq.ready()      # Releases the lock

That’s a surprising amount of boilerplate for a lock. It’s complicated because it’s flexible and allows for some use cases that aren’t well-supported with a standard lock.

Ordering control

Consider a real-time pipeline that receives chunks of data from the network, processes them, and sends chunks of processed data back into the network. Some parts of the processing can be done concurrently, but some resources are shared and hence require locking. Ideally, we want chunk \(i\) to always get its turn at a shared resource before chunk \(i+1\) (so that chunks don’t complete out-of-order), but standard locks don’t guarantee this. Even locks that guarantee first-in-first-out behavior (such as asyncio.Lock) can’t guarantee this, because chunk \(i\) might have been slow on a previous (parallel) step, and hence only try to acquire the lock after chunk \(i+1\) does.

Users of a resource are always serviced in the order in which they called Resource.acquire(). This call can be seen like taking a ticket with a number at a bank: it guarantees your position in the queue, but you don’t actually need to be ready. You could go home again to collect the documents you need for your transaction and come back later without losing your place in the queue. Unlike in real life, you won’t be skipped, even if the teller is sitting idle waiting for you. That does mean that your ordering guarantee may come at the price of lower utilization.

In our real-time pipeline example, one can ensure that chunks get their turns in order by performing all the calls to Resource.acquire() as soon as the chunk is received. Processing of the chunk can then proceed, actually waiting for each resource only when access to it is needed.
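The ticket behaviour can be illustrated with a small asyncio-only sketch. TicketResource and Ticket are hypothetical classes written for this example (they are not the library's Resource, and they ignore device events entirely); the point is only that taking the ticket is synchronous and fixes the order, while the waiting happens later.

```python
import asyncio


class Ticket:
    """Hypothetical place in the queue for one user of the resource."""

    def __init__(self, value, prev, done):
        self.value = value
        self._prev = prev   # Completed when the previous ticket holder is done
        self._done = done   # Completed when this ticket holder is done

    async def wait(self):
        if self._prev is not None:
            await self._prev

    def ready(self):
        if not self._done.done():
            self._done.set_result(None)


class TicketResource:
    """Hypothetical ticket-style lock (a sketch, not the real Resource)."""

    def __init__(self, value):
        self.value = value
        self._tail = None

    def acquire(self):
        # Synchronous: reserves a position in the queue without waiting.
        prev = self._tail
        done = asyncio.get_running_loop().create_future()
        self._tail = done
        return Ticket(self.value, prev, done)


async def process(chunk, ticket, prep_time, order):
    await asyncio.sleep(prep_time)  # Earlier (parallel) step of varying speed
    await ticket.wait()             # Only now wait for our turn
    order.append(chunk)             # Use the shared resource
    ticket.ready()                  # Let the next ticket holder proceed


async def main():
    resource = TicketResource(value=None)
    order = []
    # Take tickets in arrival order, before any processing starts.
    tickets = [resource.acquire() for _ in range(3)]
    prep_times = [0.05, 0.0, 0.02]  # Chunk 0 is the slowest to get ready
    await asyncio.gather(
        *(process(i, t, p, order)
          for i, (t, p) in enumerate(zip(tickets, prep_times)))
    )
    return order


order = asyncio.run(main())
```

Even though chunk 0 is the last to reach the shared step, order ends up as [0, 1, 2]: chunks 1 and 2 sit idle until chunk 0 has had its turn, which is the utilization cost mentioned above.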

Device events

Astute readers might have noticed that the function to lock the resource is called wait_events() rather than just wait(), and may also be wondering why there is an explicit ready() function to unlock rather than letting the context manager handle it. The answer is that there are built-in utilities for synchronization using events (see the section on Synchronization for a description of events).

Let’s say that a resource is a device buffer, and you’re locking it so that you can launch a kernel that writes to it. If you called ready() immediately after enqueuing the kernel, your code would have a bug because kernels execute asynchronously and hence you’re not actually done with the resource yet. You could use one of the synchronization primitives to wait for the kernel to complete, but you may have other work to be getting on with, and it would force all dependencies to be resolved by the CPU. Instead, you can put an event into the command queue, and pass it to ready():

queue.enqueue_kernel(...)   # Or call an Operation
event = queue.enqueue_marker()
acq.ready([event])

This says “I’m done using the resource from Python code, but the next user still has to wait for this event.”

Now the name wait_events() makes more sense: it waits not only for the previous user to call ready(), but also for any events they supplied to complete. That is still a CPU-side wait though; what if we want to use a device-side wait? In that case we can just call ResourceAllocation.wait(), which returns the list of events without waiting for them. It is then the caller’s responsibility to wait for those events before touching the resource, which can be done using any suitable method such as AbstractCommandQueue.enqueue_wait_for_events().
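The difference between the two kinds of wait can be shown with a toy model that needs no device. FakeEvent and Handoff are hypothetical classes invented for this sketch (real code would use device events and the library's own allocation object): wait() hands back the previous user's events as soon as ready() has been called, while wait_events() additionally blocks on the CPU until those events complete.

```python
import asyncio


class FakeEvent:
    """Hypothetical stand-in for a device event."""

    def __init__(self):
        self._flag = asyncio.Event()

    def complete(self):
        self._flag.set()

    def is_complete(self):
        return self._flag.is_set()

    async def wait_complete(self):
        await self._flag.wait()


class Handoff:
    """Hypothetical link between two consecutive users of a resource."""

    def __init__(self):
        self._events = asyncio.get_running_loop().create_future()

    def ready(self, events=()):
        # Previous user: finished on the Python side, events may still be pending.
        self._events.set_result(list(events))

    async def wait(self):
        # Returns the events without waiting for them to complete.
        return await self._events

    async def wait_events(self):
        # CPU-side wait: also block until every event has completed.
        for event in await self.wait():
            await event.wait_complete()


async def main():
    link = Handoff()
    kernel_done = FakeEvent()
    link.ready([kernel_done])        # "Kernel" enqueued but not yet finished

    events = await link.wait()       # Returns immediately with the event list
    still_pending = not kernel_done.is_complete()

    cpu_wait = asyncio.ensure_future(link.wait_events())
    await asyncio.sleep(0)           # Give cpu_wait a chance to run
    blocked = not cpu_wait.done()    # It is stuck waiting for the event

    kernel_done.complete()           # The "kernel" finishes
    await cpu_wait
    return events == [kernel_done], still_pending, blocked


got_events, still_pending, blocked = asyncio.run(main())
```

In real code, the list returned by the device-side path would be handed to the command queue so that the wait happens on the device and the CPU is free to carry on with other work.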