################################################################################
# Copyright (c) 2019-2021, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Abstract base classes for :mod:`.opencl` and :mod:`.cuda`."""
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Any, Generic, List, Optional, Sequence, Tuple, Type, TypeVar
import numpy as np
# Fall back to ``Any`` when ``numpy.typing`` cannot be imported, so that
# annotations using ``DTypeLike`` still evaluate.
try:
    from numpy.typing import DTypeLike
except ImportError:
    DTypeLike = Any  # type: ignore

# Type variables used to parameterise the generic abstract classes below.
# Bounds are written as strings because the classes they name are defined
# further down in this module.
_B = TypeVar("_B")  # buffer type
_RB = TypeVar("_RB")  # raw buffer type
_RS = TypeVar("_RS")  # raw buffer for SVM
_C = TypeVar("_C", bound="AbstractContext")  # context type
_D = TypeVar("_D", bound="AbstractDevice")  # device type
_E = TypeVar("_E", bound="AbstractEvent")  # event type
_K = TypeVar("_K", bound="AbstractKernel")  # kernel type
_P = TypeVar("_P", bound="AbstractProgram")  # program type
_Q = TypeVar("_Q", bound="AbstractCommandQueue")  # command queue type
_TQ = TypeVar("_TQ", bound="AbstractTuningCommandQueue")  # tuning command queue type
class AbstractProgram(ABC, Generic[_K]):
    """Abstract interface to a program object, from which kernels are obtained."""

    @abstractmethod
    def get_kernel(self, name: str) -> "AbstractKernel":
        """Return a newly created kernel.

        Parameters
        ----------
        name
            The kernel function's name
        """
class AbstractKernel(ABC, Generic[_P]):
    """Abstract interface to a kernel object.

    A kernel is run by passing it to
    :meth:`AbstractCommandQueue.enqueue_kernel`. Rather than constructing one
    directly, the recommended route is :meth:`AbstractProgram.get_kernel`.
    """

    @abstractmethod
    def __init__(self, program: _P, name: str) -> None:
        """Construct the kernel from `program` and the kernel function's `name`."""
class AbstractEvent(ABC):
    """Abstract interface to an event.

    An event is a marker placed in a command queue; it is not tied to any
    specific command. That makes it closer to a CUDA event than to an OpenCL
    event.
    """

    @abstractmethod
    def wait(self) -> None:
        """Do not return until the event has completed."""

    @abstractmethod
    def time_since(self: _E, prior_event: _E) -> float:
        """Give the elapsed time, in seconds, from `prior_event` to this event.

        If the events have not completed yet, this waits for them; the PyCUDA
        method of the same name does not.
        """

    @abstractmethod
    def time_till(self: _E, next_event: _E) -> float:
        """Give the elapsed time, in seconds, from this event to `next_event`.

        Refer to :meth:`time_since` for details.
        """
class AbstractDevice(ABC, Generic[_C]):
    """Abstract interface to a device."""

    @abstractmethod
    def make_context(self) -> "AbstractContext":
        """Return a newly created context that is associated with this device."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable name of the device."""

    @property
    @abstractmethod
    def platform_name(self) -> str:
        """Human-readable name of the platform that owns the device."""

    @property
    @abstractmethod
    def driver_version(self) -> str:
        """Human-readable name of the driver version."""

    @property
    @abstractmethod
    def is_cuda(self) -> bool:
        """True if this is a CUDA device."""

    @property
    @abstractmethod
    def is_gpu(self) -> bool:
        """True if the device is a GPU."""

    @property
    @abstractmethod
    def is_accelerator(self) -> bool:
        """True if the device is an accelerator, in the sense of OpenCL device types."""

    @property
    @abstractmethod
    def is_cpu(self) -> bool:
        """True if the device is a CPU."""

    @property
    @abstractmethod
    def simd_group_size(self) -> int:
        """Number of workitems that execute in lock-step.

        Use this only for tuning performance parameters: it implies no
        guarantees about memory coherency, forward progress etc.
        """

    @classmethod
    @abstractmethod
    def get_devices(cls: Type[_D]) -> Sequence[_D]:
        """List every device, across all platforms."""
class AbstractContext(ABC, Generic[_B, _RB, _RS, _D, _P, _Q, _TQ]):
    """Abstract interface to an OpenCL/CUDA context."""

    @property
    @abstractmethod
    def device(self) -> AbstractDevice:
        """Device associated with the context (the first device when there are several)."""

    @abstractmethod
    def compile(
        self,
        source: str,
        extra_flags: Optional[List[str]] = None,
    ) -> AbstractProgram:
        """Compile source code into a program object.

        Parameters
        ----------
        source
            The source code to build
        extra_flags
            Additional parameters handed to the compiler
        """

    @abstractmethod
    def allocate_raw(self, n_bytes: int) -> _RB:
        """Allocate a buffer on the device that carries no type information."""

    @abstractmethod
    def allocate(
        self,
        shape: Tuple[int, ...],
        dtype: DTypeLike,
        raw: Optional[_RB] = None,
    ) -> _B:
        """Allocate a typed buffer on the device.

        Parameters
        ----------
        shape
            Array shape
        dtype
            Data type of the elements
        raw
            Memory to back the array; when ``None`` it is allocated
            automatically
        """

    @abstractmethod
    def allocate_pinned(
        self,
        shape: Tuple[int, ...],
        dtype: DTypeLike,
    ) -> np.ndarray:
        """Allocate host memory that can be copied efficiently to and from the device.

        Parameters
        ----------
        shape
            Array shape
        dtype
            Data type of the elements
        """

    @abstractmethod
    def allocate_svm_raw(self, n_bytes: int) -> _RS:
        """Allocate raw storage suitable for passing to :meth:`allocate_svm`."""

    @abstractmethod
    def allocate_svm(
        self,
        shape: Tuple[int, ...],
        dtype: DTypeLike,
        raw: Optional[_RS] = None,
    ) -> np.ndarray:
        """Allocate an array in shared virtual memory."""

    @abstractmethod
    def create_command_queue(self, profile: bool = False) -> "AbstractCommandQueue":
        """Return a newly created command queue belonging to this context.

        Parameters
        ----------
        profile
            When true, the command queue supports timing kernels
        """

    @abstractmethod
    def create_tuning_command_queue(self) -> "AbstractTuningCommandQueue":
        """Return a newly created command queue intended for autotuning."""

    @abstractmethod
    def __enter__(self: _C) -> _C:
        """Enter the ``with`` block (context-manager protocol)."""

    @abstractmethod
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the ``with`` block (context-manager protocol)."""
class AbstractCommandQueue(ABC, Generic[_B, _C, _E, _K]):
    """Abstraction of a command queue."""

    # Context associated with this command queue.
    context: _C

    @abstractmethod
    def enqueue_read_buffer(self, buffer: _B, data: Any, blocking: bool = True) -> None:
        """Copy data from the device to the host.

        Only whole-buffer copies are supported, and the shape and type must
        match. In general, one should use the convenience functions in
        :class:`accel.DeviceArray`.

        Parameters
        ----------
        buffer
            Source
        data : array-like
            Target
        blocking
            If true (default) the call blocks until the copy is complete
        """

    @abstractmethod
    def enqueue_write_buffer(self, buffer: _B, data: Any, blocking: bool = True) -> None:
        """Copy data from the host to the device.

        Only whole-buffer copies are supported, and the shape and type must
        match. In general, one should use the convenience functions in
        :class:`accel.DeviceArray`.

        Parameters
        ----------
        buffer
            Target
        data : array-like
            Source
        blocking
            If true (default), the call blocks until the source has been fully
            read (it has not necessarily reached the device).
        """

    @abstractmethod
    def enqueue_copy_buffer_rect(
        self,
        src_buffer: _B,
        dest_buffer: _B,
        src_origin: int,
        dest_origin: int,
        shape: Sequence[int],
        src_strides: Sequence[int],
        dest_strides: Sequence[int],
    ) -> None:
        """Copy a subregion of one buffer to another.

        This is a low-level interface that ignores the shape, strides etc of
        the buffers, and treats them as byte arrays. It also only supports 3 or
        fewer dimensions. Use
        :py:meth:`~katsdpsigproc.accel.DeviceArray.copy_region` for a
        high-level interface.

        Parameters
        ----------
        src_buffer,dest_buffer
            Source and destination buffers
        src_origin,dest_origin
            Offsets for the start of the copy, in bytes
        shape
            Shape of the region to copy (1-3 elements). The first dimension is
            a byte count.
        src_strides,dest_strides
            Strides for the source and destination memory layout, with the same
            length as `shape`. The first element of each must be 1, and each
            element must be a factor of the next element.
        """

    @abstractmethod
    def enqueue_read_buffer_rect(
        self,
        buffer: _B,
        data: Any,
        buffer_origin: int,
        data_origin: int,
        shape: Sequence[int],
        buffer_strides: Sequence[int],
        data_strides: Sequence[int],
        blocking: bool = True,
    ) -> None:
        """Copy a region of a buffer to host memory.

        This is a low-level interface that ignores the shape, strides etc of
        the buffers, and treats them as byte arrays. It also only supports 3 or
        fewer dimensions. Use
        :py:meth:`~katsdpsigproc.accel.DeviceArray.get_region` for a high-level
        interface.

        Parameters
        ----------
        buffer
            Source
        data : array-like
            Target
        buffer_origin, data_origin
            Offsets for the start of the copy, in bytes
        shape
            Shape of the region to copy (1-3 elements). The first dimension is
            a byte count.
        buffer_strides,data_strides
            Strides for the source and destination memory layout, with the same
            length as `shape`. The first element of each must be 1, and each
            element must be a factor of the next element.
        blocking
            If true, block until the transfer is complete.
        """

    @abstractmethod
    def enqueue_write_buffer_rect(
        self,
        buffer: _B,
        data: Any,
        buffer_origin: int,
        data_origin: int,
        shape: Sequence[int],
        buffer_strides: Sequence[int],
        data_strides: Sequence[int],
        blocking: bool = True,
    ) -> None:
        """Copy a region of host memory to a buffer.

        This is a low-level interface that ignores the shape, strides etc of
        the buffers, and treats them as byte arrays. It also only supports 3 or
        fewer dimensions. Use
        :py:meth:`~katsdpsigproc.accel.DeviceArray.set_region` for a high-level
        interface.

        Parameters
        ----------
        buffer
            Target
        data : array-like
            Source
        buffer_origin, data_origin
            Offsets for the start of the copy, in bytes
        shape
            Shape of the region to copy (1-3 elements). The first dimension is
            a byte count.
        buffer_strides,data_strides
            Strides for the destination and source memory layout, with the same
            length as `shape`. The first element of each must be 1, and each
            element must be a factor of the next element.
        blocking
            If true, block until the transfer is complete.
        """

    @abstractmethod
    def enqueue_zero_buffer(self, buffer: _B) -> None:
        """Fill a buffer with zero bytes."""

    @abstractmethod
    def enqueue_kernel(
        self,
        kernel: _K,
        args: Sequence[Any],
        global_size: Tuple[int, ...],
        local_size: Tuple[int, ...],
    ) -> None:
        """Enqueue a kernel to the command queue.

        .. warning::

            It is not thread-safe to call this function in two threads
            on the same kernel at the same time.

        Parameters
        ----------
        kernel
            Kernel to run
        args
            Arguments to pass to the kernel. Refer to the PyOpenCL/CUDA
            documentation for details. Additionally, this function allows
            a low-level device array to be passed.
        global_size
            Number of work-items in each global dimension
        local_size
            Number of work-items in each local dimension. Must divide
            exactly into `global_size`.
        """

    @abstractmethod
    def enqueue_marker(self) -> AbstractEvent:
        """Create an event at this point in the command queue."""

    @abstractmethod
    def enqueue_wait_for_events(self, events: Sequence[_E]) -> None:
        """Enqueue a barrier to wait for all events in `events`."""

    @abstractmethod
    def flush(self) -> None:
        """Start enqueued work running, but do not wait for it to complete."""

    @abstractmethod
    def finish(self) -> None:
        """Block until all enqueued work has completed."""
class AbstractTuningCommandQueue(AbstractCommandQueue[_B, _C, _E, _K]):
    """Command queue offering additional facilities for autotuning.

    Kernels enqueued after the most recent call to :meth:`start_tuning` are
    tracked, and calling :meth:`stop_tuning` reports the total time they
    consume.
    """

    @abstractmethod
    def start_tuning(self) -> None:
        """Begin tracking the kernels that are enqueued from this point on."""

    @abstractmethod
    def stop_tuning(self) -> float:
        """Report the total time consumed by kernels enqueued since :meth:`start_tuning`."""