"""Reduction algorithms."""
from typing import Any, Callable, Mapping, Optional, Tuple, cast
import numpy as np
from typing_extensions import TypedDict
try:
from numpy.typing import DTypeLike
except ImportError:
DTypeLike = Any # type: ignore
from . import accel, tune
from .abc import AbstractCommandQueue, AbstractContext
class _TuningDict(TypedDict):
wgsx: int
wgsy: int
[docs]class HReduceTemplate:
"""Performs reduction along rows in a 2D array.
Only commutative reduction operators are supported.
Parameters
----------
context
Context for which kernels will be compiled
dtype
Type of data elements
ctype
Type (in C/CUDA, not numpy) of data elements
op
C expression to combine two variables, *a* and *b*
identity
C expression for an identity value for *op*
extra_code
Arbitrary C code to paste in (for use by *op* or *identity*)
tuning
Kernel tuning parameters; if omitted, will autotune. The possible
parameters are
- wgsx: number of workitems per workgroup per row
- wgsy: number of rows to handle per workgroup
"""
def __init__(
self,
context: AbstractContext,
dtype: DTypeLike,
ctype: str,
op: str,
identity: str,
extra_code: str = "",
tuning: Optional[_TuningDict] = None,
) -> None:
self.context = context
self.dtype = dtype
self.ctype = ctype
if tuning is None:
tuning = self.autotune(context, dtype, ctype, op, identity, extra_code)
self.wgsx = tuning["wgsx"]
self.wgsy = tuning["wgsy"]
self.op = op
self.identity = identity
self.extra_code = extra_code
self.program = accel.build(
context,
"hreduce.mako",
{
"wgsx": self.wgsx,
"wgsy": self.wgsy,
"type": self.ctype,
"op": op,
"identity": identity,
"extra_code": extra_code,
},
)
[docs] @classmethod
@tune.autotuner(test={"wgsx": 64, "wgsy": 4})
def autotune(
cls,
context: AbstractContext,
dtype: DTypeLike,
ctype: str,
op: str,
identity: str,
extra_code: str,
) -> _TuningDict:
queue = context.create_tuning_command_queue()
shape = (2048, 1024)
src = accel.DeviceArray(context, shape, dtype=dtype)
dest = accel.DeviceArray(context, (shape[0],), dtype=dtype)
def generate(wgsx: int, wgsy: int) -> Callable[[int], float]:
wgs = wgsx * wgsy
if wgs < 32 or wgs > 1024:
raise RuntimeError(f"Skipping work group size {wgsx}x{wgsy}")
template = cls(
context,
dtype,
ctype,
op,
identity,
extra_code,
{"wgsx": wgsx, "wgsy": wgsy},
)
fn = template.instantiate(queue, shape)
fn.bind(src=src, dest=dest)
fn.ensure_all_bound()
return tune.make_measure(queue, fn)
return cast(
_TuningDict,
tune.autotune(generate, wgsx=[32, 64, 128], wgsy=[1, 2, 4, 8, 16]),
)
[docs] def instantiate(
self,
command_queue: AbstractCommandQueue,
shape: Tuple[int, int],
column_range: Optional[Tuple[int, int]] = None,
allocator: Optional[accel.AbstractAllocator] = None,
) -> "HReduce":
return HReduce(self, command_queue, shape, column_range, allocator)
[docs]class HReduce(accel.Operation):
"""Concrete instance of :class:`HReduceTemplate`.
In each row, the elements in the specified column range are reduced using
the reduction operator supplied to the template.
.. rubric:: Slots
**src** : *rows* × *columns*
Input data
**dest** : *rows*
Output reductions
Parameters
----------
template
Operation template
command_queue
Command queue for the operation
shape
Shape for the source slot
column_range
Half-open range of columns to reduce (defaults to the entire array)
allocator
Allocator used to allocate unbound slots
"""
def __init__(
self,
template: HReduceTemplate,
command_queue: AbstractCommandQueue,
shape: Tuple[int, int],
column_range: Optional[Tuple[int, int]] = None,
allocator: Optional[accel.AbstractAllocator] = None,
) -> None:
if len(shape) != 2:
raise ValueError("shape must be 2-dimensional")
if column_range is None:
column_range = (0, shape[1])
if column_range[0] < 0 or column_range[1] > shape[1]:
raise ValueError("column range overflows the array")
if column_range[0] >= column_range[1]:
raise ValueError("column range is empty")
super().__init__(command_queue, allocator)
self.template = template
self.kernel = template.program.get_kernel("hreduce")
self.column_range = column_range
self.slots["src"] = accel.IOSlot(
(accel.Dimension(shape[0], self.template.wgsy), shape[1]),
self.template.dtype,
)
self.slots["dest"] = accel.IOSlot(
(accel.Dimension(shape[0], self.template.wgsy),), self.template.dtype
)
def _run(self) -> None:
src = self.buffer("src")
dest = self.buffer("dest")
rows_padded = accel.roundup(src.shape[0], self.template.wgsy)
n_columns = self.column_range[1] - self.column_range[0]
self.command_queue.enqueue_kernel(
self.kernel,
[
src.buffer,
dest.buffer,
np.int32(self.column_range[0]),
np.int32(n_columns),
np.int32(src.padded_shape[1]),
],
global_size=(self.template.wgsx, rows_padded),
local_size=(self.template.wgsx, self.template.wgsy),
)
[docs] def parameters(self) -> Mapping[str, Any]:
return {
"dtype": self.template.dtype,
"ctype": self.template.ctype,
"shape": self.slots["src"].shape, # type: ignore
"column_range": self.column_range,
"op": self.template.op,
"identity": self.template.identity,
"extra_code": self.template.extra_code,
}