Source code for katsdpsigproc.accel

################################################################################
# Copyright (c) 2014-2023, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Utilities for interfacing with accelerator hardware.

Both OpenCL and CUDA are supported, as well as multiple devices.

The modules :mod:`.cuda` and :mod:`.opencl` provide the abstraction layer,
but most code will not import these directly (and it might not be possible
to import them). Instead, use :func:`create_some_context` to set up a context
on whatever device is available.

Attributes
----------
have_cuda : bool
    True if PyCUDA could be imported (does not guarantee any CUDA devices)
have_opencl : bool
    True if PyOpenCL could be imported (does not guarantee any OpenCL devices)
"""

import io
import itertools
import os
import re
import sys
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
)

import numpy as np

try:
    from numpy.typing import DTypeLike
except ImportError:
    DTypeLike = Any  # type: ignore
import mako.lexer
from mako.lookup import TemplateLookup
from mako.template import Template

if sys.version_info >= (3, 9):
    import importlib.resources as importlib_resources
else:
    import importlib_resources

from .abc import (
    AbstractCommandQueue,
    AbstractContext,
    AbstractDevice,
    AbstractProgram,
    AbstractTuningCommandQueue,
)

try:
    import pycuda.driver

    from . import cuda

    have_cuda = True
except ImportError:
    have_cuda = False
try:
    from . import opencl

    have_opencl = True
except ImportError:
    have_opencl = False

if TYPE_CHECKING:
    import graphviz


_Slice = Union[int, slice, None, Tuple[Union[int, slice, None], ...]]
_B = TypeVar("_B")  # buffer type
_RB = TypeVar("_RB")  # raw buffer type
_RS = TypeVar("_RS")  # raw buffer for SVM
_D = TypeVar("_D", bound="AbstractDevice")
_P = TypeVar("_P", bound="AbstractProgram")
_Q = TypeVar("_Q", bound="AbstractCommandQueue")
_TQ = TypeVar("_TQ", bound="AbstractTuningCommandQueue")


[docs]def divup(x: int, y: int) -> int: """Divide x by y and round the result upwards.""" return (x + y - 1) // y
[docs]def roundup(x: int, y: int) -> int: """Round x up to the next multiple of y.""" return divup(x, y) * y
[docs]class LinenoLexer(mako.lexer.Lexer): """Insert #line directives into the source code. It is used by passing `lexer_cls` to the mako template constructor. """ def __init__(self, *args, **kw) -> None: super().__init__(*args, **kw) self.preprocessor.insert(0, self._lineno_preproc) @classmethod def _escape_filename(cls, filename: str) -> str: """Escape a string for the C preprocessor.""" return '"' + re.sub(r'([\\"])', r"\\\1", filename) + '"' def _lineno_preproc(self, source: str) -> str: """Insert #line directives between every line in `source` and return the result.""" if self.filename is not None: escaped_filename = self._escape_filename(self.filename) else: escaped_filename = self._escape_filename("<string>") lines = source.split("\n") # If the last line is \n-terminated, it will cause an empty # final string in lines if len(lines) > 0 and lines[-1] == "": lines.pop() out = [] for i, line in enumerate(lines): out.append(f"#line {i + 1} {escaped_filename}\n") out.append(line + "\n") return "".join(out)
def _make_lookup(extra_dirs: List[Union[str, os.PathLike]]) -> TemplateLookup: dirs = extra_dirs + [importlib_resources.files("katsdpsigproc")] return TemplateLookup(dirs, lexer_cls=LinenoLexer, strict_undefined=True) # Cached for convenience _lookup = _make_lookup([])
[docs]def build( context: AbstractContext, name: str, render_kws: Optional[Mapping[str, Any]] = None, extra_dirs: Optional[List[Union[str, os.PathLike]]] = None, extra_flags: Optional[List[str]] = None, source: Optional[str] = None, ) -> AbstractProgram: """Build a source module from a mako template. Parameters ---------- context Context for which to compile the code name Source file name, relative to the katsdpsigproc module render_kws Keyword arguments to pass to mako extra_dirs Extra directories to search for source code extra_flags Flags to pass to the compiler source If specified, provides the source, overriding `name` Returns ------- program Compiled module """ if render_kws is None: render_kws = {} if extra_flags is None: extra_flags = [] if extra_dirs is None: lookup = _lookup else: lookup = _make_lookup(extra_dirs) if source is not None: template = Template(source, lookup=lookup, lexer_cls=LinenoLexer) else: template = lookup.get_template(name) rendered_source = template.render(simd_group_size=context.device.simd_group_size, **render_kws) return context.compile(rendered_source, extra_flags)
[docs]def all_devices() -> List[AbstractDevice]: """Return a list of all discovered devices.""" devices: List[AbstractDevice] = [] if have_cuda: pycuda.driver.init() devices.extend(cuda.Device.get_devices()) if have_opencl: devices.extend(opencl.Device.get_devices()) return devices
[docs]def candidate_devices( device_filter: Optional[Callable[[AbstractDevice], bool]] = None, ) -> Sequence[AbstractDevice]: """Get devices that are considered for :func:`create_some_context`. Refer to :func:`create_some_context` for documentation of how this list is affected by `device_filter` and environment variables. If no matching devices are found, returns an empty list. If an environment variable is out of range, raises :exc:`RuntimeError`. """ def parse_id(envar: str) -> Optional[int]: if envar in os.environ: try: num = int(os.environ[envar]) if num >= 0: return num except ValueError: pass return None def parse_id_list(envar: str, max_parts: int) -> Optional[List[int]]: """Split an environment variable of the form a[:b[:c...]] into its components. If the variable is not validly formatted, returns `None`. """ if envar in os.environ: try: fields = os.environ[envar].split(":", max_parts) out = [] for value in fields: num = int(value) if num < 0: raise ValueError("") out.append(num) if not out: raise ValueError("") return out except ValueError: pass return None cuda_id: Optional[int] = None opencl_id: Optional[List[int]] = None device_id: Optional[int] = parse_id("KATSDPSIGPROC_DEVICE") if device_id is None: cuda_id = parse_id("CUDA_DEVICE") if cuda_id is None: # Fields are platform number and device number opencl_id = parse_id_list("PYOPENCL_CTX", 2) cuda_devices: Sequence[cuda.Device] = [] opencl_devices: Sequence[Sequence[opencl.Device]] = [] devices: List[AbstractDevice] = [] if have_cuda: pycuda.driver.init() cuda_devices = cuda.Device.get_devices() if have_opencl: opencl_devices = opencl.Device.get_devices_by_platform() if device_filter is not None: cuda_devices = list(filter(device_filter, cuda_devices)) opencl_devices = [list(filter(device_filter, platform)) for platform in opencl_devices] try: if cuda_id is not None: devices.append(cuda_devices[cuda_id]) elif opencl_id is not None: if len(opencl_id) == 1: devices.extend(opencl_devices[opencl_id[0]]) else: devices.append(opencl_devices[opencl_id[0]][opencl_id[1]]) else: devices = list(itertools.chain(cuda_devices, *opencl_devices)) if device_id is not None: devices = [devices[device_id]] except IndexError: raise RuntimeError("Out-of-range device selected") return devices
[docs]def create_some_context( interactive: bool = True, device_filter: Optional[Callable[[AbstractDevice], bool]] = None, ) -> AbstractContext: """Create a single-device context, selecting a device automatically. This is similar to `pyopencl.create_some_context`. A number of environment variables can be set to limit the choice to a single device: - KATSDPSIGPROC_DEVICE: device number from amongst all devices - CUDA_DEVICE: CUDA device number (compatible with PyCUDA) - PYOPENCL_CTX: OpenCL platform and optionally device number (compatible with PyOpenCL) The first of these that is encountered takes effect. If it does not exist, an exception is thrown. Parameters ---------- interactive If true, and `sys.stdin.isatty()` is true, and there are multiple choices, it will prompt the user. Otherwise, it will choose the first available device, favouring CUDA over OpenCL, then GPU over accelerators over other OpenCL devices. device_filter If specified, each device in turn is passed to it, and it must return True to keep the device as a candidate or False to reject it. Raises ------ RuntimeError If no device could be found or the user made an invalid selection """ def key(device: AbstractDevice) -> int: if device.is_cuda: return 100 elif device.is_gpu: return 50 elif device.is_accelerator: return 40 else: return 30 devices = candidate_devices(device_filter) if not devices: raise RuntimeError("No compute devices found") if interactive and len(devices) > 1 and sys.stdin.isatty(): print("Select device:") for i, device in enumerate(devices): print(f" [{i}]: {device.name} ({device.platform_name})") print() choice_str = input("Enter selection: ") try: choice = int(choice_str) if choice < 0: raise IndexError # Otherwise Python's negative indexing kicks in device = devices[choice] except (ValueError, IndexError): raise RuntimeError("Invalid device number") else: device = max(devices, key=key) return device.make_context()
[docs]class HostArray(np.ndarray): """A restricted array class that can be used to initialise a :class:`DeviceArray`. It uses C ordering and allows padding, which is always in units of the dtype. It optionally uses pinned memory to allow fast transfer to and from device memory. Because of limitations in numpy and PyCUDA (which do not support non-contiguous memory very well), it works by taking a slice from the origin of contiguous storage, and using that contiguous storage in host-device copies. While one can create views, those views cannot be used in fast copies because there is no way to know whether the view is anchored at the origin. See the numpy documentation on subclassing for an explanation of why it is written the way it is. Parameters ---------- shape Shape for the array dtype Data type for the array padded_shape Total size of memory allocation (defaults to `shape`) context If specified, the memory will be allocated in a way that allows efficient copies to and from this context. """ def __new__( cls, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, context: Optional[AbstractContext] = None, ): if padded_shape is None: padded_shape = shape assert len(padded_shape) == len(shape) assert np.all(np.greater_equal(padded_shape, shape)) if context is not None: owner = context.allocate_pinned(padded_shape, dtype) else: owner = np.empty(padded_shape, dtype) if shape: index = tuple([slice(0, x) for x in shape]) obj = owner[index].view(HostArray) else: obj = owner.view(HostArray) obj._owner = owner obj.padded_shape = padded_shape return obj
[docs] @classmethod def safe(cls, obj: np.ndarray) -> bool: """Determine whether self can be copied to/from a device directly.""" try: return obj._owner is not None # type: ignore except AttributeError: return False
@overload @classmethod def padded_view(cls, obj: "HostArray") -> np.ndarray: ... @overload @classmethod def padded_view(cls, obj: np.ndarray) -> Optional[np.ndarray]: ...
[docs] @classmethod def padded_view(cls, obj: np.ndarray) -> Optional[np.ndarray]: """Retrieve the view of the full memory without padding. Returns `None` if `cls.safe(obj)` is `False`. """ try: return obj._owner # type: ignore except AttributeError: return None
def __array_finalize__(self, obj: "HostArray") -> None: if obj is not None: # View casting or created from a template: we cannot vouch for it. self._owner = None if isinstance(obj, HostArray): self.padded_shape: Tuple[int, ...] = obj.padded_shape else: self.padded_shape = None
[docs]class DeviceArray: """A light-weight array-like wrapper around a device buffer. It that handles padding better than PyCUDA (which has very poor support). It only supports C-order arrays where the inner-most dimension is contiguous. Transfers are designed to use an :class:`HostArray` of the same shape and padding, but fall back to using a copy when necessary. Parameters ---------- context Context in which to allocate the memory shape Shape for the usable data dtype Data type padded_shape Shape for memory allocation (defaults to `shape`) raw If specified, provides the backing memory. It must be created from the `allocate_raw` method of an allocator. """ def __init__( self, context: AbstractContext, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, raw: Any = None, ) -> None: if padded_shape is None: padded_shape = shape assert len(shape) == len(padded_shape) assert np.all(np.greater_equal(padded_shape, shape)) self._shape = shape self._dtype: np.dtype = np.dtype(dtype) self.padded_shape = padded_shape self.context = context self.buffer = context.allocate(padded_shape, dtype, raw) @property def shape(self) -> Tuple[int, ...]: return self._shape @property def dtype(self) -> np.dtype: return self._dtype @property def ndim(self) -> int: """Return number of dimensions.""" return len(self.shape) @property def strides(self) -> Tuple[int, ...]: """Return strides, as in numpy.""" ans = [self.dtype.itemsize] for i in range(len(self.padded_shape) - 1, 0, -1): ans.append(ans[-1] * self.padded_shape[i]) return tuple(reversed(ans)) def _copyable(self, ary: np.ndarray) -> bool: """Whether `ary` can be copied to/from this array directly.""" return ( HostArray.safe(ary) and ary.dtype == self.dtype and ary.shape == self.shape and ary.padded_shape == self.padded_shape # type: ignore ) @classmethod def _contiguous(cls, ary: HostArray) -> np.ndarray: """Return a contiguous view of a copyable array. The return value is suitable for passing to PyCUDA or PyOpenCL functions (which require a contiguous view). """ return HostArray.padded_view(ary)
[docs] def empty_like(self) -> HostArray: """Return an array-like object that can be efficiently copied.""" return HostArray(self.shape, self.dtype, self.padded_shape, context=self.context)
[docs] def asarray_like(self, ary: np.ndarray) -> HostArray: """Return an array with the same content as `ary`, but the same memory layout as `self`.""" assert ary.shape == self.shape if self._copyable(ary): return ary # type: ignore tmp = self.empty_like() np.copyto(tmp, ary, casting="no") return tmp
[docs] def set(self, command_queue: AbstractCommandQueue, ary: np.ndarray) -> None: """Copy synchronously from `ary` to self.""" ary = self.asarray_like(ary) command_queue.enqueue_write_buffer(self.buffer, self._contiguous(ary))
[docs] def get( self, command_queue: AbstractCommandQueue, ary: Optional[np.ndarray] = None ) -> np.ndarray: """Copy synchronously from self to `ary`. If `ary` is None, or if it is not suitable as a target, the copy is to a newly-allocated :class:`HostArray`. The actual target is returned. """ if ary is None or not self._copyable(ary): ary = self.empty_like() assert isinstance(ary, HostArray) command_queue.enqueue_read_buffer(self.buffer, self._contiguous(ary)) return ary
[docs] def set_async(self, command_queue: AbstractCommandQueue, ary: np.ndarray) -> None: """Copy asynchronously from `ary` to self.""" ary = self.asarray_like(ary) command_queue.enqueue_write_buffer(self.buffer, self._contiguous(ary), blocking=False)
[docs] def get_async( self, command_queue: AbstractCommandQueue, ary: Optional[np.ndarray] = None ) -> np.ndarray: """Copy asynchronously from self to `ary` (see :meth:`get`).""" if ary is None or not self._copyable(ary): ary = self.empty_like() assert isinstance(ary, HostArray) command_queue.enqueue_read_buffer(self.buffer, self._contiguous(ary), blocking=False) return ary
@classmethod def _canonical_slice( cls, region: _Slice, shape: Tuple[int, ...], strides: Tuple[int, ...] ) -> Tuple[int, Tuple[int, ...], Tuple[int, ...]]: """Transform a slice selection into a form that is easier to consume internally. See :meth:`copy_region` for a description of what slice expressions are supported. Parameters ---------- region : int, slice, `np.newaxis` or tuple of same Index expression generated by `np.s_` or `np.index_exp` shape : sequence Shape of the array being indexed strides : sequence Strides of the array being indexed Returns ------- origin : int Byte offset for the first selected element shape : tuple of int Shape of the region to copy strides : tuple of int Byte stride between elements along each axis of `shape`. Each stride is a multiple of the following ones, except that a dimension of length 1 may have a stride of 0. """ origin = 0 copy_shape: List[int] = [] copy_strides = [] axis = 0 if not isinstance(region, tuple): region = (region,) for index in region: if index is np.newaxis: copy_shape.append(1) copy_strides.append(0) elif isinstance(index, slice): if axis >= len(shape): raise IndexError("Too many axes in index expression") start, stop, stride = index.indices(shape[axis]) if stride <= 0: raise IndexError("Only positive strides are supported") origin += start * strides[axis] copy_shape.append((stop - start) // stride) if copy_shape[-1] <= 0: raise IndexError("Empty slice selection") copy_strides.append(stride * strides[axis]) axis += 1 elif isinstance(index, int): if axis >= len(shape): raise IndexError("Too many axes in index expression") if index < 0: index += shape[axis] if index < 0 or index >= shape[axis]: raise IndexError("Index out of range") origin += index * strides[axis] axis += 1 else: raise TypeError(f"Invalid type in slice: {type(index)}") while axis < len(shape): copy_shape.append(shape[axis]) copy_strides.append(strides[axis]) axis += 1 return origin, tuple(copy_shape), tuple(copy_strides) @classmethod def _region_transfer_params( cls, src: Any, dest: Any, src_region: _Slice, dest_region: _Slice ) -> Tuple[int, int, Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]: """Compute offsets, strides and shape for a copy. Where possible, the dimension of the copy is reduced by exploiting contiguity. When interpreting the return values, the arrays should be thought of as byte arrays, ignoring any shape, padding or dtype of the originals. Parameters ---------- src,dest : array-like Source and destination buffers, with `dtype, `shape` and `strides` attributes src_region,dest_region : int, slice, `np.newaxis` or tuple of same Index expression generated by `np.s_` or `np.index_exp` Returns ------- src_origin,dest_origin : int Byte offset into the source/destination array for the start of the copy shape : tuple Shape for the copy, which may have fewer dimensions than the arrays. src_strides,dest_strides : tuple Byte strides for source/destination. Raises ------ TypeError if the source and destination do not have the same dtype ValueError if the source and destination regions are not the same shape IndexError if the source or destination regions are unsupported or out-of-range """ if src.dtype != dest.dtype: raise TypeError(f"dtypes do not match ({src.dtype} and {dest.dtype})") src_origin, src_shape, src_strides = cls._canonical_slice( src_region, src.shape, src.strides ) dest_origin, dest_shape, dest_strides = cls._canonical_slice( dest_region, dest.shape, dest.strides ) if src_shape != dest_shape: raise ValueError("Source and destination shapes for the copy do not match") # Search for axes that can be collapsed together new_shape = [src.dtype.itemsize] new_src_strides = [1] new_dest_strides = [1] for axis in range(len(src_shape) - 1, -1, -1): if src_shape[axis] == 1: continue # Can just ignore this axis if ( src_strides[axis] == new_shape[-1] * new_src_strides[-1] and dest_strides[axis] == new_shape[-1] * new_dest_strides[-1] ): new_shape[-1] *= src_shape[axis] else: new_shape.append(src_shape[axis]) new_src_strides.append(src_strides[axis]) new_dest_strides.append(dest_strides[axis]) return ( src_origin, dest_origin, tuple(new_shape), tuple(new_src_strides), tuple(new_dest_strides), ) @classmethod def _transfer_region( cls, func: Callable, buffer1: Any, buffer2: Any, origin1: int, origin2: int, shape: Tuple[int, ...], strides1: Tuple[int, ...], strides2: Tuple[int, ...], **kwargs, ) -> None: """Wrap the command-queue's copy and handle high dimensions by looping.""" if len(shape) > 3: for i in range(shape[-1]): cls._transfer_region( func, buffer1, buffer2, origin1 + strides1[-1] * i, origin2 + strides2[-1] * i, shape[:-1], strides1[:-1], strides2[:-1], **kwargs, ) else: func(buffer1, buffer2, origin1, origin2, shape, strides1, strides2, **kwargs)
[docs] def copy_region( self, command_queue: AbstractCommandQueue, dest: "DeviceArray", src_region: _Slice, dest_region: _Slice, ) -> None: """Perform a device-to-device copy of a subregion of `self` to `dest`. If the source and destination memory overlap, the result is undefined. The regions to copy are specified using a subset of numpy array indexing syntax. The following are supported: - slices with positive strides - integers - :data:`np.newaxis <numpy.newaxis>` - If fewer indices than axes are specified, all elements on the remaining axes are used. Ellipses are not yet supported, but it would be straightforward to add support. Parameters ---------- command_queue Command queue for the asynchronous operation. dest Target of the copy src_region,dest_region Index expressions constructed by :data:`np.s_ <numpy.s_>` or :data:`!np.index_exp`. Raises ------ TypeError if the source and destination do not have the same dtype ValueError if the source and destination regions select different shapes IndexError if the source or destination regions are unsupported or out-of-range """ src_origin, dest_origin, shape, src_strides, dest_strides = self._region_transfer_params( self, dest, src_region, dest_region ) self._transfer_region( command_queue.enqueue_copy_buffer_rect, self.buffer, dest.buffer, src_origin, dest_origin, shape, src_strides, dest_strides, )
[docs] def get_region( self, command_queue: AbstractCommandQueue, ary: np.ndarray, device_region: _Slice, ary_region: _Slice, blocking: bool = True, ) -> None: """Perform a device-to-host copy of a subregion of `self` to `ary`. See :meth:`~DeviceArray.copy_region` for a description of how regions are specified. Parameters ---------- command_queue Command queue for the operation. ary Target of the copy device_region,ary_region Index expressions constructed by :data:`np.s_ <numpy.s_>` or :data:`!np.index_exp`, to specify the source and target regions. blocking If false, the operation will be asynchronous. Raises ------ TypeError if the source and destination do not have the same dtype ValueError if the source and destination regions select different shapes ValueError if `ary` is not a :class:`HostArray` or is a view of one IndexError if the source or destination regions are unsupported or out-of-range """ if not HostArray.safe(ary): raise ValueError("Target region is not suitable for device-to-host copy") assert isinstance(ary, HostArray) device_origin, ary_origin, shape, device_strides, ary_strides = ( self._region_transfer_params(self, ary, device_region, ary_region) ) self._transfer_region( command_queue.enqueue_read_buffer_rect, self.buffer, self._contiguous(ary), device_origin, ary_origin, shape, device_strides, ary_strides, blocking=blocking, )
[docs] def set_region( self, command_queue: AbstractCommandQueue, ary: np.ndarray, device_region: _Slice, ary_region: _Slice, blocking: bool = True, ) -> None: """Perform a host-to-device copy of a subregion `ary` to `self`. See :meth:`~DeviceArray.copy_region` for a description of how regions are specified. Parameters ---------- command_queue Command queue for the operation. ary Source of the copy device_region,ary_region Index expressions constructed by :data:`np.s_ <numpy.s_>` or :data:`!np.index_exp`, to specify the source and target regions. blocking If false, the operation will be asynchronous. Raises ------ TypeError if the source and destination do not have the same dtype ValueError if the source and destination regions select different shapes IndexError if the source or destination regions are unsupported or out-of-range """ if not HostArray.safe(ary): tmp = HostArray(ary[ary_region].shape, ary.dtype, context=self.context) np.copyto(tmp, ary[ary_region], casting="no") ary = tmp ary_region = np.s_[()] assert isinstance(ary, HostArray) ary_origin, device_origin, shape, ary_strides, device_strides = ( self._region_transfer_params(ary, self, ary_region, device_region) ) self._transfer_region( command_queue.enqueue_write_buffer_rect, self.buffer, self._contiguous(ary), device_origin, ary_origin, shape, device_strides, ary_strides, blocking=blocking, )
[docs] def zero(self, command_queue: AbstractCommandQueue) -> None: """Memset with zeros (asynchronously).""" command_queue.enqueue_zero_buffer(self.buffer)
[docs]class SVMArray(HostArray, DeviceArray): """An array that uses shared virtual memory to be accessible from both the host and the device. Shared virtual memory is also known as managed memory (in CUDA). It should not be used as a source or target for copies to the device, since it already resides on the device. Due to limitations in PyOpenCL, this is currently only available for CUDA, and CUDA's restrictions on managed memory apply. Only the base array (not views) can be passed to kernels. Parameters ---------- context Context in which to allocate the memory shape Shape for the array dtype Data type for the array padded_shape Total size of memory allocation (defaults to `shape`) raw : PyCUDA DeviceAllocation or PyOpenCL Buffer, optional If specified, provides the backing memory """ def __new__( cls, context: AbstractContext, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, raw: Any = None, ) -> "SVMArray": if padded_shape is None: padded_shape = shape assert len(padded_shape) == len(shape) assert np.all(np.greater_equal(padded_shape, shape)) owner = context.allocate_svm(padded_shape, dtype, raw=raw) if shape: index = tuple([slice(0, x) for x in shape]) obj = owner[index].view(SVMArray) else: obj = owner.view(SVMArray) obj._owner = owner obj.padded_shape = padded_shape obj.context = context return obj def __init__(self, *args, **kwargs): # Suppress DeviceArray's constructor, since we did everything in __new__ pass @property def buffer(self) -> np.ndarray: return self._owner # type: ignore def _copyable(self, ary: np.ndarray) -> bool: """Whether `ary` can be copied to/from this array directly.""" return ary.dtype == self.dtype and ary.shape == self.shape
[docs] def set(self, command_queue: AbstractCommandQueue, ary: np.ndarray) -> None: """Copy synchronously from `ary` to self. For SVMArray, this is a CPU copy. """ # Ensure that it synchronises with previous work in the queue command_queue.finish() self[...] = ary
[docs] def get( self, command_queue: AbstractCommandQueue, ary: Optional[np.ndarray] = None ) -> np.ndarray: """Copy synchronously copy from self to `ary`. If `ary` is None, or if it is not suitable as a target, the copy is to a newly allocated :class:`HostArray`. The actual target is returned. For SVMArray, this is a CPU copy. """ # Ensure that it synchronises with previous work in the queue command_queue.finish() if ary is None or not self._copyable(ary): return self.copy() else: ary[:] = self return ary
[docs] def set_async(self, command_queue: AbstractCommandQueue, ary: np.ndarray) -> None: """Copy asynchronously from `ary` to self. This is implemented synchronously for SVMArray, but exists for compatibility. """ self.set(command_queue, ary)
[docs] def get_async( self, command_queue: AbstractCommandQueue, ary: Optional[np.ndarray] = None ) -> np.ndarray: """Copy asynchronously from self to `ary` (see `get`). This is implemented synchronously for SVMArray, but exists for compatibility. """ return self.get(command_queue, ary)
[docs] def set_region( self, command_queue: AbstractCommandQueue, ary: np.ndarray, device_region: _Slice, ary_region: _Slice, blocking: bool = True, ) -> None: # Ensure that it synchronises with previous work in the queue command_queue.finish() self[device_region] = ary[ary_region]
[docs] def get_region( self, command_queue: AbstractCommandQueue, ary: np.ndarray, device_region: _Slice, ary_region: _Slice, blocking: bool = True, ) -> None: # Ensure that it synchronises with previous work in the queue command_queue.finish() ary[ary_region] = self[device_region]
[docs]class AbstractAllocator(ABC, Generic[_RB]): """Interface for allocating device memory.""" context: AbstractContext
[docs] @abstractmethod def allocate( self, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, raw: Optional[_RB] = None, ) -> DeviceArray: pass
[docs] @abstractmethod def allocate_raw(self, n_bytes: int) -> _RB: pass
[docs]class DeviceAllocator(Generic[_B, _RB, _RS, _D, _P, _Q, _TQ], AbstractAllocator[_RB]): """Allocate :class:`DeviceArray` objects from a context.""" def __init__(self, context: AbstractContext[_B, _RB, _RS, _D, _P, _Q, _TQ]) -> None: self.context = context
[docs] def allocate( self, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, raw: Optional[_RB] = None, ) -> DeviceArray: return DeviceArray(self.context, shape, dtype, padded_shape, raw)
[docs] def allocate_raw(self, n_bytes: int) -> _RB: return self.context.allocate_raw(n_bytes)
[docs]class SVMAllocator(Generic[_B, _RB, _RS, _D, _P, _Q, _TQ], AbstractAllocator[_RS]): """Allocate :class:`SVMArray` objects from a context.""" def __init__(self, context: AbstractContext[_B, _RB, _RS, _D, _P, _Q, _TQ]) -> None: self.context = context
[docs] def allocate( self, shape: Tuple[int, ...], dtype: DTypeLike, padded_shape: Optional[Tuple[int, ...]] = None, raw: Optional[_RS] = None, ) -> SVMArray: return SVMArray(self.context, shape, dtype, padded_shape, raw)
[docs] def allocate_raw(self, n_bytes: int) -> _RS: return self.context.allocate_svm_raw(n_bytes)
[docs]class Dimension: """A single dimension of an :class:`IOSlot`. It represents padding and alignment requirements. Instances can be linked together, with all linked instances (whether directly or indirectly linked) exposing the intersection of their requirements. Internally this is represented using a union-find tree with path compression. Note that `min_padded_round` and `alignment` provide similar but different requirements. The former is used to determine a minimum amount of padding, but any larger amount is acceptable, even if not a multiple of `min_padded_round`. The latter is stricter, always requiring a multiple of this value. The latter is not expected to be frequently used except when type-punning. Dimensions can also be frozen, which is done by :class:`IOSlot` once a buffer is bound. This prevents the requirements from changing later (via linking or otherwise) and hence invalidating the bound buffer, or allowing two buffers with a shared dimension but different strides to be used. Parameters ---------- size Size of the actual data, before padding min_padded_round The `min_padded_size` will be computed by rounding `size` up to a multiple of this. min_padded_size Minimum size of the padded allocation, overriding `min_padded_round`. alignment Padded size is required to be a multiple of this. This must be a power of 2. align_dtype If specified, it is a hint that this data type is the fastest-varying axis of a multidimensional array. The padded size may be chosen to be such that the stride is a multiple of `ALIGN_BYTES`, to ensure efficient access by GPUs. The hint will be ignored if `exact` is true. exact If true, padding is forbidden """ ALIGN_BYTES = 128 @classmethod def _is_power2(cls, value: int) -> bool: return value > 0 and (value & (value - 1)) == 0 def __init__( self, size: int, min_padded_round: Optional[int] = None, min_padded_size: Optional[int] = None, alignment: int = 1, align_dtype: Optional[DTypeLike] = None, exact: bool = False, ) -> None: if min_padded_size is None: if min_padded_round is not None: min_padded_size = roundup(size, min_padded_round) else: min_padded_size = size if not self._is_power2(alignment): raise ValueError("alignment is not a power of 2") if min_padded_size < size: raise ValueError("padded size is less than size") self._parent: Optional[Dimension] = None self._size = size self._min_padded_size = min_padded_size self._alignment = alignment self._alignment_hint = alignment self._exact = exact self._frozen = False if align_dtype is not None: self.add_align_dtype(align_dtype) def _root(self) -> "Dimension": if self._parent is None: return self else: self._parent = self._parent._root() return self._parent @property def size(self) -> int: return self._root()._size @property def min_padded_size(self) -> int: return self._root()._min_padded_size @property def alignment(self) -> int: return self._root()._alignment @property def alignment_hint(self) -> int: return self._root()._alignment_hint @property def exact(self) -> bool: return self._root()._exact @property def frozen(self) -> bool: return self._root()._frozen
[docs] def required_padded_size(self) -> int: """Return padded size required to satisfy this dimension.""" root = self._root() size = roundup(root._min_padded_size, root._alignment) # If the size is less than the alignment hint, it could # waste a huge amount of memory to pad it if not root._exact and size >= root._alignment_hint: size = roundup(size, root._alignment_hint) return size
[docs] def valid(self, padded_size: int) -> bool: """Return whether `size` is a valid padded size.""" root = self._root() if root._exact: return padded_size == root.required_padded_size() else: return padded_size >= root._min_padded_size and padded_size % root._alignment == 0
[docs] def add_align_dtype(self, dtype: DTypeLike) -> None: """Add an alignment hint. Indicates that this will be used with an array whose fastest-varying dimension is of type `dtype`. If the size is not a power of 2, it is ignored. """ if self.frozen: raise ValueError("cannot modify a frozen requirement") itemsize = np.dtype(dtype).itemsize if self._is_power2(itemsize): root = self._root() root._alignment_hint = max(root._alignment_hint, self.ALIGN_BYTES // itemsize)
[docs] def freeze(self) -> None: """Prevent further modifications.""" self._root()._frozen = True
[docs]class IOSlotBase(ABC): """An input/output slot of an operation. A slot can be bound to storage, or can allocate storage itself. This base class is untyped and unshaped, so in most cases one will use :class:`IOSlot` instead. Slots are arranged in a tree, and only the root can be manipulated directly. The entire tree shares the same storage. A slot cannot be reattached to a new parent. """ def __init__(self) -> None: self.is_root = True
[docs] def check_root(self) -> None: """Check whether this slot is a root slot, and raise an exception if not.""" if not self.is_root: raise ValueError("not a root slot")
[docs] @abstractmethod def required_bytes(self) -> int: """Return number of bytes of device storage required."""
[docs] @abstractmethod def is_bound(self): """Return whether storage is currently attached to this slot."""
[docs] def attachable(self) -> bool: """Return whether this slot can be attached as a child to another.""" return self.is_root and not self.is_bound()
@abstractmethod def _allocate( self, allocator: AbstractAllocator[_RB], raw: Optional[_RB] = None, *, bind: bool, ) -> Any: """Variant of :meth:`allocate` that does not check whether this is a root slot."""
[docs] def allocate( self, allocator: AbstractAllocator[_RB], raw: Optional[_RB] = None, *, bind: bool = True, ) -> Any: """Allocate and optionally bind a buffer satisfying the requirements. .. warning:: When `raw` is provided, there is no check that the storage is large enough. Parameters ---------- allocator Memory allocator from which to obtain the memory raw : PyCUDA DeviceAllocation or PyOpenCL Buffer, optional Backing storage for the allocation bind If true (the default), the allocated buffer is immediately bound. If false, the buffer is returned and not bound, and can be bound later. Raises ------ ValueError If this is not a root slot """ self.check_root() return self._allocate(allocator, raw, bind=bind)
@abstractmethod def _allocate_host(self, context: AbstractContext) -> HostArray: """Variant of :meth:`allocate_host` that does not check whether this is a root slot."""
[docs] def allocate_host(self, context: AbstractContext) -> HostArray: """Allocate a HostArray compatible with this slot.""" self.check_root() return self._allocate_host(context)
[docs]class IOSlot(IOSlotBase): """An input/output slot with type and shape information. It contains a reference to a buffer (initially `None`) and the shape, type, padding and alignment requirements for that buffer. It can allocate the buffer on request. Each dimension is represesented by a :class:`Dimension`. Slots are arranged in a tree, and only the root can be manipulated directly. A slot cannot be reattached to a new parent. Parameters ---------- dimensions : tuple of int or :class:`Dimension` The dimensions of the slots. Integers are converted to :class:`Dimension` with no further requirements. dtype : numpy dtype Type of the data elements """ @classmethod def _make_dimension(cls, dimension: Union[Dimension, int]) -> Dimension: if isinstance(dimension, Dimension): return dimension else: return Dimension(dimension) def __init__(self, dimensions: Tuple[Union[Dimension, int], ...], dtype: DTypeLike) -> None: super().__init__() self.dimensions = tuple([self._make_dimension(x) for x in dimensions]) self.shape = tuple([x.size for x in self.dimensions]) if len(self.dimensions) > 1: self.dimensions[-1].add_align_dtype(dtype) self.dtype: np.dtype = np.dtype(dtype) self.buffer: Optional[DeviceArray] = None
[docs] def is_bound(self) -> bool: return self.buffer is not None
[docs] def validate(self, buffer: DeviceArray) -> None: """Check that `buffer` is suitable for binding. Parameters ---------- buffer Buffer to validate (must not be `None`) Raises ------ TypeError If the data type does not match ValueError If the dimensions or shape do not match ValueError If a padded size does not match :meth:`Dimension.required_padded_size` """ if buffer.dtype != self.dtype: raise TypeError("dtype does not match") if len(buffer.shape) != len(self.shape): raise ValueError("number of dimensions does not match") for size, padded_size, dimension in zip(buffer.shape, buffer.padded_shape, self.dimensions): if size != dimension.size: raise ValueError("size does not match") if padded_size != dimension.required_padded_size(): raise ValueError("padded size does not match")
def _bind(self, buffer: Optional[DeviceArray]) -> None: """Variant of :meth:`bind` that does not check whether this is a root slot.""" if buffer is not None: self.validate(buffer) self.buffer = buffer for dimension in self.dimensions: dimension.freeze()
[docs] def bind(self, buffer: Optional[DeviceArray]) -> None: """Set the internal buffer reference. Always use this function rather than writing it directly. If the buffer is not `None`, it is validated (see :meth:`validate`). Parameters ---------- buffer Buffer to store Raises ------ ValueError If this is not a root slot """ self.check_root() self._bind(buffer)
[docs] def required_padded_shape(self) -> Tuple[int, ...]: """Return padded shape required to satisfy only this slot.""" return tuple(x.required_padded_size() for x in self.dimensions)
[docs] def required_bytes(self) -> int: return int(np.prod(self.required_padded_shape()) * self.dtype.itemsize)
def _allocate( self, allocator: AbstractAllocator[_RB], raw: Optional[_RB] = None, *, bind: bool = True, ) -> DeviceArray: buffer = allocator.allocate(self.shape, self.dtype, self.required_padded_shape(), raw=raw) if bind: self._bind(buffer) return buffer def _allocate_host(self, context: AbstractContext) -> HostArray: return HostArray(self.shape, self.dtype, self.required_padded_shape(), context=context)
[docs] def allocate( self, allocator: AbstractAllocator[_RB], raw: Optional[_RB] = None, *, bind: bool = True, ) -> DeviceArray: # Overloaded just to restrict the type signature return super().allocate(allocator, raw, bind=bind)
[docs]class CompoundIOSlot(IOSlot): """IO slot that owns multiple child slots, and presents the combined requirement. The children must all have the same type and shape. This is used for connecting a single buffer to multiple operations. Parameters ---------- children Child slots Raises ------ ValueError If `children` is empty, or if elements have inconsistent shapes ValueError If any child is not attachable TypeError If `children` contains inconsistent data types """ def __init__(self, children: Iterable[IOSlot]) -> None: self.children = list(children) if not len(self.children): raise ValueError("empty child list") shape = self.children[0].shape dtype = self.children[0].dtype dimensions = self.children[0].dimensions # Validate consistency and compute combined requirements for child in self.children: if not child.attachable(): raise ValueError("child is not attachable") if child.shape != shape: raise ValueError("inconsistent shapes") if child.dtype != dtype: raise TypeError("inconsistent dtypes") for dimension in child.dimensions: if dimension.frozen: raise ValueError("child has frozen dimensions") for child in self.children: for x, y in zip(dimensions, child.dimensions): x.link(y) super().__init__(dimensions, dtype) for child in self.children: child.is_root = False def _bind(self, buffer: Optional[DeviceArray]) -> None: super()._bind(buffer) for child in self.children: child._bind(buffer)
[docs]class AliasIOSlot(IOSlotBase): """Slot that allocates a single low-level buffer to back multiple children. The child slots need not have the same type or shape. This is typically used when logically distinct slots can share memory because they do not contain live data at the same time. Parameters ---------- children Raises ------ ValueError If `children` is empty or contains non-attachable elements """ def __init__(self, children: Iterable[IOSlotBase]) -> None: super().__init__() self.children = list(children) self.raw: Optional[Any] = None if not len(self.children): raise ValueError("empty child list") for child in self.children: if not child.attachable(): raise ValueError("child is not attachable") for child in self.children: child.is_root = False
[docs] def is_bound(self) -> bool: return self.raw is not None
[docs] def required_bytes(self) -> int: return max([child.required_bytes() for child in self.children])
def _allocate_host(self, context: AbstractContext) -> HostArray: return HostArray((self.required_bytes(),), np.uint8, context=context) def _allocate( self, allocator: AbstractAllocator[_RB], raw: Optional[_RB] = None, *, bind: bool = True, ) -> _RB: if raw is None: raw = allocator.allocate_raw(self.required_bytes()) if bind: for child in self.children: child._allocate(allocator, raw, bind=bind) self.raw = raw return raw
[docs]class Operation(ABC): """An instance of a device operation. Typically one first creates a template (which contains the program code, and is expensive to create) and then instantiates it for use with specific buffers. An instance of this class contains *slots*, which are named instances of :class:`IOSlot`. The user binds specific buffers to these slots to specify the memory used in the operation. This class is only useful when subclassed. The subclass will populate the slots. Subclasses also provide a `_run` function that handles the implementation of `__call__`. Operations are arranged in a tree, with internal nodes subclassing :class:`OperationSequence`. Internal nodes provide slots that proxy for the slots of their children; the children's slots should not be manipulated directly. When a parent ceases to exist, its children should no longer be used i.e., they cannot be reattached to a new parent. Parameters ---------- command_queue Command queue for the operation allocator Allocator used to allocate unbound slots Attributes ---------- slots : dictionary Maps names to slot instances hidden_slots : dictionary Extra slots which are not root slots, and hence cannot have buffers bound to them, but whose buffers can still be referenced. This is generally used by :class:`OperationSequence` when several slots are aliased to the same memory. is_root : boolean True if this operation is not part of an :class:`OperationSequence`. """ def __init__( self, command_queue: AbstractCommandQueue, allocator: Optional[AbstractAllocator] = None, ) -> None: if allocator is None: allocator = DeviceAllocator(command_queue.context) elif allocator.context is not None: if allocator.context is not command_queue.context: raise ValueError("command_queue and allocator have different contexts") self.slots: Dict[str, IOSlotBase] = {} self.hidden_slots: Dict[str, IOSlotBase] = {} self.command_queue = command_queue self.is_root = True self.allocator = allocator
[docs] def bind(self, **kwargs) -> None: """Bind buffers to slots by keyword. Each keyword argument name specifies a slot name. Raises ------ KeyError if a named slots does not exist TypeError if a named slot if an alias slot """ for name, buffer in kwargs.items(): slot = self.slots[name] if not isinstance(slot, IOSlot): raise TypeError(f"Slot {slot} is not an IOSlot") slot.bind(buffer)
[docs] def ensure_bound(self, name: str) -> None: """Make sure that a specific slot has a buffer bound, allocating if necessary.""" slot = self.slots[name] if not slot.is_bound(): slot.allocate(self.allocator)
[docs] def ensure_all_bound(self) -> None: """Make sure that all slots have a buffer bound, allocating if necessary.""" for slot in self.slots.values(): if not slot.is_bound(): slot.allocate(self.allocator)
[docs] def buffer(self, name: str) -> DeviceArray: """Retrieve the buffer bound to a slot. It will consult both :attr:`slots` and :attr:`hidden_slots`. Parameters ---------- name : str Name of the slot to access Returns ------- :class:`DeviceArray` Buffer bound to slot `name`. Raises ------ KeyError If no slot with this name exists TypeError If the slot exists but it is an alias slot ValueError If the slot exists but does not yet have a buffer bound """ try: slot = self.slots[name] except KeyError: try: slot = self.hidden_slots[name] except KeyError: raise KeyError("no slot named " + name) from None if isinstance(slot, IOSlot): if slot.buffer is None: raise ValueError("slot " + name + " has no buffer bound") return slot.buffer else: raise TypeError("slot " + name + " is an alias slot")
[docs] def required_bytes(self) -> int: """Return number of bytes of device storage required.""" return sum([x.required_bytes() for x in self.slots.values()])
[docs] def parameters(self) -> Mapping[str, Any]: """Return dictionary of configuration options for this operation.""" return {}
@abstractmethod def _run(self) -> Any: raise NotImplementedError("abstract base class") def __call__(self, **kwargs) -> Any: """Run the operation. Any slots that are not already bound will allocate new buffers. Keyword arguments are passed to :meth:`bind`. """ self.bind(**kwargs) self.ensure_all_bound() return self._run()
[docs]class OperationSequence(Operation): """Convenience class for an operation that is built up of smaller named operations. Slots are mapped to share data between these smaller operations. Initially, each slot named *slot* in a child named *op* is remapped to a parent slot named *op*:*slot*. After this, each set provided in `compounds` is removed from the slots and combined into a single compound slot. Finally, each set in `aliases` is removed and combined into an alias slot. For both `compounds` and `aliases`, if a child does not exist, it is skipped, and if none of the children exist, no action is taken. Parameters ---------- command_queue Command queue for the operation operations : sequence of 2-tuples Name, operation pairs to add. Calling the operation executes them in order compounds : mapping of `str` to sequence of `str`, optional Names for compound slots, mapped to the original slot names that are replaced aliases : mapping of `str` to sequence of `str`, optional Names for alias slots, mapped to the original slot names that are replaced allocator : :class:`DeviceAllocator` or :class:`SVMAllocator`, optional Allocator used to allocate unbound slots """ def __init__( self, command_queue: AbstractCommandQueue, operations: Iterable[Tuple[str, Operation]], compounds: Optional[Mapping[str, Iterable[str]]] = None, aliases: Optional[Mapping[str, Iterable[str]]] = None, allocator: Optional[AbstractAllocator] = None, ) -> None: super().__init__(command_queue, allocator) self.operations = OrderedDict(operations) for name, operation in self.operations.items(): if operation.command_queue is not command_queue: raise ValueError("child has a different command queue to the parent") if not operation.is_root: raise ValueError("child already has another parent") for slot_name, slot in operation.slots.items(): self.slots[name + ":" + slot_name] = slot if compounds is not None: for name, child_names in compounds.items(): children = self._extract_slots(child_names, False) if children: for child in children: if not isinstance(child, IOSlot): raise TypeError(f"Children of {name} must all be IOSlots") self.slots[name] = CompoundIOSlot(cast(Iterable[IOSlot], children)) if aliases is not None: for name, child_names in aliases.items(): children = self._extract_slots(child_names, True) if children: self.slots[name] = AliasIOSlot(children) for operation in self.operations.values(): operation.is_root = False def _extract_slots(self, names: Iterable[str], add_to_hidden: bool) -> List[IOSlotBase]: """Remove and return the slots with the given names.""" ans: List[IOSlotBase] = [] for name in names: try: ans.append(self.slots[name]) del self.slots[name] if add_to_hidden: assert name not in self.hidden_slots self.hidden_slots[name] = ans[-1] except KeyError: # Ignore missing items pass return ans def _run(self) -> None: for operation in self.operations.values(): operation()
def _slot_label(slot: IOSlotBase, name: str, hide_detail: bool) -> str: """Produce contents of HTML label for a slot. If `hide_detail` is true, it will not show the shapes and dtype. This is used when the details will appear on a parent. """ if isinstance(slot, AliasIOSlot): return f'<b>{name}</b><br/><font color="red">alias slot</font>' elif hide_detail: return f"<b>{name}</b>" else: assert isinstance(slot, IOSlot) shape = "×".join(str(x) for x in slot.shape) if not shape: shape = "scalar" padded_shape = "×".join(str(x) for x in slot.required_padded_shape()) dtype = str(np.dtype(slot.dtype)) return f"<b>{name}</b><br/>{shape}<br/>{padded_shape}<br/>{dtype}" def _visualize_operation( g: "graphviz.Graph", operation: Operation, path: Tuple[str, ...], slot_map: MutableMapping[IOSlotBase, str], no_detail_slots: Set[IOSlotBase], edges: Set[Tuple[IOSlotBase, IOSlotBase]], ) -> None: """Recursive implementation of :func:`visualize_operation`. Parameters ---------- g Graph or subgraph to be updated with the operation. If the operation has children, it will be rendered as a subgraph, otherwise as a single slot with ports for the slots. operation Operation to add to the graph path Hierarchical name of `operation`. The last component is used for visual display, and the rest for generating unique node labels. slot_map Mapping from slots to node names (or port names). It is updated in place with all the slots in this operation and its descendants. no_detail_slots Slots that should not show shape and dtype details. It is updated in place with any descendant slots of this node. edges Edges to draw, as tuples of two :class:`IOSlotBase`s. """ all_slots = {**operation.slots, **operation.hidden_slots} if isinstance(operation, OperationSequence): # Graphviz treats subgraphs with the prefix 'cluster_' specially. We # only want this for real subgraphs, not the root. node_name = "cluster_" + "!".join(path) if len(path) > 1 else "root" for slot_name, slot in all_slots.items(): if isinstance(slot, CompoundIOSlot): for child_slot in slot.children: no_detail_slots.add(child_slot) with g.subgraph(name=node_name) as sub: sub.attr("graph", label=path[-1]) for child_name, child in operation.operations.items(): _visualize_operation( sub, child, path + (child_name,), slot_map, no_detail_slots, edges ) for slot_name, slot in list(all_slots.items()): if slot not in slot_map: # It's already represented in the child # ":" has special meaning to graphviz, so we replace it with "+". full_name = "slot_{}_{}".format("!".join(path), slot_name.replace(":", "+")) slot_map[slot] = full_name slot_label = "<" + _slot_label(slot, slot_name, slot in no_detail_slots) + ">" sub.node(full_name, label=slot_label, shape="box") else: del all_slots[slot_name] else: node_name = "op_" + "!".join(path) label = io.StringIO() label.write( '<<table border="1" cellborder="0" rows="*" columns="*">' f'<tr><td colspan="{len(operation.slots)}">{path[-1]}</td></tr><tr>' ) for slot_name, slot in operation.slots.items(): label.write( f'<td port="{slot_name}"><font point-size="9"><i>{slot_name}</i></font></td>' ) slot_map[slot] = node_name + ":" + slot_name.replace(":", "+") label.write("</tr></table>>") g.node(node_name, label=label.getvalue(), shape="plain") for slot in all_slots.values(): if hasattr(slot, "children"): children: Sequence[IOSlotBase] = getattr(slot, "children") for child2 in children: edges.add((child2, slot))
[docs]def visualize_operation(operation: Operation, filename: str) -> None: """Write a visualization of an :class:`Operation` to file. This requires the `graphviz package`_ to be installed. .. _graphviz package: https://graphviz.readthedocs.io/en/stable/ Parameters ---------- operation Operation or operation sequence to visualize filename Base filename to write. It should end in ``.gv``, and a rendered PDF will automatically be produced with a filename ending in ``.gv.pdf``. """ from graphviz import Graph g = Graph() g.attr("graph", clusterrank="local", ranksep="3", compound="yes") slot_map: Dict[IOSlotBase, str] = {} edges: Set[Tuple[IOSlotBase, IOSlotBase]] = set() _visualize_operation(g, operation, ("root",), slot_map, set(), edges) for a, b in edges: g.edge(slot_map[a], slot_map[b]) g.render(filename)