Source code for katsdpsigproc.fft

################################################################################
# Copyright (c) 2021, 2025 National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Fast Fourier Transforms.

Currently this module only supports CUDA, using cuFFT.

It does not use scikit-cuda, because that insists on creating a cublas context
to obtain a version number, which then permanently consumes GPU memory on some
arbitrary GPU. It also seems the maintainer `no longer has time
<https://github.com/lebedov/scikit-cuda/issues/319>`_.

Only Linux is supported. Windows and MacOS support would require changing the
code for locating the library.
"""

import ctypes.util
import threading
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple

import numpy as np

try:
    from numpy.typing import DTypeLike
except ImportError:
    DTypeLike = Any  # type: ignore

from . import accel, cuda
from .abc import AbstractCommandQueue, AbstractContext
from .accel import AbstractAllocator


[docs]class FftMode(Enum): FORWARD = 0 INVERSE = 1
class _CEnum(Enum): """Enum type that can be used with ctypes.""" @classmethod def from_param(cls, value): if not isinstance(value, cls): raise TypeError # TODO: assumes that enum and int have the size in the ABI. But this # seems to be what scikit-cuda assumed anyway. return ctypes.c_int(value.value) class _Cufft: """Wraps up the low-level ctypes access to cuFFT. This is not intended to be a complete wrapper for cuFFT, nor to be user-friendly. It provides just enough for the implementation of this module. """ cufftHandle = ctypes.c_int cudaStream_t = ctypes.c_void_p # It's a pointer to an opaque struct class cufftType(_CEnum): CUFFT_R2C = 0x2A CUFFT_C2R = 0x2C CUFFT_C2C = 0x29 CUFFT_D2Z = 0x6A CUFFT_Z2D = 0x6C CUFFT_Z2Z = 0x69 class cufftResult(_CEnum): CUFFT_SUCCESS = 0 CUFFT_INVALID_PLAN = 1 CUFFT_ALLOC_FAILED = 2 CUFFT_INVALID_TYPE = 3 CUFFT_INVALID_VALUE = 4 CUFFT_INTERNAL_ERROR = 5 CUFFT_EXEC_FAILED = 6 CUFFT_SETUP_FAILED = 7 CUFFT_INVALID_SIZE = 8 CUFFT_UNALIGNED_DATA = 9 CUFFT_INCOMPLETE_PARAMETER_LIST = 10 CUFFT_INVALID_DEVICE = 11 CUFFT_PARSE_ERROR = 12 CUFFT_NO_WORKSPACE = 13 CUFFT_NOT_IMPLEMENTED = 14 CUFFT_LICENSE_ERROR = 15 CUFFT_NOT_SUPPORTED = 16 # cufft.h has no such type (these are just #defines), but it is convenient # to use an enum in Python. class cufftDirection(_CEnum): CUFFT_FORWARD = -1 CUFFT_INVERSE = 1 class CufftError(Exception): """Error raised from cuFFT.""" def __init__(self, error_code: int): self.error_code = error_code try: name = _Cufft.cufftResult(error_code).name except ValueError: name = f"cuFFT error {error_code:#x}" super().__init__(name) @staticmethod def _errcheck(result, func, args) -> None: """Turn cufftResult into an exception.""" if result: raise _Cufft.CufftError(result) def _get_function(self, name: str, argtypes: list) -> Callable[..., None]: """Shortcut to build a ctypes function wrapper. The function must return a :class:`cufftResult`. """ func = getattr(self.lib, name) func.argtypes = argtypes func.restype = ctypes.c_int func.errcheck = self._errcheck return func def __init__(self): self.lib = ctypes.cdll.LoadLibrary(ctypes.util.find_library("cufft")) self.cufftGetVersion = self._get_function("cufftGetVersion", [ctypes.POINTER(ctypes.c_int)]) self.cufftCreate = self._get_function("cufftCreate", [ctypes.POINTER(self.cufftHandle)]) self.cufftDestroy = self._get_function("cufftDestroy", [self.cufftHandle]) self.cufftMakePlanMany64 = self._get_function( "cufftMakePlanMany64", [ self.cufftHandle, ctypes.c_int, # rank ctypes.POINTER(ctypes.c_longlong), # n ctypes.POINTER(ctypes.c_longlong), # inembed ctypes.c_longlong, # istride, ctypes.c_longlong, # idist ctypes.POINTER(ctypes.c_longlong), # onembed ctypes.c_longlong, # ostride ctypes.c_longlong, # odist self.cufftType, # type ctypes.c_longlong, # batch ctypes.POINTER(ctypes.c_size_t), # workSize ], ) self.cufftSetAutoAllocation = self._get_function( "cufftSetAutoAllocation", [self.cufftHandle, ctypes.c_int] ) self.cufftSetStream = self._get_function( "cufftSetStream", [self.cufftHandle, self.cudaStream_t] ) self.cufftSetWorkArea = self._get_function( "cufftSetWorkArea", [self.cufftHandle, ctypes.c_void_p] ) # The actual function signatures have typed pointers rather than void*, but # specifying that just making invocation more complicated, and requires # defining types like cufftComplex. self.cufftExec = { self.cufftType.CUFFT_R2C: self._get_function( "cufftExecR2C", [self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p] ), self.cufftType.CUFFT_C2R: self._get_function( "cufftExecC2R", [self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p] ), self.cufftType.CUFFT_D2Z: self._get_function( "cufftExecD2Z", [self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p] ), self.cufftType.CUFFT_Z2D: self._get_function( "cufftExecZ2D", [self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p] ), self.cufftType.CUFFT_C2C: self._get_function( "cufftExecC2C", [ self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p, self.cufftDirection, ], ), self.cufftType.CUFFT_Z2Z: self._get_function( "cufftExecZ2Z", [ self.cufftHandle, ctypes.c_void_p, ctypes.c_void_p, self.cufftDirection, ], ), }
[docs]class FftTemplate: r"""Operation template for a forward or reverse FFT. The transformation is done over the last N dimensions, with the remaining dimensions for batching multiple arrays to be transformed. Dimensions before the first N must not be padded. This template bakes in more information than most (data shapes), which is due to constraints in CUFFT. The template can specify real->complex, complex->real, or complex->complex. In the last case, the same template can be used to instantiate forward or inverse transforms. Otherwise, real->complex transforms must be forward, and complex->real transforms must be inverse. For real<->complex transforms, the final dimension of the padded shape need only be :math:`\lfloor\frac{L}{2}\rfloor + 1`, where :math:`L` is the last element of `shape`. The transform is unnormalised: performing a forward followed by a reverse transform will scale the result by the number of elements. Parameters ---------- context Context for the operation N Number of dimensions for the transform shape Shape of the data (N or more dimensions). For real->complex or complex->real transformation, this is the size of the real side of the transform. dtype_src : {`np.float32`, `np.float64`, `np.complex64`, `np.complex128`} Data type for input dtype_dest : {`np.float32`, `np.float64`, `np.complex64`, `np.complex128`} Data type for output padded_shape_src Padded shape of the input padded_shape_dest Padded shape of the output tuning Tuning parameters (currently unused) """ def __init__( self, context: AbstractContext, N: int, shape: Tuple[int, ...], dtype_src: DTypeLike, dtype_dest: DTypeLike, padded_shape_src: Tuple[int, ...], padded_shape_dest: Tuple[int, ...], tuning: Optional[Dict[str, Any]] = None, ) -> None: if not isinstance(context, cuda.Context): raise TypeError("Only CUDA contexts are supported") if len(padded_shape_src) != len(shape): raise ValueError("padded_shape_src and shape must have same length") if len(padded_shape_dest) != len(shape): raise ValueError("padded_shape_dest and shape must have same length") if padded_shape_src[:-N] != shape[:-N]: raise ValueError("Source must not be padded on batch dimensions") if padded_shape_dest[:-N] != shape[:-N]: raise ValueError("Destination must not be padded on batch dimensions") if dtype_src == np.float32 and dtype_dest == np.complex64: fft_type = _Cufft.cufftType.CUFFT_R2C elif dtype_src == np.complex64 and dtype_dest == np.float32: fft_type = _Cufft.cufftType.CUFFT_C2R elif dtype_src == np.complex64 and dtype_dest == np.complex64: fft_type = _Cufft.cufftType.CUFFT_C2C elif dtype_src == np.float64 and dtype_dest == np.complex128: fft_type = _Cufft.cufftType.CUFFT_D2Z elif dtype_src == np.complex128 and dtype_dest == np.float64: fft_type = _Cufft.cufftType.CUFFT_Z2D elif dtype_src == np.complex128 and dtype_dest == np.complex128: fft_type = _Cufft.cufftType.CUFFT_Z2Z else: raise ValueError("Invalid combination of dtypes") cufft = _Cufft() self._cufft = cufft self.context: cuda.Context = context self.shape = shape # TODO: the type annotation is necessary with numpy 1.20.1, but # is no longer needed for newer versions. self.dtype_src: np.dtype = np.dtype(dtype_src) self.dtype_dest: np.dtype = np.dtype(dtype_dest) self.padded_shape_src = padded_shape_src self.padded_shape_dest = padded_shape_dest # CUDA 7.0 CUFFT has a bug where kernels are run in the default stream # instead of the requested one, if dimensions are up to 1920. There is # a patch, but there is no query to detect whether it has been # applied. version = ctypes.c_int() cufft.cufftGetVersion(ctypes.byref(version)) self._needs_synchronize_workaround = version.value < 8000 and any( x <= 1920 for x in shape[:N] ) batches = int(np.prod(padded_shape_src[:-N])) with context: arr_type = ctypes.c_longlong * N work_size = ctypes.c_size_t() self._plan = cufft.cufftHandle() cufft.cufftCreate(ctypes.byref(self._plan)) cufft.cufftSetAutoAllocation(self._plan, False) cufft.cufftMakePlanMany64( self._plan, # handle N, # rank arr_type(*shape[-N:]), # n arr_type(*padded_shape_src[-N:]), # inembed 1, # istride int(np.prod(padded_shape_src[-N:])), # idist arr_type(*padded_shape_dest[-N:]), # onembed 1, # ostride int(np.prod(padded_shape_dest[-N:])), # odist fft_type, # type batches, # batch ctypes.byref(work_size), # workSize ) self._work_size = work_size.value self._cufft = cufft self._fft_type = fft_type # The stream and work area are associated with the plan rather than # the execution, so we need to serialise executions. self._lock = threading.RLock()
[docs] def instantiate(self, *args, **kwargs): return Fft(self, *args, **kwargs)
def __del__(self): if hasattr(self, "_plan"): with self.context: self._cufft.cufftDestroy(self._plan)
[docs]class Fft(accel.Operation): """Forward or inverse Fourier transformation. .. rubric:: Slots **src** Input data **dest** Output data **work_area** Scratch area for work. The contents should not be used; it is made available so that it can be aliased with other scratch areas. If the implementation does not need any device memory for scratch space, this slot will not exist. Parameters ---------- template Operation template command_queue Command queue for the operation mode FFT direction allocator Allocator used to allocate unbound slots """ command_queue: cuda.CommandQueue # refine the type for mypy def __init__( self, template: FftTemplate, command_queue: AbstractCommandQueue, mode: FftMode, allocator: Optional[AbstractAllocator] = None, ) -> None: if not isinstance(command_queue, cuda.CommandQueue): raise TypeError("Only CUDA command queues are supported") super().__init__(command_queue, allocator) self.template = template src_shape = list(template.shape) dest_shape = list(template.shape) if template.dtype_src.kind != "c": if mode != FftMode.FORWARD: raise ValueError("R2C transform must use FftMode.FORWARD") dest_shape[-1] = template.shape[-1] // 2 + 1 if template.dtype_dest.kind != "c": if mode != FftMode.INVERSE: raise ValueError("C2R transform must use FftMode.INVERSE") src_shape[-1] = template.shape[-1] // 2 + 1 src_dims = tuple( accel.Dimension(d[0], min_padded_size=d[1], exact=True) for d in zip(src_shape, template.padded_shape_src) ) dest_dims = tuple( accel.Dimension(d[0], min_padded_size=d[1], exact=True) for d in zip(dest_shape, template.padded_shape_dest) ) self.slots["src"] = accel.IOSlot(src_dims, template.dtype_src) self.slots["dest"] = accel.IOSlot(dest_dims, template.dtype_dest) if template._work_size > 0: self.slots["work_area"] = accel.IOSlot((template._work_size,), np.uint8) self.mode = mode def _run(self) -> None: src_buffer = self.buffer("src") dest_buffer = self.buffer("dest") context = self.command_queue.context cufft = self.template._cufft with context, self.template._lock: cufft.cufftSetStream(self.template._plan, self.command_queue._pycuda_stream.handle) if "work_area" in self.slots: work_area_buffer = self.buffer("work_area") cufft.cufftSetWorkArea(self.template._plan, work_area_buffer.buffer.ptr) func = cufft.cufftExec[self.template._fft_type] args = [self.template._plan, src_buffer.buffer.ptr, dest_buffer.buffer.ptr] if len(func.argtypes) == 4: args.append(_Cufft.cufftDirection["CUFFT_" + self.mode.name]) func(*args) if self.template._needs_synchronize_workaround: context._pycuda_context.synchronize()