Source code for katsdpsigproc.maskedsum

################################################################################
# Copyright (c) 2014-2019, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Perform on-device percentile calculation of 2D arrays."""
# see scripts/maskedsumtest.py for an example

from typing import Any, Callable, Mapping, Optional, Tuple, cast

import numpy as np
from typing_extensions import TypedDict

from . import accel, tune
from .abc import AbstractCommandQueue, AbstractContext


class _TuningDict(TypedDict):
    size: int


[docs]class MaskedSumTemplate: """Kernel for calculating masked sums of a 2D array of data. Masked sums are calculated per column (along rows, independently per column). Parameters ---------- context Context for which kernels will be compiled use_amplitudes If true, the amplitudes of the inputs rather than the inputs themselves will be summed. tuning Kernel tuning parameters; if omitted, will autotune. The possible parameters are - size: number of workitems per workgroup """ autotune_version = 1 def __init__( self, context: AbstractContext, use_amplitudes: bool = False, tuning: Optional[_TuningDict] = None, ) -> None: self.context = context self.use_amplitudes = use_amplitudes if tuning is None: tuning = self.autotune(context, use_amplitudes) self.size = tuning["size"] self.program = accel.build( context, "maskedsum.mako", {"size": self.size, "use_amplitudes": use_amplitudes}, )
[docs] @classmethod @tune.autotuner(test={"size": 256}) def autotune(cls, context: AbstractContext, use_amplitudes: bool) -> _TuningDict: queue = context.create_tuning_command_queue() columns = 5000 in_shape = (4096, columns) rs = np.random.RandomState(seed=1) host_data = rs.uniform(size=(in_shape[0], in_shape[1], 2)).astype(np.float32) host_data = host_data.view(dtype=np.complex64)[..., 0] host_mask = np.ones((in_shape[0],)).astype(np.float32) def generate(size: int) -> Callable[[int], float]: fn = cls(context, use_amplitudes, {"size": size}).instantiate(queue, in_shape) inp = fn.slots["src"].allocate(fn.allocator) msk = fn.slots["mask"].allocate(fn.allocator) fn.slots["dest"].allocate(fn.allocator) inp.set(queue, host_data) msk.set(queue, host_mask) return tune.make_measure(queue, fn) return cast(_TuningDict, tune.autotune(generate, size=[32, 64, 128, 256, 512, 1024]))
[docs] def instantiate( self, command_queue: AbstractCommandQueue, shape: Tuple[int, int], allocator: Optional[accel.AbstractAllocator] = None, ) -> "MaskedSum": return MaskedSum(self, command_queue, shape, allocator)
[docs]class MaskedSum(accel.Operation): """Concrete instance of :class:`MaskedSumTemplate`. .. rubric:: Slots **src** Input type complex64 Shape is number of rows by number of columns, masked sum is calculated along the rows, independently per column. **mask** Input type float32 Shape is (number of rows of input). **dest** Output type complex64 Shape is (number of columns of input) """ def __init__( self, template: MaskedSumTemplate, command_queue: AbstractCommandQueue, shape: Tuple[int, int], allocator: Optional[accel.AbstractAllocator] = None, ) -> None: super().__init__(command_queue, allocator) self.template = template self.kernel = template.program.get_kernel("maskedsum_float") self.shape = shape self.slots["src"] = accel.IOSlot( (shape[0], accel.Dimension(shape[1], template.size)), np.complex64 ) self.slots["mask"] = accel.IOSlot((shape[0],), np.float32) self.slots["dest"] = accel.IOSlot( (accel.Dimension(shape[1], template.size),), np.float32 if template.use_amplitudes else np.complex64, ) def _run(self) -> None: src = self.buffer("src") mask = self.buffer("mask") dest = self.buffer("dest") self.command_queue.enqueue_kernel( self.kernel, [ src.buffer, mask.buffer, dest.buffer, np.int32(src.padded_shape[1]), np.int32(src.shape[0]), ], global_size=(accel.roundup(src.shape[1], self.template.size),), local_size=(self.template.size,), )
[docs] def parameters(self) -> Mapping[str, Any]: return { "shape": self.slots["src"].shape, # type: ignore "use_amplitudes": self.template.use_amplitudes, }