Source code for mioXpektron.denoise.denoise_main

"""
ToF-SIMS 1D denoising & smoothing utilities
------------------------------------------
A focused toolkit for 1D spectra (e.g., ToF‑SIMS / MS) that implements
wavelet‑shrinkage denoising and classical smoothing filters, plus
resampling to a uniform grid when needed. The design emphasizes:

• Peak preservation: track height, FWHM, area, and m/z boundaries.
• Noise reduction: robust σ̂ via MAD (baseline regions) and support for
  variance‑stabilizing transforms (Anscombe) for count‑like noise.
• Practical ergonomics: optional cycle‑spinning for translation
  invariance, TIC preservation, and shape‑preserving interpolation.

This module provides the *primitive operations* used by the higher‑level
selection & evaluation code.
"""

from __future__ import annotations

import warnings
from typing import Optional, Literal, Tuple, Dict, List, Any
import numpy as np
import pywt
from scipy import signal, ndimage
from scipy.interpolate import PchipInterpolator
from scipy.integrate import trapezoid


# =============================== helpers ================================== #

def _mad_sigma(x: np.ndarray) -> float:
    """Robust noise estimate via MAD/0.6745 (median absolute deviation)."""
    x = np.asarray(x)
    if x.size == 0:
        return 0.0
    med = np.median(x)
    return float(np.median(np.abs(x - med)) / 0.67448975)


def _sure_threshold_fast(
        coefs: np.ndarray,
        sigma: float
        ) -> float:
    """
    Soft-threshold that (approximately) minimizes SURE for Gaussian noise.

    Vectorized O(n log n) evaluation over all unique |w|-knots:
        SURE(t) = n*sigma^2 + Σ min(w_i^2, t^2) - 2*sigma^2 * #{|w_i| <= t}

    Notes
    -----
    - Equivalent to Donoho-Johnstone SureShrink evaluation at the order
      statistics of |w|. We include t=0 implicitly via k=-1 case if desired,
      but practically thresholds >0 dominate.
    """
    y = np.asarray(coefs, dtype=np.float64)
    n = y.size
    if n == 0 or not np.isfinite(sigma) or sigma <= 0:
        return 0.0

    a = np.abs(y)
    a = a[np.isfinite(a)]
    if a.size == 0:
        return 0.0

    a.sort()                                 # ascending |w|
    s2 = float(sigma) * float(sigma)
    a2 = a * a

    # 1-based cumulative sum for easy slicing
    csum = np.empty(a2.size + 1, dtype=np.float64)
    csum[0] = 0.0
    np.cumsum(a2, out=csum[1:])

    # Evaluate SURE at unique knots t = a[k] using RIGHT boundary (counts <= t)
    uniq = np.unique(a)
    m = np.searchsorted(a, uniq, side="right")      # m_j = #{|w_i| <= uniq[j]}
    t2 = uniq * uniq

    # sum_i min(a_i^2, t^2) = sum_{i<=m} a_i^2 + (n - m) * t^2
    sum_min = csum[m] + (n - m) * t2
    risk = n * s2 + sum_min - 2.0 * s2 * m

    j = int(np.argmin(risk))
    return float(uniq[j])
    

def _sure_threshold_optimized(
        coefs: np.ndarray,
        sigma: float,
        max_candidates: int = 128,
        refine_window: int = 20
        ) -> float:
    """
    Optimized SURE threshold with candidate subsampling, refinement, and inclusion of t=0.
    Addresses approximation risks by refining around the best coarse candidate.
    Biases candidate selection toward larger thresholds.
    """
    y = np.asarray(coefs, dtype=np.float64)
    n = y.size
    if n == 0 or sigma <= 0:
        return 0.0
    
    ay = np.abs(y)
    ay = ay[np.isfinite(ay)]
    if ay.size == 0:
        return 0.0
    
    ay_sorted = np.sort(ay)  # ascending
    s2 = sigma * sigma
    
    # Precompute cumulative sum of ay**2
    ay2_cumsum = np.cumsum(ay_sorted ** 2)
    csum = np.r_[0.0, ay2_cumsum]  # prefixed for csum[m] = sum of first m ay**2
    
    # Adaptive candidate selection: bias toward larger t with power law
    n_candidates = min(max_candidates, max(32, n // 4))
    frac = np.linspace(0, 1, n_candidates) ** 1.5  # bias to higher
    indices = np.unique(np.round(frac * (len(ay_sorted) - 1)).astype(int))
    candidates = np.unique(ay_sorted[indices])  # unique to avoid dups
    
    # Include t=0 explicitly if not already (though often ay_sorted[0] >=0)
    if candidates[0] > 0:
        candidates = np.r_[0.0, candidates]
    
    # Function to compute risk for a given t
    def compute_risk(t: float) -> float:
        """Evaluate the SURE risk at threshold ``t`` for the current spectrum."""
        if t == 0:
            m0 = np.searchsorted(ay_sorted, 0.0, side='right')  # count(|w_i| <= 0)
            # For t=0: sum_min = 0, risk = n*s2 - 2*s2*m0
            return n * s2 - 2.0 * s2 * m0
        m = np.searchsorted(ay_sorted, t, side='right')  # # <= t
        t2 = t * t
        sum_min = csum[m] + (n - m) * t2
        risk = n * s2 + sum_min - 2.0 * s2 * m
        return risk
    
    # Coarse search: find best among candidates
    risks = np.array([compute_risk(t) for t in candidates])
    j = np.argmin(risks)
    best_t_coarse = candidates[j]
    # best_risk_coarse = risks[j]
    
    # Refinement: evaluate dense around the best coarse t
    m_coarse = np.searchsorted(ay_sorted, best_t_coarse, side='right')
    start = max(0, m_coarse - refine_window)
    end = min(n, m_coarse + refine_window + 1)
    fine_candidates = np.unique(ay_sorted[start:end])  # unique in window
    
    # Compute risks for fine candidates
    fine_risks = np.array([compute_risk(t) for t in fine_candidates])
    k = np.argmin(fine_risks)
    best_t = fine_candidates[k]
    
    return float(best_t)


def _bayes_threshold(
        detail: np.ndarray,
        sigma: float
        ) -> float:
    """
    BayesShrink threshold: t = sigma^2 / sqrt(max(var(detail) - sigma^2, 0)).
    Safer fallback: Universal threshold if var_x <= 0.
    """
    d = np.asarray(detail, dtype=np.float64)
    n = d.size
    if n == 0 or sigma <= 0 or not np.isfinite(sigma):
        return 0.0
    var_w = float(np.var(d))
    var_x = max(var_w - sigma * sigma, 0.0)
    if var_x <= 0.0:
        return float(sigma * np.sqrt(2.0 * np.log(max(n, 2))))
    return float((sigma * sigma) / np.sqrt(var_x))


def _circular_shift(x: np.ndarray, k: int) -> np.ndarray:
    """Circularly shift a 1D array to the right by k samples (mod N)."""
    return np.roll(x, int(k))


def _inverse_circular_shift(x: np.ndarray, k: int) -> np.ndarray:
    """Inverse circular shift for `_circular_shift` (left by k samples)."""
    return np.roll(x, -int(k))


def _resample_to_uniform(
    x: np.ndarray,
    y: np.ndarray,
    target_dx: Optional[float] = None,
    *,
    forward_kind: Literal["pchip", "linear"] = "pchip",
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Interpolate (x, y) to a uniform grid (xu, yu) using shape-preserving PCHIP
    (default) or linear interpolation, and return (xu, yu, x_in, idx_in).

    Parameters
    ----------
    x : np.ndarray
        Original, possibly nonuniform x-axis values.
    y : np.ndarray
        Signal values aligned to x.
    target_dx : float, optional
        Desired spacing for the uniform grid. If None, inferred from the median
        positive spacing in x (falls back to mean of diffs if necessary).
    forward_kind : {"pchip", "linear"}
        Interpolant for the forward resampling step; PCHIP preserves peak shapes.

    Returns
    -------
    xu : np.ndarray
        Uniform x-grid spanning [min(x), max(x)].
    yu : np.ndarray
        Resampled y on the uniform grid xu.
    x_in : np.ndarray
        Finite x samples from the caller, in the caller's original order.
    idx_in : np.ndarray
        Indices of finite samples relative to the original arrays.

    Notes
    -----
    Internally, (x, y) are sorted by x to build the forward interpolant. Mapping
    back to the caller's order is done later by evaluating a PCHIP defined on
    (xu, out_u) at x_in; this preserves the caller's order and avoids silent
    reordering.
    """
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)

    mask = np.isfinite(x) & np.isfinite(y)
    idx_in = np.nonzero(mask)[0]
    x_in, y_in = x[mask], y[mask]

    if x_in.size < 2:
        return x_in, y_in, x_in, idx_in

    # sort by x to create the forward interpolant
    order = np.argsort(x_in)
    xs, ys = x_in[order], y_in[order]

    diffs = np.diff(xs)
    pos = diffs[np.isfinite(diffs) & (diffs > 0)]
    if target_dx is None or not np.isfinite(target_dx) or target_dx <= 0:
        target_dx = float(np.median(pos)) if pos.size else float(np.mean(diffs))

    n = max(2, int(np.round((xs[-1] - xs[0]) / target_dx)) + 1)
    xu = np.linspace(xs[0], xs[-1], n)

    if forward_kind == "pchip":
        fwd = PchipInterpolator(xs, ys, extrapolate=True)
        yu = fwd(xu)
    else:
        yu = np.interp(xu, xs, ys, left=ys[0], right=ys[-1])

    return xu, np.asarray(yu, dtype=np.float64), x_in, idx_in


def _anscombe_forward(y: np.ndarray) -> np.ndarray:
    """
    Classical Anscombe VST for Poisson counts:
        z = 2 * sqrt(y + 3/8)
    Requires y >= 0.
    """
    y = np.asarray(y, dtype=np.float64)
    return 2.0 * np.sqrt(y + 3.0 / 8.0)


def _anscombe_inverse_unbiased(z: np.ndarray) -> np.ndarray:
    """
    Unbiased inverse of the classical Anscombe transform (Mäkitalo & Foi).
    Accurate for small counts; asymptotic series with safeguards.
    This is the consistent inverse for the classical Anscombe forward transform
    under a pure Poisson noise model. Reference: Mäkitalo & Foi, IEEE TIP 2013.
    """
    z = np.asarray(z, dtype=np.float64)
    # Prevent division by zero in series
    z2 = np.maximum(z, 1e-12)
    # Asymptotic series (pure Poisson case)
    # I(z) ≈ (z/2)^2 - 1/8 + 1/(8 z^2) - 5/(128 z^4) + 7/(512 z^6) - 21/(1024 z^8)
    inv = (z2 * 0.5) ** 2 - 0.125 \
        + 1.0 / (8.0 * z2 ** 2) \
        - 5.0 / (128.0 * z2 ** 4) \
        + 7.0 / (512.0 * z2 ** 6) \
        - 21.0 / (1024.0 * z2 ** 8)
    # Clip tiny negatives caused by numerical error
    return np.maximum(inv, 0.0)


def _prepare_anscombe_input(
    y: np.ndarray,
    *,
    negative_policy: Literal["warn_clip", "clip", "raise"] = "warn_clip",
) -> np.ndarray:
    """Validate non-negative input for the classical Anscombe transform."""
    y = np.asarray(y, dtype=np.float64)
    neg_mask = np.isfinite(y) & (y < 0.0)
    if not np.any(neg_mask):
        return y

    n_neg = int(np.count_nonzero(neg_mask))
    min_neg = float(np.min(y[neg_mask]))
    message = (
        "Classical Anscombe VST assumes non-negative Poisson-like input, "
        f"but found {n_neg} negative values (min={min_neg:.6g})."
    )

    if negative_policy == "raise":
        raise ValueError(message + " Use variance_stabilize='none' or preprocess the signal before denoising.")
    if negative_policy not in {"warn_clip", "clip"}:
        raise ValueError("anscombe_negative_policy must be 'warn_clip', 'clip', or 'raise'")
    if negative_policy == "warn_clip":
        warnings.warn(
            message + " Clipping negatives to zero before the classical Anscombe transform.",
            RuntimeWarning,
            stacklevel=3,
        )

    y = y.copy()
    y[neg_mask] = 0.0
    return y


# =========================== wavelet denoising ============================ #

[docs] def wavelet_denoise( intensities: np.ndarray, *, wavelet: Literal["db4", "db8", "sym5", "sym8", "coif2", "coif3"] = "sym8", level: Optional[int] = None, threshold_mode: Literal["soft", "hard"] = "soft", threshold_strategy: Literal["universal", "bayes", "sure", "sure_opt"] = "universal", sigma: Optional[float] = None, sigma_strategy: Literal["per_level", "global"] = "per_level", variance_stabilize: Literal["none", "anscombe"] = "none", anscombe_negative_policy: Literal["warn_clip", "clip", "raise"] = "warn_clip", cycle_spins: Literal[0, 4, 8, 16, 32] = 0, pywt_mode: str = "periodization", clip_nonnegative: bool = True, preserve_tic: bool = False, ) -> np.ndarray: """ Wavelet-shrinkage denoising for 1D ToF-SIMS/MS data, with optional translation invariance (cycle-spinning) and TIC preservation. Parameters ---------- intensities : array-like 1D intensities (finite values recommended). wavelet : {"db4","db8","sym5","sym8","coif2","coif3"} Discrete wavelet family used for the DWT. level : int or None DWT level; if None, chosen conservatively via `pywt.dwt_max_level` (capped at 6). threshold_mode : {"soft","hard"} Shrinkage mode (soft is default and recommended for denoising). threshold_strategy : {"universal","bayes","sure","sure_opt"} Threshold selection per detail level: • "universal": √(2 log N) scaling (Donoho–Johnstone) • "bayes": BayesShrink (subband variance model) • "sure": SURE-threshold via full knot evaluation (fast, exact at knots) • "sure_opt": SURE with biased candidate subsampling + local refinement (faster) sigma : float or None If provided, used as a *global* noise std at all levels (overrides sigma_strategy). sigma_strategy : {"per_level","global"} How to estimate σ when `sigma` is None. • "per_level": σ_j via MAD on each detail subband (robust; recommended) • "global": a single σ via MAD on the finest detail of the unshifted input variance_stabilize : {"none","anscombe"} Apply a variance-stabilizing transform prior to denoising. ``"anscombe"`` uses the classical Anscombe forward transform with the Mäkitalo-Foi unbiased inverse, appropriate for non-negative Poisson-like input. anscombe_negative_policy : {"warn_clip","clip","raise"} How to handle negative values before the classical Anscombe transform. ``"warn_clip"`` preserves backward compatibility but emits a warning; ``"raise"`` is recommended when negative values are scientifically meaningful residuals rather than impossible counts. cycle_spins : {0,4,8,16,32} Number of circular shifts to average (0 disables; try 8–16 for higher SNR if compute allows). If cycle_spins ≥ N on very long vectors, the implementation caps the number of shifts at 2048 for performance. pywt_mode : str Wavelet signal extension mode. Common choices: 'periodization' (reduces edge artifacts), 'symmetric', 'reflect'. See PyWavelets docs for full list. clip_nonnegative : bool Clip negative outputs to zero (recommended for intensities). preserve_tic : bool If True, rescale output so sum(out) == sum(input) (guarded to avoid division by zero). Returns ------- np.ndarray Denoised intensities, same length as input. Raises ------ ValueError If `intensities` is not 1D, or an unknown `threshold_strategy` is provided. See Also -------- noise_filtering : High-level wrapper supporting classical smoothers and resampling. """ y0 = np.asarray(intensities, dtype=np.float64) if y0.ndim != 1: raise ValueError("'intensities' must be a 1D array") if anscombe_negative_policy not in {"warn_clip", "clip", "raise"}: raise ValueError("anscombe_negative_policy must be 'warn_clip', 'clip', or 'raise'") # Precompute global sigma once (unshifted signal) if requested and not provided. # If requested, precompute a single global σ on the *unshifted* input using # the finest detail subband (robust to nonstationary low-freq content). precomputed_global_sigma: Optional[float] = None if sigma is None and sigma_strategy == "global": w = pywt.Wavelet(wavelet) lvl0 = level if level is not None else max(1, min(6, pywt.dwt_max_level(y0.size, w.dec_len))) y0_finite = np.where(np.isfinite(y0), y0, 0.0) coeffs0 = pywt.wavedec(y0_finite, wavelet=wavelet, level=lvl0, mode=pywt_mode) # cD1 is the finest detail band in PyWavelets: coeffs = [cA_n, cD_n, ..., cD_1] finest = coeffs0[-1] if len(coeffs0) >= 2 else coeffs0[0] precomputed_global_sigma = _mad_sigma(finest) def _preprocess(y: np.ndarray) -> np.ndarray: """Replace NaNs and optionally apply the forward variance transform.""" y = np.where(np.isfinite(y), y, 0.0) if variance_stabilize == "anscombe": return _anscombe_forward( _prepare_anscombe_input(y, negative_policy=anscombe_negative_policy) ) return y def _postprocess(y: np.ndarray, y_raw: np.ndarray) -> np.ndarray: """Undo variance-stabilizing transforms and apply TIC/clip policies.""" # Inverse VST first (nonlinear), then TIC & clipping if variance_stabilize == "anscombe": y = _anscombe_inverse_unbiased(y) # TIC scaling relative to the original raw input if preserve_tic: s_in = np.nansum(np.where(np.isfinite(y_raw), y_raw, 0.0)) s_out = np.nansum(np.where(np.isfinite(y), y, 0.0)) if np.isfinite(s_in) and np.isfinite(s_out) and s_out > 0: y = y * (s_in / s_out) if clip_nonnegative: y = np.maximum(y, 0.0) return y def _single_pass(y: np.ndarray) -> np.ndarray: """Denoise a single (possibly shifted) realization of the signal.""" y_v = _preprocess(y) w_ = pywt.Wavelet(wavelet) # Choose a conservative decomposition level (cap at 6 to avoid over-decomposition on long signals) lvl = level if level is not None else max(1, min(6, pywt.dwt_max_level(y_v.size, w_.dec_len))) coeffs = pywt.wavedec(y_v, wavelet=wavelet, level=lvl, mode=pywt_mode) # decide sigma policy (your existing logic) if sigma is not None: global_sigma = float(sigma) elif sigma_strategy == "global": global_sigma = float(precomputed_global_sigma) if precomputed_global_sigma is not None else 0.0 else: global_sigma = None new_coeffs = [coeffs[0]] for c in coeffs[1:]: c = np.where(np.isfinite(c), c, 0.0) sigma_j = _mad_sigma(c) if global_sigma is None else global_sigma if sigma_j <= 0 or not np.isfinite(sigma_j): new_coeffs.append(c) continue if threshold_strategy == "universal": t = sigma_j * np.sqrt(2.0 * np.log(max(y_v.size, 2))) elif threshold_strategy == "bayes": t = _bayes_threshold(c, sigma_j) elif threshold_strategy == "sure": t = _sure_threshold_fast(c, sigma_j) elif threshold_strategy == "sure_opt": t = _sure_threshold_optimized(c, sigma_j) else: raise ValueError(f"Unknown threshold_strategy: {threshold_strategy}") with warnings.catch_warnings(): warnings.simplefilter("ignore") c_thr = pywt.threshold(c, value=float(t), mode=threshold_mode) new_coeffs.append(np.where(np.isfinite(c_thr), c_thr, 0.0)) out_v = pywt.waverec(new_coeffs, wavelet, mode=pywt_mode)[: y_v.size] return out_v if cycle_spins and cycle_spins > 0 and y0.size > 8: acc = np.zeros_like(y0, dtype=float) N = y0.size # Use all shifts if spins >= N; otherwise sample evenly-spaced shifts across [0, N) # Guard: on very long vectors, using all shifts can be prohibitively expensive. # If spins >= N and N is large, cap to a reasonable maximum of evenly spaced shifts. max_spins = 2048 effective_spins = cycle_spins if cycle_spins >= N: effective_spins = min(N, max_spins) if N > max_spins: warnings.warn( f"cycle_spins >= N ({cycle_spins} >= {N}); capping to " f"{max_spins} evenly spaced shifts for performance. " f"This may reduce denoising accuracy compared to full " f"cycle-spinning.", UserWarning, stacklevel=2, ) elif cycle_spins > max_spins: effective_spins = max_spins warnings.warn( f"cycle_spins={cycle_spins} exceeds cap of {max_spins}; " f"using {max_spins} evenly spaced shifts.", UserWarning, stacklevel=2, ) if effective_spins >= N: steps = np.arange(N, dtype=int) else: steps = np.unique(np.round(np.linspace(0, N - 1, num=effective_spins)).astype(int)) for k in steps: acc += _inverse_circular_shift(_single_pass(_circular_shift(y0, k)), k) denoised_v = acc / float(steps.size) else: denoised_v = _single_pass(y0) # Inverse VST, TIC, clipping denoised = _postprocess(denoised_v, y0) denoised = np.where(np.isfinite(denoised), denoised, 0.0) return denoised
# ==================== traditional smoothing + resampling =================== #
[docs] def noise_filtering( intensities: np.ndarray, *, method: Literal["savitzky_golay", "gaussian", "median", "wavelet", "none"] = "wavelet", # Savitzky–Golay / median / gaussian parameters window_length: int = 15, polyorder: int = 3, deriv: int = 0, gauss_sigma_pts: Optional[float] = None, gaussian_order: int = 0, # Wavelet parameters wavelet: Literal["db4", "db8", "sym5", "sym8", "coif2", "coif3"] = "sym8", level: Optional[int] = None, threshold_strategy: Literal["universal", "bayes", "sure", "sure_opt"] = "universal", threshold_mode: Literal["soft", "hard"] = "soft", sigma: Optional[float] = None, sigma_strategy: Literal["per_level", "global"] = "per_level", variance_stabilize: Literal["none", "anscombe"] = "none", anscombe_negative_policy: Literal["warn_clip", "clip", "raise"] = "warn_clip", cycle_spins: Literal[0, 4, 8, 16, 32] = 0, pywt_mode: str = "periodization", # Shared/output behavior clip_nonnegative: bool = True, preserve_tic: bool = False, # Sampling & resampling x: Optional[np.ndarray] = None, resample_to_uniform: bool = False, target_dx: Optional[float] = None, forward_interp: Literal["pchip", "linear"] = "pchip", ) -> np.ndarray: """ Apply 1D denoising/smoothing to ToF-SIMS spectra. Notes ----- - Savitzky–Golay / Gaussian / Median assume ~uniform sampling. If your m/z grid is nonuniform, pass `x` and set `resample_to_uniform=True`. The wavelet path can also resample when `resample_to_uniform=True`. - Wavelet shrinkage preserves narrow peaks; consider Bayes/SURE and cycle-spins. Parameters ---------- intensities : np.ndarray 1D intensity array. method : {'savitzky_golay','gaussian','median','wavelet','none'} window_length : int Odd window for Savitzky–Golay or median; will be coerced to odd >=3. polyorder : int For Savitzky–Golay, 0 ≤ polyorder < window_length. deriv : int For Savitzky–Golay, derivative order (0 = smoothing; 1/2/... compute derivatives). Requires `polyorder >= deriv`. gauss_sigma_pts : float or None If provided, overrides default sigma = window_length/6 for Gaussian filter. gaussian_order : int For Gaussian filtering, derivative order for `ndimage.gaussian_filter1d`. 0 = smoothing; >0 computes derivatives. wavelet, level, threshold_strategy, threshold_mode, sigma, cycle_spins, pywt_mode Passed to wavelet processing (see `wavelet_denoise`). sigma_strategy : {"per_level","global"} Strategy if `sigma` is None. "per_level" = σ_j via MAD on each detail subband; "global" = one σ via MAD on the finest detail of the unshifted input. variance_stabilize : {"none","anscombe"} Apply variance‑stabilizing transform before denoising. ``"anscombe"`` uses the classical Anscombe transform for non-negative Poisson-like input. anscombe_negative_policy : {"warn_clip","clip","raise"} Handling policy for negative values before the classical Anscombe transform. clip_nonnegative, preserve_tic Output behaviors. x : np.ndarray or None Optional m/z (or channel) axis aligned with intensities. resample_to_uniform : bool If True and x is provided, internally resample to a uniform grid and back. target_dx : float or None Target spacing for the uniform grid (if None, inferred). forward_interp : {'pchip','linear'} Interpolant used when building the uniform-grid signal (PCHIP recommended). Returns ------- np.ndarray Filtered intensities aligned to the input grid/order. Raises ------ ValueError If `intensities` or `x` have mismatched shapes or if `intensities` is not 1D. If Savitzky–Golay has `polyorder < deriv` after clamping, or if `method` is unknown. See Also -------- wavelet_denoise : Core wavelet denoising routine used when `method="wavelet"`. """ y = np.asarray(intensities, dtype=np.float64) if y.ndim != 1: raise ValueError("'intensities' must be a 1D array") if x is not None: x = np.asarray(x, dtype=np.float64) if x.shape != y.shape: raise ValueError("x and intensities must have the same shape") # Work only on finite entries; reconstruct full-length output at the end if x is None: mask = np.isfinite(y) else: mask = np.isfinite(y) & np.isfinite(x) idx = np.nonzero(mask)[0] if idx.size == 0: out = np.zeros_like(y, dtype=float) return out y_work = y[mask] x_work = x[mask] if x is not None else None def _map_back(out_in_: np.ndarray) -> np.ndarray: """Restore filtered samples onto the original indexing with safeguards.""" # Reconstruct a full-length output aligned to the original indices; apply optional clipping and TIC scaling out_full = np.zeros_like(y, dtype=float) out_full[mask] = out_in_ if clip_nonnegative: out_full = np.maximum(out_full, 0.0) if preserve_tic: s_in = y_work.sum() s_out = out_in_.sum() if np.isfinite(s_in) and np.isfinite(s_out) and s_out > 0: scale = s_in / s_out out_full[mask] *= scale return out_full # Optional resampling branch (also available for wavelet) use_uniform = (x_work is not None) and resample_to_uniform and method in { "savitzky_golay", "gaussian", "median", "wavelet" } if method == "none": return _map_back(y_work.copy()) if use_uniform: xu, yu, x_in, idx_in = _resample_to_uniform( x_work, y_work, target_dx=target_dx, forward_kind=forward_interp ) # Apply the chosen filter on the uniform grid if method == "savitzky_golay": wl = max(3, int(window_length)) if wl % 2 == 0: wl += 1 poly = int(polyorder) poly = min(max(0, poly), wl - 1) d = max(0, int(deriv)) if poly < d: raise ValueError("Savitzky–Golay requires polyorder >= deriv") out_u = signal.savgol_filter(yu, window_length=wl, polyorder=poly, deriv=d, mode="interp") elif method == "gaussian": sigma_pts = float(gauss_sigma_pts) if gauss_sigma_pts is not None else max(0.5, float(window_length) / 6.0) # heuristic ≈ window/6 gorder = max(0, int(gaussian_order)) out_u = ndimage.gaussian_filter1d(yu, sigma=sigma_pts, order=gorder, mode="nearest") elif method == "median": wl = max(3, int(window_length)) if wl % 2 == 0: wl += 1 out_u = signal.medfilt(yu, kernel_size=wl) elif method == "wavelet": out_u = wavelet_denoise( yu, wavelet=wavelet, level=level, threshold_mode=threshold_mode, threshold_strategy=threshold_strategy, sigma=sigma, sigma_strategy=sigma_strategy, variance_stabilize=variance_stabilize, anscombe_negative_policy=anscombe_negative_policy, cycle_spins=cycle_spins, pywt_mode=pywt_mode, clip_nonnegative=False, # clip after mapping back preserve_tic=False, # preserve after mapping back ) else: raise ValueError(f"Unknown method: {method}") # Map back to the caller's original x ordering (shape-preserving) back_interp = PchipInterpolator(xu, out_u, extrapolate=True) out_in = back_interp(x_in) return _map_back(np.asarray(out_in, dtype=np.float64)) # No resampling branch (operate directly on y_work) if method == "savitzky_golay": wl = max(3, int(window_length)) if wl % 2 == 0: wl += 1 poly = int(polyorder) poly = min(max(0, poly), wl - 1) d = max(0, int(deriv)) if poly < d: raise ValueError("Savitzky–Golay requires polyorder >= deriv") out_in = signal.savgol_filter(y_work, window_length=wl, polyorder=poly, deriv=d, mode="interp") elif method == "gaussian": sigma_pts = float(gauss_sigma_pts) if gauss_sigma_pts is not None else max(0.5, float(window_length) / 6.0) # heuristic ≈ window/6 gorder = max(0, int(gaussian_order)) out_in = ndimage.gaussian_filter1d(y_work, sigma=sigma_pts, order=gorder, mode="nearest") elif method == "median": wl = max(3, int(window_length)) if wl % 2 == 0: wl += 1 out_in = signal.medfilt(y_work, kernel_size=wl) elif method == "wavelet": out_in = wavelet_denoise( y_work, wavelet=wavelet, level=level, threshold_mode=threshold_mode, threshold_strategy=threshold_strategy, sigma=sigma, sigma_strategy=sigma_strategy, variance_stabilize=variance_stabilize, anscombe_negative_policy=anscombe_negative_policy, cycle_spins=cycle_spins, pywt_mode=pywt_mode, clip_nonnegative=True, preserve_tic=False, ) else: raise ValueError(f"Unknown method: {method}") return _map_back(np.asarray(out_in, dtype=np.float64))