Source code for katsdpsigproc.percentile

################################################################################
# Copyright (c) 2014-2021, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Perform on-device percentile calculation of 2D arrays."""
# see scripts/percentiletest.py for an example

from typing import Any, Callable, Mapping, Optional, Tuple, cast

import numpy as np
from typing_extensions import TypedDict

from . import accel, tune
from .abc import AbstractCommandQueue, AbstractContext


class _TuningDict(TypedDict):
    size: int
    wgsy: int


[docs]class Percentile5Template: """Kernel for calculating percentiles of a 2D array of data. 5 percentiles [0,100,25,75,50] are calculated per row (along columns, independently per row). The lower percentile element, rather than a linear interpolation is chosen. WARNING: assumes all values are positive. Parameters ---------- context Context for which kernels will be compiled max_columns Maximum number of columns is_amplitude If true, the inputs are scalar amplitudes; if false, they are complex numbers and the answers are computed on the absolute values tuning Kernel tuning parameters; if omitted, will autotune. The possible parameters are - size: number of workitems per workgroup along each row - wgsy: number of workitems per workgroup along each column """ autotune_version = 8 def __init__( self, context: AbstractContext, max_columns: int, is_amplitude: bool = True, tuning: Optional[_TuningDict] = None, ) -> None: self.context = context self.max_columns = max_columns self.is_amplitude = is_amplitude if tuning is None: tuning = self.autotune(context, max_columns, is_amplitude) self.size = tuning["size"] self.wgsy = tuning["wgsy"] self.vt = accel.divup(max_columns, tuning["size"]) self.program = accel.build( context, "percentile.mako", { "size": self.size, "wgsy": self.wgsy, "vt": self.vt, "is_amplitude": self.is_amplitude, }, )
[docs] @classmethod @tune.autotuner(test={"size": 64, "wgsy": 4}) def autotune( cls, context: AbstractContext, max_columns: int, is_amplitude: bool ) -> _TuningDict: queue = context.create_tuning_command_queue() in_shape = (4096, max_columns) rs = np.random.RandomState(seed=1) if is_amplitude: host_data: np.ndarray = rs.uniform(size=in_shape).astype(np.float32) else: host_data = rs.standard_normal(in_shape) + 1j * rs.standard_normal(in_shape) host_data = host_data.astype(np.complex64) def generate(size: int, wgsy: int) -> Callable[[int], float]: if size * wgsy < 32 or size * wgsy > 1024: raise RuntimeError("work group size is unnecessarily large or small, skipping") if max_columns > size * 256: raise RuntimeError("too many columns") fn = cls(context, max_columns, is_amplitude, {"size": size, "wgsy": wgsy}).instantiate( queue, in_shape ) inp = fn.slots["src"].allocate(fn.allocator) fn.slots["dest"].allocate(fn.allocator) inp.set(queue, host_data) return tune.make_measure(queue, fn) return cast( _TuningDict, tune.autotune( generate, size=[8, 16, 32, 64, 128, 256, 512, 1024], wgsy=[1, 2, 4, 8, 16, 32], ), )
[docs] def instantiate( self, command_queue: AbstractCommandQueue, shape: Tuple[int, int], column_range: Optional[Tuple[int, int]] = None, allocator: Optional[accel.AbstractAllocator] = None, ) -> "Percentile5": return Percentile5(self, command_queue, shape, column_range, allocator)
[docs]class Percentile5(accel.Operation): """Concrete instance of :class:`PercentileTemplate`. .. warning:: Assumes all values are positive when `template.is_amplitude` is `True`. .. rubric:: Slots **src** Input type float32 or complex64. Shape is number of rows by number of columns, where 5 percentiles are computed along the columns, per row. **dest** Output type float32. Shape is (5, number of rows of input) Parameters ---------- template Operation template command_queue Command queue for the operation shape Shape of the source data column_range: Half-open interval of columns that will be processed. If not specified, all columns are processed. allocator Allocator used to allocate unbound slots """ def __init__( self, template: Percentile5Template, command_queue: AbstractCommandQueue, shape: Tuple[int, int], column_range: Optional[Tuple[int, int]], allocator: Optional[accel.AbstractAllocator] = None, ) -> None: super().__init__(command_queue, allocator) if column_range is None: column_range = (0, shape[1]) if column_range[1] <= column_range[0]: raise ValueError("column range is empty") if column_range[0] < 0 or column_range[1] > shape[1]: raise IndexError("column range is out of range") if column_range[1] - column_range[0] > template.max_columns: raise ValueError("columns exceeds max_columns") self.template = template self.kernel = template.program.get_kernel("percentile5_float") self.shape = shape self.column_range = column_range src_type = np.float32 if self.template.is_amplitude else np.complex64 row_dim = accel.Dimension(shape[0], self.template.wgsy) col_dim = accel.Dimension(shape[1]) self.slots["src"] = accel.IOSlot((row_dim, col_dim), src_type) self.slots["dest"] = accel.IOSlot((5, row_dim), np.float32) def _run(self) -> None: src = self.buffer("src") dest = self.buffer("dest") rows_padded = accel.roundup(src.shape[0], self.template.wgsy) self.command_queue.enqueue_kernel( self.kernel, [ src.buffer, dest.buffer, np.int32(src.padded_shape[1]), np.int32(dest.padded_shape[1]), np.int32(self.column_range[0]), np.int32(self.column_range[1] - self.column_range[0]), ], global_size=(self.template.size, rows_padded), local_size=(self.template.size, self.template.wgsy), )
[docs] def parameters(self) -> Mapping[str, Any]: return { "max_columns": self.template.max_columns, "is_amplitude": self.template.is_amplitude, "shape": self.slots["src"].shape, # type: ignore "column_range": self.column_range, }