Source code for gaussfit_rs.fitting

"""
Python API for the gaussfit-rs Rust backend.

Output array layout — 8 elements, dtype float32:

======  ===============  =====================================================
Index   Field            Description
======  ===============  =====================================================
0       amplitude        Fitted peak amplitude (same units as input spectrum)
1       velocity         Line-centre velocity [km/s]
2       sigma            Gaussian width 1-σ [km/s]
3       amplitude_err    1-σ uncertainty on amplitude
4       velocity_err     1-σ uncertainty on velocity [km/s]
5       sigma_err        1-σ uncertainty on sigma [km/s]
6       reduced_chi2     Reduced χ² of best fit
7       flag             Status: FLAG_SUCCESS / FLAG_NO_LOCAL_MAX / FLAG_NO_CONVERGENCE
======  ===============  =====================================================

All fields except *flag* are ``NaN`` when ``flag != FLAG_SUCCESS``.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, NamedTuple

import numpy as np

from ._gaussfit_rs import fit_gaussian_f32 as _fit_gaussian_f32
from ._gaussfit_rs import fit_gaussian_f64 as _fit_gaussian_f64
from ._gaussfit_rs import fit_single_spectrum as _fit_single_spectrum
from ._gaussfit_rs import fit_spectra_batch as _fit_spectra_batch
from ._gaussfit_rs import fit_spectra_batch_guided as _fit_spectra_batch_guided

if TYPE_CHECKING:
    from numpy.typing import NDArray

__all__ = [
    "FLAG_NO_CONVERGENCE",
    "FLAG_NO_LOCAL_MAX",
    "FLAG_SUCCESS",
    "FitResult",
    "fit_gaussian_f32",
    "fit_gaussian_f64",
    "fit_single_spectrum",
    "fit_spectra_batch",
    "fit_spectra_batch_guided",
]


[docs] class FitResult(NamedTuple): """ Named result from a single Gaussian fit. Attributes ---------- amplitude : float Fitted peak amplitude (same units as input spectrum). velocity : float Fitted line-centre velocity [km/s]. sigma : float Fitted Gaussian width (1-sigma) [km/s]. amplitude_err : float 1-sigma uncertainty on *amplitude*. velocity_err : float 1-sigma uncertainty on *velocity* [km/s]. sigma_err : float 1-sigma uncertainty on *sigma* [km/s]. reduced_chi2 : float Reduced chi-squared of the best fit. flag : float Fit status code: :data:`FLAG_SUCCESS`, :data:`FLAG_NO_LOCAL_MAX`, or :data:`FLAG_NO_CONVERGENCE`. Notes ----- All fields except *flag* are ``nan`` when ``flag != FLAG_SUCCESS``. """ amplitude: float velocity: float sigma: float amplitude_err: float velocity_err: float sigma_err: float reduced_chi2: float flag: float
[docs] @classmethod def from_array(cls, arr: NDArray[np.float32]) -> FitResult: """ Construct a :class:`FitResult` from an 8-element fit array. """ return cls(*arr[:8].tolist())
@property def converged(self) -> bool: """ ``True`` if the fit converged (``flag == FLAG_SUCCESS``). """ return self.flag == FLAG_SUCCESS
# Fit-status flag values returned in output[..., 7] FLAG_SUCCESS: float = 0.0 FLAG_NO_LOCAL_MAX: float = 1.0 FLAG_NO_CONVERGENCE: float = 2.0
[docs] def fit_single_spectrum( *, spectrum: NDArray[np.float32], dopp_slit: NDArray[np.float32], spec_noise: NDArray[np.float32], guide_velocity: float, velocity_range: float, npix: int, npix_slack: int, dv: float, width_min: float, n_pixels: int | None = None, amplitude_rel_min: float, amplitude_rel_max: float, width_max: float, width_guess: float, xtol: float = 1.0e-6, ftol: float = 1.0e-6, gtol: float = 1.0e-6, max_iter: int = 2000, ) -> tuple[NDArray[np.float32], tuple[int, int]]: """ Fit a single Gaussian to one spectrum using the Rust backend. The function searches for the brightest peak within ``guide_velocity ± velocity_range`` (km/s), normalises by that peak, then fits a bounded Levenberg-Marquardt Gaussian using the surrounding ``2*npix`` pixel window. Parameters ---------- spectrum: Spectral data, 1-D float32. All three arrays are trimmed to *n_pixels* before processing; pass pre-sliced arrays to avoid any copy overhead. dopp_slit: Doppler velocity at each pixel [km/s], same length as *spectrum*. spec_noise: 1-sigma noise estimate at each pixel (same units as *spectrum*). guide_velocity: Expected line-centre velocity [km/s] used to locate the peak. velocity_range: Half-width of the velocity search window around *guide_velocity* [km/s]. npix: Half-width of the fitting window around the peak (pixels). npix_slack: Extra pixels added to the search window for the peak-finding step. dv: Pixel scale [km/s per pixel]; used to compute velocity bounds. width_min: Minimum allowed Gaussian sigma [km/s]. n_pixels: Number of pixels to use from the start of each array. Defaults to ``len(spectrum)``. Pass this when the arrays are longer than the intended fitting window. amplitude_rel_min: Minimum allowed amplitude relative to the detected peak (e.g. 0.1). amplitude_rel_max: Maximum allowed amplitude relative to the detected peak (e.g. 2.0). width_max: Maximum allowed Gaussian sigma [km/s]. width_guess: Initial guess for the Gaussian sigma [km/s]. xtol: Convergence tolerance on the parameter step size (default 1e-6). ftol: Convergence tolerance on the cost-function change (default 1e-6). gtol: Convergence tolerance on the gradient norm (default 1e-6). max_iter: Maximum LM iterations (default 2000). Returns ------- fit_results : ndarray, shape (8,), float32 ``[amplitude, velocity, sigma, amplitude_err, velocity_err, sigma_err, reduced_chi2, flag]``. See module docstring for details. (i_left, i_right) : tuple[int, int] Pixel indices of the fitting window used (half-open, ``[i_left, i_right)``). """ spectrum = np.ascontiguousarray(spectrum, dtype=np.float32) n_px = int(n_pixels) if n_pixels is not None else len(spectrum) return _fit_single_spectrum( spectrum, np.ascontiguousarray(dopp_slit, dtype=np.float32), np.ascontiguousarray(spec_noise, dtype=np.float32), float(guide_velocity), float(velocity_range), int(npix), int(npix_slack), float(dv), float(width_min), n_px, float(amplitude_rel_min), float(amplitude_rel_max), float(width_max), float(width_guess), float(xtol), float(ftol), float(gtol), int(max_iter), )
[docs] def fit_spectra_batch( *, spectra: NDArray[np.float32], dopp_slit: NDArray[np.float32], spec_noise: NDArray[np.float32], guide_velocity: float, velocity_range: float, npix: int, npix_slack: int, dv: float, width_min: float, n_pixels: int | None = None, amplitude_rel_min: float, amplitude_rel_max: float, width_max: float, width_guess: float, xtol: float = 1.0e-6, ftol: float = 1.0e-6, gtol: float = 1.0e-6, max_iter: int = 2000, ) -> tuple[NDArray[np.float32], NDArray[np.int32]]: """ Fit Gaussians to N spectra in parallel using all available CPU cores. Equivalent to calling :func:`fit_single_spectrum` for each row of *spectra*, but processed concurrently in Rust via Rayon. The GIL is released for the duration of the computation. Parameters ---------- spectra: Spectral data, shape ``(N, M)``, C-contiguous float32. dopp_slit: Doppler velocity grid [km/s], shape ``(n_pixels,)`` — shared across all spectra (same wavelength solution assumed). spec_noise: Per-spaxel noise, shape ``(N, M)``, C-contiguous float32. n_pixels: Number of columns to use from the start of *spectra*. Defaults to the full column count ``spectra.shape[1]``. guide_velocity, velocity_range, npix, npix_slack, dv, width_min, amplitude_rel_min, amplitude_rel_max, width_max, width_guess, xtol, ftol, gtol, max_iter: Same as :func:`fit_single_spectrum`. Returns ------- fit_results : ndarray, shape (N, 8), float32 One result row per input spectrum. Column layout same as :func:`fit_single_spectrum`. indices : ndarray, shape (N, 2), int32 ``[:, 0]`` = i_left, ``[:, 1]`` = i_right for each spectrum. """ spectra = np.ascontiguousarray(spectra, dtype=np.float32) n_px = int(n_pixels) if n_pixels is not None else spectra.shape[1] return _fit_spectra_batch( spectra, np.ascontiguousarray(dopp_slit, dtype=np.float32), np.ascontiguousarray(spec_noise, dtype=np.float32), float(guide_velocity), float(velocity_range), int(npix), int(npix_slack), float(dv), float(width_min), n_px, float(amplitude_rel_min), float(amplitude_rel_max), float(width_max), float(width_guess), float(xtol), float(ftol), float(gtol), int(max_iter), )
def fit_spectra_batch_guided( *, spectra: NDArray[np.float32], dopp_slit: NDArray[np.float32], spec_noise: NDArray[np.float32], guide_velocities: NDArray[np.float32], velocity_range: float, npix: int, npix_slack: int, dv: float, width_min: float, n_pixels: int | None = None, amplitude_rel_min: float, amplitude_rel_max: float, width_max: float, width_guess: float, xtol: float = 1.0e-6, ftol: float = 1.0e-6, gtol: float = 1.0e-6, max_iter: int = 2000, ) -> tuple[NDArray[np.float32], NDArray[np.int32]]: """ Fit Gaussians to N spectra in parallel with one guide velocity per row. Same as :func:`fit_spectra_batch`, except ``guide_velocities`` has shape ``(N,)`` and supplies the expected line-centre velocity for each spectrum. """ spectra = np.ascontiguousarray(spectra, dtype=np.float32) n_px = int(n_pixels) if n_pixels is not None else spectra.shape[1] return _fit_spectra_batch_guided( spectra, np.ascontiguousarray(dopp_slit, dtype=np.float32), np.ascontiguousarray(spec_noise, dtype=np.float32), np.ascontiguousarray(guide_velocities, dtype=np.float32), float(velocity_range), int(npix), int(npix_slack), float(dv), float(width_min), n_px, float(amplitude_rel_min), float(amplitude_rel_max), float(width_max), float(width_guess), float(xtol), float(ftol), float(gtol), int(max_iter), )
[docs] def fit_gaussian_f32( *, x: NDArray[np.float32], y: NDArray[np.float32], error: NDArray[np.float32], initial: NDArray[np.float32], lower_bounds: NDArray[np.float32], upper_bounds: NDArray[np.float32], xtol: float = 1.0e-6, ftol: float = 1.0e-6, gtol: float = 1.0e-6, max_iter: int = 2000, ) -> NDArray[np.float32]: """ Fit a bounded Gaussian to (x, y ± error) data using the Rust backend. Uses a Levenberg-Marquardt solver with box constraints on all three parameters (amplitude, mean, sigma). Parameters ---------- x: Independent variable, shape ``(N,)``, float32. Must have at least 3 elements. y: Observed values, shape ``(N,)``, float32. error: 1-sigma uncertainties on *y*, shape ``(N,)``, float32. All values must be finite and non-zero. initial: Starting guess ``[amplitude, mean, sigma]``, shape ``(3,)``. lower_bounds: Lower bounds ``[amp_min, mean_min, sigma_min]``, shape ``(3,)``. upper_bounds: Upper bounds ``[amp_max, mean_max, sigma_max]``, shape ``(3,)``. xtol: Convergence tolerance on the parameter step size. ftol: Convergence tolerance on the cost-function change. gtol: Convergence tolerance on the gradient norm. max_iter: Maximum number of outer LM iterations. Returns ------- ndarray, shape (8,), float32 ``[amplitude, mean, sigma, amplitude_err, mean_err, sigma_err, reduced_chi2, flag]``. *flag* is :data:`FLAG_SUCCESS` (0) on convergence or :data:`FLAG_NO_CONVERGENCE` (2) otherwise. """ return _fit_gaussian_f32( np.ascontiguousarray(x, dtype=np.float32), np.ascontiguousarray(y, dtype=np.float32), np.ascontiguousarray(error, dtype=np.float32), np.ascontiguousarray(initial, dtype=np.float32), np.ascontiguousarray(lower_bounds, dtype=np.float32), np.ascontiguousarray(upper_bounds, dtype=np.float32), float(xtol), float(ftol), float(gtol), int(max_iter), )
[docs] def fit_gaussian_f64( *, x: NDArray[np.float64], y: NDArray[np.float64], error: NDArray[np.float64], initial: NDArray[np.float64], lower_bounds: NDArray[np.float64], upper_bounds: NDArray[np.float64], xtol: float = 1.0e-10, ftol: float = 1.0e-10, gtol: float = 1.0e-10, max_iter: int = 2000, ) -> NDArray[np.float64]: """ Fit a bounded Gaussian to (x, y ± error) data in double precision. Identical to :func:`fit_gaussian_f32` but operates on ``float64`` arrays throughout. Use when the data span a wide dynamic range or when f32 rounding would be significant. Parameters and return value have the same structure as :func:`fit_gaussian_f32`, but all arrays are ``float64``. Default tolerances are tighter (1e-10) to exploit the extra precision. """ return _fit_gaussian_f64( np.ascontiguousarray(x, dtype=np.float64), np.ascontiguousarray(y, dtype=np.float64), np.ascontiguousarray(error, dtype=np.float64), np.ascontiguousarray(initial, dtype=np.float64), np.ascontiguousarray(lower_bounds, dtype=np.float64), np.ascontiguousarray(upper_bounds, dtype=np.float64), float(xtol), float(ftol), float(gtol), int(max_iter), )