Source code for mioXpektron.denoise.denoise_select

"""
ToF-SIMS 1D denoising & smoothing utilities
-------------------------------------------------
This module provides a battery of denoising and smoothing methods for 1D ToF-SIMS spectra
and a *noise-aware* evaluation framework that selects methods based on both peak preservation
and explicit noise reduction criteria. It supports:

• Wavelet shrinkage (universal, SURE, Bayes, etc.) with optional variance-stabilizing transform (Anscombe).
• Classical smoothers (Savitzky–Golay, Gaussian, Median, and a no-op baseline).
• Robust, peak-centric measurements (height, FWHM, area, m/z shift) before/after denoising.
• Explicit noise quantification:
    – Global background σ̂ via MAD on baseline regions (outside expanded FWHM windows)
    – Local σ̂ around each peak (flanking bands) and ΔSNR per peak (in dB)
    – High-frequency residual power via PSD (Welch by default), integrated above a cutoff
• Parallel execution (threads or processes) and optional progress bars.
• Ranking with tunable weights and **hard constraints** to forbid methods that do not denoise.
• Convenience plotting (Pareto: ΔSNR vs |%height|) and multi-window evaluation helpers.

Design notes
------------
• Baseline regions are defined by excluding each reference peak’s FWHM window expanded by `baseline_expand`.
• Local noise is estimated in flanking bands at [1.5W, 3W] on each side of a peak by default.
• PSD is computed on *uniformly resampled* baseline segments to make frequency analysis well-defined.
• Threads are the recommended backend because most heavy NumPy/SciPy routines release the GIL.
"""

from __future__ import annotations

try:
    import polars as pl  # type: ignore
    _POLARS_AVAILABLE = True
except Exception:  # pragma: no cover
    pl = None  # type: ignore
    _POLARS_AVAILABLE = False

import os
import pandas as pd
from typing import Optional, Literal, Tuple, Dict, List, Any
from typing import Iterable
from dataclasses import dataclass
import numpy as np
from numpy.typing import NDArray
from scipy import signal
from scipy.integrate import trapezoid

from .denoise_main import noise_filtering
from pathlib import Path

OUTPUT_DIR = Path("output_files")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# -- Parallel/Progress helpers --
def _iter_with_progress(iterable: Iterable, total: int, progress: bool, desc: str):
    """Yield items, optionally wrapped with tqdm progress bar (no hard dep)."""
    if not progress:
        for x in iterable:
            yield x
        return
    try:
        from tqdm.auto import tqdm as _tqdm
        for x in _tqdm(iterable, total=total, desc=desc):
            yield x
    except Exception:
        # Fallback: no tqdm available
        for x in iterable:
            yield x



# --- Robust noise and SNR helpers ---

def _robust_sigma(a: np.ndarray) -> float:
    """Robust noise estimate via MAD (scaled to sigma). Returns NaN if empty."""
    a = np.asarray(a, dtype=np.float64)
    a = a[np.isfinite(a)]
    if a.size == 0:
        return np.nan
    med = np.median(a)
    mad = np.median(np.abs(a - med))
    return 1.4826 * mad


def _compute_baseline_mask(x: np.ndarray, ref_meas: List["PeakMeasurement"], expand: float = 2.0) -> NDArray[np.bool_]:
    """Boolean mask of baseline regions: exclude peak windows expanded by `expand`×width.
    True = baseline (non-peak) indices; False = inside expanded peak windows.
    """
    # Collect valid exclusion windows
    bounds = []
    for r in ref_meas:
        w = (r.mz_right - r.mz_left)
        if not np.isfinite(w) or w <= 0:
            continue
        half = 0.5 * expand * w
        bounds.append((r.mz_left - half, r.mz_right + half))

    if not bounds:
        return np.ones(x.size, dtype=bool)

    # Vectorized: stack all bounds and use broadcasting
    bounds_arr = np.array(bounds)  # (n_peaks, 2)
    # Check if x falls inside any exclusion window
    in_any = np.any(
        (x[np.newaxis, :] >= bounds_arr[:, 0:1]) &
        (x[np.newaxis, :] <= bounds_arr[:, 1:2]),
        axis=0
    )
    return ~in_any


def _local_noise_sigma(x: np.ndarray, y: np.ndarray, r: "PeakMeasurement",
                       flank_inner: float = 1.5, flank_outer: float = 3.0) -> float:
    """Estimate local noise using flanking bands outside the peak window.
    Bands: [left - outer*W, left - inner*W] and [right + inner*W, right + outer*W].
    Returns robust sigma (MAD*1.4826) or NaN when insufficient points.
    """
    W = (r.mz_right - r.mz_left)
    if not np.isfinite(W) or W <= 0:
        return np.nan
    l0 = r.mz_left - flank_outer * W
    l1 = r.mz_left - flank_inner * W
    r0 = r.mz_right + flank_inner * W
    r1 = r.mz_right + flank_outer * W
    ml = (x >= l0) & (x <= l1)
    mr = (x >= r0) & (x <= r1)
    band = ml | mr
    if not np.any(band):
        return np.nan
    return _robust_sigma(y[band])


def _ratio_db(num: float, den: float) -> float:
    """Return 20*log10(num/den) with robust guards; NaN if invalid."""
    if not (np.isfinite(num) and np.isfinite(den)):
        return np.nan
    if den <= 0 or num <= 0:
        return np.nan
    return 20.0 * float(np.log10(num / den))


def _resample_uniform(x: np.ndarray, y: np.ndarray, dx: Optional[float] = None) -> Tuple[np.ndarray, np.ndarray, float]:
    """Resample (x,y) to a uniform grid using linear interpolation.
    Returns (xu, yu, fs) where fs is the sampling frequency (1/dx).
    """
    x = np.asarray(x, dtype=np.float64).ravel()
    y = np.asarray(y, dtype=np.float64).ravel()
    if x.size < 2:
        return x.copy(), y.copy(), np.nan
    x0, x1 = float(x[0]), float(x[-1])
    if not np.isfinite(x0) or not np.isfinite(x1) or x1 <= x0:
        return x.copy(), y.copy(), np.nan
    dx0 = float(np.nanmedian(np.diff(x)))
    if not np.isfinite(dx0) or dx0 <= 0:
        return x.copy(), y.copy(), np.nan
    if dx is None or not np.isfinite(dx) or dx <= 0:
        dx = dx0
    n = int(np.floor((x1 - x0) / dx)) + 1
    xu = x0 + dx * np.arange(n, dtype=np.float64)
    yu = np.interp(xu, x, y)
    fs = 1.0 / dx
    return xu, yu, fs


def _hf_power(
    x: np.ndarray,
    y: np.ndarray,
    baseline_mask: np.ndarray,
    cutoff_hz: Optional[float] = None,
    cutoff_frac: float = 0.3,
    resample_dx: Optional[float] = None,
    psd_method: Literal["welch", "periodogram"] = "welch",
    welch_nperseg: Optional[int] = None,
) -> Tuple[float, float]:
    """High-frequency power and fraction in baseline regions.
    Resamples baseline to a uniform grid, estimates PSD (Welch by default),
    and integrates power above a cutoff.
    """
    if baseline_mask is None or not np.any(baseline_mask):
        return np.nan, np.nan
    xb = np.asarray(x, dtype=np.float64)[baseline_mask]
    yb = np.asarray(y, dtype=np.float64)[baseline_mask]
    if xb.size < 8:
        return np.nan, np.nan
    xu, yu, fs = _resample_uniform(xb, yb, dx=resample_dx)
    if not np.isfinite(fs) or yu.size < 16:
        return np.nan, np.nan
    yu = yu - np.mean(yu)

    # PSD estimation
    if psd_method == "welch":
        # Choose nperseg if not given (next power of two, up to 1024)
        n = yu.size
        if welch_nperseg is None:
            nper = 1
            while nper < max(16, n // 8):
                nper <<= 1
            nper = int(min(nper, 1024))
        else:
            nper = int(welch_nperseg)
        f, Pxx = signal.welch(yu, fs=fs, nperseg=nper, noverlap=nper//2, window="hann", scaling="density", detrend="constant")
    else:
        f, Pxx = signal.periodogram(yu, fs=fs, scaling='density', detrend='constant')

    if f.size == 0:
        return np.nan, np.nan
    nyq = 0.5 * fs
    fc = float(cutoff_hz) if (cutoff_hz is not None and np.isfinite(cutoff_hz)) else float(np.clip(cutoff_frac, 1e-6, 0.999999)) * nyq
    hf_mask = f >= fc
    if not np.any(hf_mask):
        return 0.0, 0.0
    total_power = float(trapezoid(Pxx, f)) if np.all(np.isfinite(Pxx)) else np.nan
    hf_power = float(trapezoid(Pxx[hf_mask], f[hf_mask])) if np.all(np.isfinite(Pxx[hf_mask])) else np.nan
    if not (np.isfinite(total_power) and total_power > 0):
        return hf_power, np.nan
    return hf_power, float(hf_power / total_power)


def _eval_method_task(
    name: str,
    cfg_slim: Dict[str, Any],
    x_arr: NDArray[np.float64],
    y_arr: NDArray[np.float64],
    ref_meas: List["PeakMeasurement"],
    rel_height: float,
    search_ppm: float,
    match_min_prominence_ratio: float,
    match_min_prominence_abs: float,
    match_min_width_pts: float,
    baseline_mask: NDArray[np.bool_],
    sigma_raw_global: float,
    local_sigma_raw: List[float],
    hf_enabled: bool,
    hf_cutoff_hz: Optional[float],
    hf_cutoff_frac: float,
    hf_resample_dx: Optional[float],
    hf_power_raw_global: float,
    flank_inner: float,
    flank_outer: float,
    hf_psd_method: Literal["welch", "periodogram"],
    hf_welch_nperseg: Optional[int],
) -> List[Dict[str, Any]]:
    """Run one denoising method and measure all reference peaks + noise metrics.

    Workflow
    --------
    1) Apply the denoiser (`noise_filtering`) with configuration `cfg_slim`.
    2) Compute a **global** background noise estimate on baseline regions (σ̂ via MAD) and the
       corresponding noise reduction in dB vs. RAW.
    3) Optionally compute **high-frequency residual power** on baseline regions via PSD and its
       reduction in dB (Welch by default).
    4) For each reference peak:
       – Re-measure peak on the denoised signal within a ppm window.
       – Compute % changes (height, FWHM, area), m/z shift, and local ΔSNR in dB using flanking bands.

    Returns
    -------
    list of dict
        One row per reference peak with peak stats and method-level noise metrics.
    """
    # Run denoiser
    y_hat = noise_filtering(y_arr, x=x_arr, **cfg_slim)

    # Global noise estimate (baseline regions only)
    try:
        sigma_new_global = _robust_sigma(y_hat[baseline_mask]) if np.any(baseline_mask) else _robust_sigma(y_hat)
    except Exception:
        sigma_new_global = np.nan
    noise_red_db = _ratio_db(sigma_raw_global, sigma_new_global)

    # High-frequency residual power (baseline regions only)
    if hf_enabled:
        try:
            hf_power_new, hf_frac_new = _hf_power(
                x_arr, y_hat, baseline_mask,
                cutoff_hz=hf_cutoff_hz, cutoff_frac=float(hf_cutoff_frac), resample_dx=hf_resample_dx,
                psd_method=hf_psd_method, welch_nperseg=hf_welch_nperseg,
            )
            hf_power_reduction_db = _ratio_db(hf_power_raw_global, hf_power_new)
        except Exception:
            hf_power_new = np.nan
            hf_frac_new = np.nan
            hf_power_reduction_db = np.nan
    else:
        hf_power_new = np.nan
        hf_frac_new = np.nan
        hf_power_reduction_db = np.nan

    rows: List[Dict[str, Any]] = []
    for i, r in enumerate(ref_meas):
        m_meas = _measure_on_method(
            x_arr,
            y_hat,
            r,
            rel_height=rel_height,
            search_ppm=search_ppm,
            min_prominence_ratio=match_min_prominence_ratio,
            min_prominence_abs=match_min_prominence_abs,
            min_width_pts=match_min_width_pts,
        )
        matched = m_meas is not None

        # Local noise around this peak (post-denoise)
        sigma_loc_raw = local_sigma_raw[i] if i < len(local_sigma_raw) else np.nan
        try:
            sigma_loc_new = _local_noise_sigma(x_arr, y_hat, r, flank_inner=float(flank_inner), flank_outer=float(flank_outer))
        except Exception:
            sigma_loc_new = np.nan

        if matched:
            d_mz = m_meas.mz_center - r.mz_center
            ph = _percent_change(r.height, m_meas.height)
            pf = _percent_change(r.fwhm_pts, m_meas.fwhm_pts)
            pa = _percent_change(r.area, m_meas.area)
            mz_left, mz_right = m_meas.mz_left, m_meas.mz_right
            height_new = m_meas.height
            fwhm_new = m_meas.fwhm_pts
            area_new = m_meas.area
        else:
            d_mz = ph = pf = pa = np.nan
            mz_left = mz_right = np.nan
            height_new = fwhm_new = area_new = np.nan

        # SNRs and ΔSNR (dB)
        snr_raw = (r.height / sigma_loc_raw) if (np.isfinite(r.height) and np.isfinite(sigma_loc_raw) and sigma_loc_raw > 0) else np.nan
        snr_new = (height_new / sigma_loc_new) if (np.isfinite(height_new) and np.isfinite(sigma_loc_new) and sigma_loc_new > 0) else np.nan
        delta_snr_db = _ratio_db(snr_new, snr_raw)

        rows.append(dict(
            method=name,
            mz_ref=r.mz_center,
            mz_shift=d_mz,
            height_ref=r.height,
            height_new=height_new,
            fwhm_ref=r.fwhm_pts,
            fwhm_new=fwhm_new,
            area_ref=r.area,
            area_new=area_new,
            pct_height=ph, pct_fwhm=pf, pct_area=pa,
            mz_left=mz_left, mz_right=mz_right,
            matched=int(matched),
            # noise metrics
            sigma_raw_global=sigma_raw_global,
            sigma_new_global=sigma_new_global,
            noise_reduction_db=noise_red_db,
            sigma_local_raw=sigma_loc_raw,
            sigma_local_new=sigma_loc_new,
            snr_raw=snr_raw,
            snr_new=snr_new,
            delta_snr_db=delta_snr_db,
            hf_power_raw_global=hf_power_raw_global,
            hf_power_new=hf_power_new,
            hf_frac_new=hf_frac_new,
            hf_power_reduction_db=hf_power_reduction_db,
        ))
    return rows


# Top-level, picklable wrapper for process pools
def _task_wrapper(args: Tuple[Any, ...]) -> List[Dict[str, Any]]:
    """Picklable top-level wrapper for process pools.

    Unpacks an argument tuple and calls `_eval_method_task`. Using this function ensures
    compatibility with `ProcessPoolExecutor`, which requires picklable callables.
    """
    (
        name,
        cfg_slim,
        x_arr,
        y_arr,
        ref_meas,
        rel_height,
        search_ppm,
        match_min_prominence_ratio,
        match_min_prominence_abs,
        match_min_width_pts,
        baseline_mask,
        sigma_raw_global,
        local_sigma_raw,
        hf_enabled,
        hf_cutoff_hz,
        hf_cutoff_frac,
        hf_resample_dx,
        hf_power_raw_global,
        flank_inner,
        flank_outer,
        hf_psd_method,
        hf_welch_nperseg,
    ) = args
    return _eval_method_task(
        name, cfg_slim, x_arr, y_arr, ref_meas, rel_height, search_ppm,
        match_min_prominence_ratio, match_min_prominence_abs, match_min_width_pts,
        baseline_mask, sigma_raw_global, local_sigma_raw,
        hf_enabled, hf_cutoff_hz, hf_cutoff_frac, hf_resample_dx,
        hf_power_raw_global, flank_inner, flank_outer,
        hf_psd_method, hf_welch_nperseg,
    )


def _build_denoising_method_grid(
    x_arr: NDArray[np.float64],
    *,
    resample_to_uniform: bool,
    target_dx: Optional[float],
    include_derivatives: bool,
) -> Dict[str, Dict[str, Any]]:
    """Construct the candidate denoising-method grid for model selection."""
    methods: Dict[str, Dict[str, Any]] = {}

    for th in ("universal", "bayes", "sure", "sure_opt"):
        for sigstr in ("per_level", "global"):
            for wvlt in ("db4", "db8", "sym5", "sym8", "coif2", "coif3"):
                for cs in (0, 4, 8, 16, 32):
                    for vst in ("none", "anscombe"):
                        label = f"wavelet:{th}:{sigstr}:{wvlt}:{cs}:{vst}"
                        methods[label] = dict(
                            method="wavelet",
                            wavelet=wvlt,
                            level=None,
                            threshold_strategy=th,
                            threshold_mode="soft",
                            sigma=None,
                            sigma_strategy=sigstr,
                            variance_stabilize=vst,
                            cycle_spins=cs,
                            pywt_mode="periodization",
                            clip_nonnegative=True,
                            preserve_tic=False,
                            x=x_arr,
                            resample_to_uniform=resample_to_uniform,
                            target_dx=target_dx,
                        )

    sg_derivs = (0, 1, 2) if include_derivatives else (0,)
    for win_l in (10, 15, 20, 25, 50):
        for poly in (2, 3, 4, 5):
            wl = int(win_l)
            if wl % 2 == 0:
                wl += 1
            if wl <= poly:
                continue
            for deriv in sg_derivs:
                label = f"savitzky_golay:window_{wl}:poly_{poly}:deriv_{deriv}"
                methods[label] = dict(
                    method="savitzky_golay",
                    window_length=wl,
                    polyorder=poly,
                    deriv=deriv,
                    x=x_arr,
                    resample_to_uniform=resample_to_uniform,
                    target_dx=target_dx,
                )

    gaussian_orders = (0, 1, 2) if include_derivatives else (0,)
    for win_lg in (10, 15, 20, 25, 50):
        for gorder in gaussian_orders:
            label = f"gaussian:window_{win_lg}:order_{gorder}"
            methods[label] = dict(
                method="gaussian",
                window_length=win_lg,
                gauss_sigma_pts=max(1.0, win_lg / 6.0),
                gaussian_order=gorder,
                x=x_arr,
                resample_to_uniform=resample_to_uniform,
                target_dx=target_dx,
            )

    for win_lm in (10, 15, 20, 25, 50):
        label = f"median:window_{win_lm}"
        methods[label] = dict(
            method="median",
            window_length=win_lm,
            x=x_arr,
            resample_to_uniform=resample_to_uniform,
            target_dx=target_dx,
        )

    methods["none"] = dict(method="none", x=x_arr, resample_to_uniform=False, target_dx=None)
    return methods


[docs] @dataclass class PeakMeasurement: """Container for per-peak measurements. Attributes ---------- mz_center : float Peak center m/z (from the provided x-axis) at the local maximum index. idx_center : int Index of the peak center in the array (integer sample index). height : float Peak height (y at the local maximum) on the measured signal. fwhm_pts : float Full width at half maximum, measured in *index points* on the measured signal. mz_left, mz_right : float Left/right m/z boundaries where the peak crosses the chosen relative height (e.g., 50%). area : float Trapezoidal integral of y over [mz_left, mz_right]. prominence : float Peak prominence measured on the same signal; used to reject weak shoulders when re-matching peaks after denoising. """ mz_center: float idx_center: int height: float fwhm_pts: float mz_left: float mz_right: float area: float prominence: float = np.nan
def _ensure_axes(y: np.ndarray, x: Optional[np.ndarray] ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Validate inputs and return finite (x, y) arrays of equal shape. If `x` is None, a synthetic index axis [0, 1, ..., N-1] is created. NaNs/Infs are dropped *synchronously* from both arrays. Parameters ---------- y : array-like Signal values (intensity). Will be cast to float64 and flattened. x : array-like or None m/z axis; if None, an index axis is used. Returns ------- x_arr, y_arr : np.ndarray 1D float64 arrays of equal length containing only finite samples. """ # Normalize dtypes/shapes y = np.asarray(y, dtype=np.float64).ravel() if x is None: x = np.arange(y.size, dtype=np.float64) else: x = np.asarray(x, dtype=np.float64).ravel() if x.shape != y.shape: raise ValueError("x and y must have same shape") # Keep only finite pairs m = np.isfinite(y) & np.isfinite(x) if not np.any(m): raise ValueError("No finite data in (x, y).") return x[m], y[m] def _idx_to_mz(idx_f: float, x: np.ndarray) -> float: """Map a *floating* sample index to the x-axis (m/z) by linear interpolation. Parameters ---------- idx_f : float Floating index position (e.g., left/right intercepts from `peak_widths`). x : np.ndarray Monotonic x-axis array. Returns ------- float Interpolated m/z value at the requested fractional index. """ n = x.size if idx_f <= 0: return float(x[0]) if idx_f >= n - 1: return float(x[-1]) i0 = int(np.floor(idx_f)) t = idx_f - i0 return float((1 - t) * x[i0] + t * x[i0 + 1]) def _measure_one_peak( x: NDArray[np.float64], y: NDArray[np.float64], peak_idx: int, rel_height: float = 0.5, prominence: Optional[float] = None, ) -> PeakMeasurement: """Measure a single peak at `peak_idx` on signal `y` using SciPy's half-height width. Steps ----- 1) Use `signal.peak_widths` at the requested `rel_height` to obtain left/right intercepts and the FWHM in *index points* on the **provided signal**. 2) Convert fractional intercept indices back to m/z via linear interpolation on `x`. 3) Integrate the area within the [left, right] span using `scipy.integrate.trapezoid`. Notes ----- • Measurements are performed on the *current* y (raw or denoised), not a global template. • Returning FWHM in index points avoids ambiguity when x is nonuniform; boundaries are in m/z. """ # width & boundaries at half height on THIS y _pw = signal.peak_widths(y, [peak_idx], rel_height=rel_height) widths = _pw[0] _ = _pw[1] # unused: evaluation heights from peak_widths left_ips = _pw[2] right_ips = _pw[3] w = float(widths[0]) lidx = float(left_ips[0]) ridx = float(right_ips[0]) # center & height at integer max location idx_center = int(peak_idx) mz_center = float(x[idx_center]) height = float(y[idx_center]) # boundaries in m/z mz_left = _idx_to_mz(lidx, x) mz_right = _idx_to_mz(ridx, x) # integrate area within [lidx, ridx] using trapezoid (interp to integer grid) i0 = max(0, int(np.floor(lidx))) i1 = min(x.size - 1, int(np.ceil(ridx))) xx = x[i0:i1+1] yy = y[i0:i1+1] area = float(trapezoid(yy, xx)) if prominence is None: try: prominence = float(signal.peak_prominences(y, [peak_idx])[0][0]) except (ValueError, RuntimeError, FloatingPointError, IndexError): prominence = np.nan return PeakMeasurement( mz_center=mz_center, idx_center=idx_center, height=height, fwhm_pts=w, mz_left=mz_left, mz_right=mz_right, area=area, prominence=float(prominence) if prominence is not None else np.nan, ) def _build_reference_peaks( x: np.ndarray, y: np.ndarray, *, max_peaks: int = 300, min_prominence: Optional[float] = None, rel_height: float = 0.5, ) -> Tuple[np.ndarray, List[PeakMeasurement]]: """ Detect reference peaks on RAW y (no denoise), then measure their FWHM windows. We keep the top 'max_peaks' by prominence. """ # Robust default: let SciPy pick peaks, then keep strongest by prominence peaks, props = signal.find_peaks(y, prominence=min_prominence) if peaks.size == 0: raise ValueError("No peaks detected on raw data; adjust 'min_prominence' or pre-smooth lightly.") prom = props.get("prominences", np.zeros_like(peaks, dtype=float)) order = np.argsort(prom)[::-1] # descending by prominence keep = order[:max_peaks] peaks_ref = peaks[keep] prom_lookup = {int(pk): float(pm) for pk, pm in zip(peaks.tolist(), prom.tolist())} ref_measurements: List[PeakMeasurement] = [] for p in peaks_ref: ref_measurements.append( _measure_one_peak( x, y, int(p), rel_height=rel_height, prominence=prom_lookup.get(int(p), np.nan), ) ) return peaks_ref, ref_measurements def _measure_on_method( x: NDArray[np.float64], y_method: NDArray[np.float64], ref_meas: PeakMeasurement, rel_height: float = 0.5, search_ppm: float = 20.0, min_prominence_ratio: float = 0.1, min_prominence_abs: float = 0.0, min_width_pts: float = 0.25, ) -> Optional[PeakMeasurement]: """ Re-measure a peak near the reference center within a ppm window on ``y_method``. Matching is intentionally conservative: - candidates must be true local maxima returned by ``find_peaks`` - candidates must clear a minimum prominence relative to the reference peak - widths and enclosed areas must remain positive after re-measurement Returns ``None`` if no scientifically plausible match is found. """ mz0 = float(ref_meas.mz_center) tol = float(mz0 * search_ppm * 1e-6) # ensure python floats to please static type checkers x0 = float(x[0]) xN = float(x[-1]) left = max(x0, mz0 - tol) right = min(xN, mz0 + tol) if right <= left: return None # restrict to indices within the search window m: NDArray[np.bool_] = (x >= left) & (x <= right) if not np.any(m): return None idx_window = np.nonzero(m)[0] ys = y_method[m] if ys.size < 3: return None # Only consider true local maxima inside the search window; this prevents a # monotonic shoulder or boundary sample from being counted as a preserved peak. peak_rel, props = signal.find_peaks(ys, prominence=0.0) if peak_rel.size == 0: return None prominences = np.asarray(props.get("prominences", np.full(peak_rel.size, np.nan)), dtype=float) ref_prom = float(ref_meas.prominence) if np.isfinite(ref_meas.prominence) else np.nan min_prom = float(min_prominence_abs) if np.isfinite(ref_prom) and ref_prom > 0: min_prom = max(min_prom, float(min_prominence_ratio) * ref_prom) valid = np.isfinite(prominences) if min_prom > 0: valid &= prominences >= min_prom if not np.any(valid): return None peak_rel = peak_rel[valid] prominences = prominences[valid] abs_idx = idx_window[peak_rel] peak_mz = x[abs_idx] # Prefer the closest peak to the reference center, then break ties by prominence. order = np.lexsort((-prominences, np.abs(peak_mz - mz0))) for ord_idx in order: idx_center = int(abs_idx[ord_idx]) prominence = float(prominences[ord_idx]) try: measured = _measure_one_peak( x, y_method, idx_center, rel_height=rel_height, prominence=prominence, ) except (ValueError, RuntimeError, IndexError, FloatingPointError): continue if not np.isfinite(measured.fwhm_pts) or measured.fwhm_pts <= float(min_width_pts): continue if not np.isfinite(measured.area) or measured.area <= 0: continue if not (np.isfinite(measured.mz_left) and np.isfinite(measured.mz_right)): continue if measured.mz_right <= measured.mz_left: continue return measured return None def _percent_change(a: float, b: float) -> float: """Percent change from reference `a` to method value `b`. Defined as `100 * (b - a) / a`. Returns NaN if `a` is zero or if either input is non-finite. Note: This metric is asymmetric; consider log-ratio or symmetric % if needed for analysis. """ if a == 0 or not np.isfinite(a) or not np.isfinite(b): return np.nan return 100.0 * (b - a) / a
[docs] def compare_denoising_methods( x: Optional[np.ndarray], y: np.ndarray, *, min_mz: Optional[float] = None, max_mz: Optional[float] = None, max_peaks: int = 300, min_prominence: Optional[float] = None, rel_height: float = 0.5, search_ppm: float = 20.0, match_min_prominence_ratio: float = 0.1, match_min_prominence_abs: float = 0.0, match_min_width_pts: float = 0.25, resample_to_uniform: bool = False, include_derivatives: bool = False, target_dx: Optional[float] = None, return_format: Literal["pandas", "polars"] = "pandas", n_jobs: int = -1, parallel_backend: Literal["thread", "process"] = "thread", progress: bool = True, # --- New knobs --- baseline_expand: float = 2.0, flank_inner: float = 1.5, flank_outer: float = 3.0, hf_enabled: bool = True, hf_cutoff_hz: Optional[float] = None, hf_cutoff_frac: float = 0.3, hf_resample_dx: Optional[float] = None, hf_psd_method: Literal["welch", "periodogram"] = "welch", hf_welch_nperseg: Optional[int] = None, ) -> Tuple[Any, Any]: """ Run a battery of denoising/smoothing methods and quantify both **peak preservation** and **noise reduction**. Parameters ---------- x : array-like or None m/z axis. If None, an index axis [0..N-1] is used. y : array-like Raw intensities. min_mz, max_mz : float, optional Optional range restriction on `x` prior to peak detection and evaluation. max_peaks : int, default 300 Maximum number of reference peaks (by prominence) to evaluate in the selected range. min_prominence : float, optional Prominence threshold for `scipy.signal.find_peaks` during reference detection. rel_height : float, default 0.5 Relative height used for FWHM measurements (e.g., 0.5 = half-height). search_ppm : float, default 20.0 ±ppm window around each reference m/z used to re-detect peaks after denoising. match_min_prominence_ratio : float, default 0.1 Minimum post-denoise prominence required for a matched peak, expressed as a fraction of the raw reference prominence. match_min_prominence_abs : float, default 0.0 Absolute lower bound for post-denoise peak prominence. match_min_width_pts : float, default 0.25 Minimum acceptable peak width in index points for a post-denoise match. resample_to_uniform : bool, default False If True, allow denoisers to resample to a uniform grid internally when beneficial. include_derivatives : bool, default False If True, include derivative-style Savitzky-Golay (`deriv>0`) and Gaussian (`order>0`) operators in the candidate grid. By default the search includes only smoothing/denoising variants. target_dx : float, optional Desired spacing when `resample_to_uniform=True`. return_format : {"pandas","polars"}, default "pandas" Backend for output DataFrames. n_jobs : int, default -1 Number of workers used to evaluate methods in parallel (1 disables parallelism). parallel_backend : {"thread","process"}, default "thread" Parallelism backend. Threads are often efficient because NumPy/SciPy/PyWavelets drop the GIL. progress : bool, default True Show a progress bar if `tqdm` is available. baseline_expand : float, default 2.0 Multiplier to expand each peak's FWHM window when masking baseline regions used for noise/PSD estimates. flank_inner, flank_outer : float, defaults 1.5 and 3.0 Distances (in FWHM multiples) defining flanking bands used for **local** noise estimation. hf_enabled : bool, default True If True, compute high-frequency (HF) residual power metrics on baseline regions via PSD. hf_cutoff_hz : float, optional Absolute HF cutoff frequency (cycles per m/z). If None, uses `hf_cutoff_frac * Nyquist`. hf_cutoff_frac : float, default 0.3 Fraction of the Nyquist frequency used as the HF band when `hf_cutoff_hz` is None. hf_resample_dx : float, optional Δx used to resample baseline segments to a uniform grid for PSD; defaults to median Δx if None. hf_psd_method : {"welch","periodogram"}, default "welch" PSD estimator for HF power. Welch provides lower-variance estimates on finite windows. hf_welch_nperseg : int, optional Segment length for Welch PSD. If None, chosen automatically (≈ max(16, N/8), power-of-two, ≤1024). Returns ------- summary_df, per_peak_df : DataFrame If `return_format="pandas"`, returns pandas.DataFrame; if "polars", returns polars.DataFrame. `summary_df` contains method-level medians/IQRs, noise and HF metrics; `per_peak_df` has per-peak rows. """ x_arr, y_arr = _ensure_axes(y, x) # Range restriction if min_mz is not None or max_mz is not None: lo_f = float(-np.inf) if min_mz is None else float(min_mz) hi_f = float(np.inf) if max_mz is None else float(max_mz) msel: NDArray[np.bool_] = (x_arr >= lo_f) & (x_arr <= hi_f) if not np.any(msel): raise ValueError("Selected m/z range contains no data.") x_arr, y_arr = x_arr[msel], y_arr[msel] # Reference peaks on raw ref_idx, ref_meas = _build_reference_peaks( x_arr, y_arr, max_peaks=max_peaks, min_prominence=min_prominence, rel_height=rel_height ) # --- Noise baselines (computed once on RAW) --- baseline_mask = _compute_baseline_mask(x_arr, ref_meas, expand=float(baseline_expand)) sigma_raw_global = _robust_sigma(y_arr[baseline_mask]) if np.any(baseline_mask) else _robust_sigma(y_arr) local_sigma_raw = [ _local_noise_sigma(x_arr, y_arr, r, flank_inner=float(flank_inner), flank_outer=float(flank_outer)) for r in ref_meas ] # --- High-frequency power baseline (RAW) --- if hf_enabled: hf_power_raw_global, hf_frac_raw_global = _hf_power( x_arr, y_arr, baseline_mask, cutoff_hz=hf_cutoff_hz, cutoff_frac=float(hf_cutoff_frac), resample_dx=hf_resample_dx, psd_method=hf_psd_method, welch_nperseg=hf_welch_nperseg, ) else: hf_power_raw_global, hf_frac_raw_global = np.nan, np.nan methods = _build_denoising_method_grid( x_arr, resample_to_uniform=resample_to_uniform, target_dx=target_dx, include_derivatives=include_derivatives, ) # --- Evaluate all methods (optionally in parallel). Process pools need a picklable wrapper. rows_peak: List[Dict[str, Any]] = [] # Build lightweight tasks: drop large arrays from cfg to reduce pickling tasks: List[Tuple[str, Dict[str, Any]]] = [] for name, cfg in methods.items(): cfg_slim = dict(cfg) cfg_slim.pop("x", None) # will be passed separately tasks.append((name, cfg_slim)) total = len(tasks) # Build arg tuples once arg_tuples: List[Tuple[Any, ...]] = [ ( name, cfg_slim, x_arr, y_arr, ref_meas, rel_height, search_ppm, float(match_min_prominence_ratio), float(match_min_prominence_abs), float(match_min_width_pts), baseline_mask, sigma_raw_global, local_sigma_raw, hf_enabled, hf_cutoff_hz, hf_cutoff_frac, hf_resample_dx, hf_power_raw_global, float(flank_inner), float(flank_outer), hf_psd_method, hf_welch_nperseg, ) for (name, cfg_slim) in tasks ] if n_jobs == 1: iterable = (_task_wrapper(args) for args in arg_tuples) for rows in _iter_with_progress(iterable, total=total, progress=progress, desc="Denoising methods"): rows_peak.extend(rows) else: if parallel_backend == "thread": from concurrent.futures import ThreadPoolExecutor as Executor elif parallel_backend == "process": from concurrent.futures import ProcessPoolExecutor as Executor else: raise ValueError("parallel_backend must be 'thread' or 'process'") _workers = os.cpu_count() or 4 if n_jobs <= 0 else int(n_jobs) with Executor(max_workers=_workers) as ex: mapped = ex.map(_task_wrapper, arg_tuples, chunksize=1) for rows in _iter_with_progress(mapped, total=total, progress=progress, desc=f"Denoising methods ({parallel_backend})"): rows_peak.extend(rows) if return_format == "pandas": import pandas as pd per_peak_df = pd.DataFrame(rows_peak) def _iqr(s: "pd.Series") -> float: """Return the interquartile range for a numeric pandas Series.""" q = s.quantile([0.25, 0.75]) return float(q.iloc[1] - q.iloc[0]) summary = [] for name, grp in per_peak_df.groupby("method"): gmatch = grp[grp["matched"] == 1] total = grp.shape[0] matched = gmatch.shape[0] lost = total - matched frac_matched = matched / total if total else np.nan # take method-level constants from first row gfirst = grp.iloc[0] sigma_raw_g = float(gfirst.get("sigma_raw_global", np.nan)) sigma_new_g = float(gfirst.get("sigma_new_global", np.nan)) noise_red_db = float(gfirst.get("noise_reduction_db", np.nan)) hf_red_db = float(gfirst.get("hf_power_reduction_db", np.nan)) hf_frac_new = float(gfirst.get("hf_frac_new", np.nan)) summary.append(dict( method=name, peaks_total=total, peaks_matched=matched, peaks_lost=lost, frac_matched=frac_matched, mz_shift_med=float(gmatch["mz_shift"].median()) if matched else np.nan, mz_shift_iqr=_iqr(gmatch["mz_shift"]) if matched else np.nan, pct_height_med=float(gmatch["pct_height"].median()) if matched else np.nan, pct_height_iqr=_iqr(gmatch["pct_height"]) if matched else np.nan, pct_fwhm_med=float(gmatch["pct_fwhm"].median()) if matched else np.nan, pct_fwhm_iqr=_iqr(gmatch["pct_fwhm"]) if matched else np.nan, pct_area_med=float(gmatch["pct_area"].median()) if matched else np.nan, pct_area_iqr=_iqr(gmatch["pct_area"]) if matched else np.nan, # noise-aware metrics sigma_raw_global=sigma_raw_g, sigma_new_global=sigma_new_g, noise_reduction_db=noise_red_db, delta_snr_db_med=float(gmatch["delta_snr_db"].median()) if matched else np.nan, delta_snr_db_iqr=_iqr(gmatch["delta_snr_db"]) if matched else np.nan, hf_power_reduction_db=hf_red_db, hf_frac_new_global=hf_frac_new, )) summary_df = pd.DataFrame(summary).sort_values(["method"]).reset_index(drop=True) return summary_df, per_peak_df elif return_format == "polars": try: import polars as pl except ImportError as e: raise ImportError("polars is not installed. Install it or use return_format='pandas'.") from e per_peak_df = pl.DataFrame(rows_peak) summary_df = ( per_peak_df .group_by("method") .agg([ pl.count().alias("peaks_total"), pl.col("matched").sum().alias("peaks_matched"), (pl.count() - pl.col("matched").sum()).alias("peaks_lost"), (pl.col("matched").sum() / pl.count()).alias("frac_matched"), pl.col("mz_shift").filter((pl.col("matched") == 1) & ~pl.col("mz_shift").is_nan()).median().alias("mz_shift_med"), ( pl.col("mz_shift").filter((pl.col("matched") == 1) & ~pl.col("mz_shift").is_nan()).quantile(0.75) - pl.col("mz_shift").filter((pl.col("matched") == 1) & ~pl.col("mz_shift").is_nan()).quantile(0.25) ).alias("mz_shift_iqr"), pl.col("pct_height").filter((pl.col("matched") == 1) & ~pl.col("pct_height").is_nan()).median().alias("pct_height_med"), ( pl.col("pct_height").filter((pl.col("matched") == 1) & ~pl.col("pct_height").is_nan()).quantile(0.75) - pl.col("pct_height").filter((pl.col("matched") == 1) & ~pl.col("pct_height").is_nan()).quantile(0.25) ).alias("pct_height_iqr"), pl.col("pct_fwhm").filter((pl.col("matched") == 1) & ~pl.col("pct_fwhm").is_nan()).median().alias("pct_fwhm_med"), ( pl.col("pct_fwhm").filter((pl.col("matched") == 1) & ~pl.col("pct_fwhm").is_nan()).quantile(0.75) - pl.col("pct_fwhm").filter((pl.col("matched") == 1) & ~pl.col("pct_fwhm").is_nan()).quantile(0.25) ).alias("pct_fwhm_iqr"), pl.col("pct_area").filter((pl.col("matched") == 1) & ~pl.col("pct_area").is_nan()).median().alias("pct_area_med"), ( pl.col("pct_area").filter((pl.col("matched") == 1) & ~pl.col("pct_area").is_nan()).quantile(0.75) - pl.col("pct_area").filter((pl.col("matched") == 1) & ~pl.col("pct_area").is_nan()).quantile(0.25) ).alias("pct_area_iqr"), # noise-aware metrics (take first since constant per method) pl.col("sigma_raw_global").first().alias("sigma_raw_global"), pl.col("sigma_new_global").first().alias("sigma_new_global"), pl.col("noise_reduction_db").first().alias("noise_reduction_db"), pl.col("delta_snr_db").filter((pl.col("matched") == 1) & ~pl.col("delta_snr_db").is_nan()).median().alias("delta_snr_db_med"), ( pl.col("delta_snr_db").filter((pl.col("matched") == 1) & ~pl.col("delta_snr_db").is_nan()).quantile(0.75) - pl.col("delta_snr_db").filter((pl.col("matched") == 1) & ~pl.col("delta_snr_db").is_nan()).quantile(0.25) ).alias("delta_snr_db_iqr"), pl.col("hf_power_reduction_db").first().alias("hf_power_reduction_db"), pl.col("hf_frac_new").first().alias("hf_frac_new_global"), ]) .sort("method") ) return summary_df, per_peak_df else: raise ValueError("return_format must be 'pandas' or 'polars'")
[docs] def aggregate_method_summaries( summary, *, unit_label: str = "windows", return_format: Literal["pandas", "polars"] = "pandas", ): """Aggregate per-method summaries across windows, spectra, or other units. The aggregation intentionally gives each unit equal weight by taking the median of unit-level metrics, while match fractions remain peak-weighted. This avoids a single busy window or spectrum dominating the ranking. """ if pl is not None and isinstance(summary, pl.DataFrame): df = summary.to_pandas() else: df = summary.copy() if "method" not in df.columns: raise KeyError("summary must contain a 'method' column") metric_cols = ( "mz_shift_med", "mz_shift_iqr", "pct_height_med", "pct_height_iqr", "pct_fwhm_med", "pct_fwhm_iqr", "pct_area_med", "pct_area_iqr", "sigma_raw_global", "sigma_new_global", "noise_reduction_db", "delta_snr_db_med", "delta_snr_db_iqr", "hf_power_reduction_db", "hf_frac_new_global", ) def _median_col(group: "pd.DataFrame", col: str) -> float: if col not in group.columns: return np.nan s = pd.to_numeric(group[col], errors="coerce") return float(s.median()) if s.notna().any() else np.nan def _sum_int(group: "pd.DataFrame", col: str, default: int = 0) -> int: if col not in group.columns: return int(default) s = pd.to_numeric(group[col], errors="coerce") return int(s.fillna(0).sum()) rows = [] for method, grp in df.groupby("method", sort=True): peaks_total = _sum_int(grp, "peaks_total") peaks_matched = _sum_int(grp, "peaks_matched") peaks_lost = _sum_int(grp, "peaks_lost", default=max(peaks_total - peaks_matched, 0)) row = { "method": method, unit_label: int(grp.shape[0]), "peaks_total": peaks_total, "peaks_matched": peaks_matched, "peaks_lost": peaks_lost, "frac_matched": (peaks_matched / peaks_total) if peaks_total else np.nan, } for col in metric_cols: row[col] = _median_col(grp, col) rows.append(row) out = pd.DataFrame(rows).sort_values("method").reset_index(drop=True) if return_format == "pandas": return out if return_format == "polars": if pl is None: raise ImportError("polars is not installed. Install it or use return_format='pandas'.") return pl.DataFrame(out) raise ValueError("return_format must be 'pandas' or 'polars'")
# Multi-window helper (stratified evaluation): # a thin wrapper that loops over windows, tags results, and concatenates:
[docs] def compare_methods_in_windows( x: np.ndarray, y: np.ndarray, windows: list[tuple[float, float]], *, per_window_max_peaks: int = 50, min_prominence: Optional[float] = None, rel_height: float = 0.5, search_ppm: float = 20.0, match_min_prominence_ratio: float = 0.1, match_min_prominence_abs: float = 0.0, match_min_width_pts: float = 0.25, resample_to_uniform: bool = False, include_derivatives: bool = False, target_dx: Optional[float] = None, return_format: Literal["pandas", "polars"] = "pandas", n_jobs: int = -1, parallel_backend: Literal["thread","process"] = "thread", progress: bool = True, baseline_expand: float = 2.0, flank_inner: float = 1.5, flank_outer: float = 3.0, hf_enabled: bool = True, hf_cutoff_hz: Optional[float] = None, hf_cutoff_frac: float = 0.3, hf_resample_dx: Optional[float] = None, hf_psd_method: Literal["welch","periodogram"] = "welch", hf_welch_nperseg: Optional[int] = None, auto_tune: bool = False, auto_tune_files: Optional[list[str]] = None, ): """Evaluate denoising methods across multiple m/z windows and aggregate results. Parameters ---------- x, y : np.ndarray m/z axis and intensity values. windows : list[tuple[float, float]] Each tuple is (min_mz, max_mz) for a window to evaluate. per_window_max_peaks : int, default 50 Max number of strongest peaks (by prominence) to measure within each window. min_prominence : float, optional Minimum prominence passed to `signal.find_peaks` for reference peak detection. rel_height : float, default 0.5 Relative height used to define FWHM when measuring peaks. search_ppm : float, default 20.0 ±ppm window around each reference m/z used to re-detect peaks after denoising. match_min_prominence_ratio, match_min_prominence_abs, match_min_width_pts : floats Forwarded to the peak re-matching logic used after denoising. resample_to_uniform, target_dx : optional Passed through to denoisers that support resampling. include_derivatives : bool, default False If True, include derivative-style Savitzky-Golay and Gaussian candidates inside each window's method grid. return_format : {"pandas","polars"} Backend for output DataFrames. n_jobs : int, default -1 Workers used within each window’s call to `compare_denoising_methods`. parallel_backend : {"thread","process"}, default "thread" Parallelism backend. progress : bool, default True Show progress bars during evaluation. baseline_expand, flank_inner, flank_outer : floats Baseline mask expansion and flanking-band multipliers forwarded to noise metrics. hf_enabled, hf_cutoff_hz, hf_cutoff_frac, hf_resample_dx, hf_psd_method, hf_welch_nperseg : optional High-frequency PSD controls forwarded to noise metrics (Welch is lower-variance). Returns ------- If return_format == "pandas": rollup : pd.DataFrame Method-level aggregation across all windows. summary_all : pd.DataFrame Per-window, per-method summary table (noise and peak metrics). detail_all : pd.DataFrame Per-peak detail table across all windows. If return_format == "polars": rollup, summary_all, detail_all : pl.DataFrame """ if auto_tune: from ..adaptive import estimate_denoise_params _dn_overrides = estimate_denoise_params(auto_tune_files or []) if "hf_cutoff_frac" in _dn_overrides: hf_cutoff_frac = float(_dn_overrides["hf_cutoff_frac"]) if "max_peaks" in _dn_overrides: per_window_max_peaks = int(_dn_overrides["max_peaks"]) if return_format == "pandas": import pandas as pd summaries = [] details = [] for (lo, hi) in windows: s, d = compare_denoising_methods( x, y, min_mz=lo, max_mz=hi, max_peaks=per_window_max_peaks, min_prominence=min_prominence, rel_height=rel_height, search_ppm=search_ppm, match_min_prominence_ratio=match_min_prominence_ratio, match_min_prominence_abs=match_min_prominence_abs, match_min_width_pts=match_min_width_pts, resample_to_uniform=resample_to_uniform, include_derivatives=include_derivatives, target_dx=target_dx, return_format="pandas", n_jobs=n_jobs, parallel_backend=parallel_backend, progress=progress, baseline_expand=baseline_expand, flank_inner=flank_inner, flank_outer=flank_outer, hf_enabled=hf_enabled, hf_cutoff_hz=hf_cutoff_hz, hf_cutoff_frac=hf_cutoff_frac, hf_resample_dx=hf_resample_dx, hf_psd_method=hf_psd_method, hf_welch_nperseg=hf_welch_nperseg, ) s = s.copy() s["window"] = f"[{lo},{hi}]" d = d.copy() d["window"] = f"[{lo},{hi}]" summaries.append(s) details.append(d) summary_all = pd.concat(summaries, ignore_index=True) detail_all = pd.concat(details, ignore_index=True) rollup = aggregate_method_summaries( summary_all, unit_label="windows", return_format="pandas", ) return rollup, summary_all, detail_all elif return_format == "polars": try: import polars as pl except ImportError as e: raise ImportError("polars is not installed. Install it or use return_format='pandas'.") from e summaries: list[pl.DataFrame] = [] details: list[pl.DataFrame] = [] for (lo, hi) in windows: s, d = compare_denoising_methods( x, y, min_mz=lo, max_mz=hi, max_peaks=per_window_max_peaks, min_prominence=min_prominence, rel_height=rel_height, search_ppm=search_ppm, match_min_prominence_ratio=match_min_prominence_ratio, match_min_prominence_abs=match_min_prominence_abs, match_min_width_pts=match_min_width_pts, resample_to_uniform=resample_to_uniform, include_derivatives=include_derivatives, target_dx=target_dx, return_format="polars", n_jobs=n_jobs, parallel_backend=parallel_backend, progress=progress, baseline_expand=baseline_expand, flank_inner=flank_inner, flank_outer=flank_outer, hf_enabled=hf_enabled, hf_cutoff_hz=hf_cutoff_hz, hf_cutoff_frac=hf_cutoff_frac, hf_resample_dx=hf_resample_dx, hf_psd_method=hf_psd_method, hf_welch_nperseg=hf_welch_nperseg, ) s = s.with_columns(pl.lit(f"[{lo},{hi}]").alias("window")) d = d.with_columns(pl.lit(f"[{lo},{hi}]").alias("window")) summaries.append(s) details.append(d) summary_all = pl.concat(summaries, how="vertical", rechunk=True) detail_all = pl.concat(details, how="vertical", rechunk=True) rollup = aggregate_method_summaries( summary_all, unit_label="windows", return_format="polars", ) return rollup, summary_all, detail_all else: raise ValueError("return_format must be 'pandas' or 'polars'")
# Rank methods
[docs] def to_ppm(mz_shift_med: float, mz_ref_median: float) -> float: """Convert an absolute m/z shift to parts-per-million (ppm). ppm = 1e6 * Δm / m_ref """ return 1e6 * mz_shift_med / mz_ref_median
DEFAULT_SELECTION_CRITERIA: Dict[str, float] = { "min_frac_matched": 0.80, "max_mz_shift_ppm": 10.0, "max_mz_shift_iqr_ppm": 10.0, "max_pct_area_med": 25.0, "max_pct_height_med": 25.0, "max_pct_fwhm_med": 25.0, "max_pct_area_iqr": 30.0, "max_pct_height_iqr": 30.0, "max_pct_fwhm_iqr": 30.0, "target_noise_db": 3.0, "target_delta_snr_db": 3.0, "target_hf_power_reduction_db": 3.0, "target_hf_frac_new_global": 0.25, } def _resolve_selection_criteria( selection_criteria: Optional[Dict[str, float]] = None, ) -> Dict[str, float]: """Validate and merge dimensionless-selection criteria.""" criteria = dict(DEFAULT_SELECTION_CRITERIA) if selection_criteria is not None: unknown = set(selection_criteria) - set(criteria) if unknown: raise KeyError(f"Unknown selection criteria: {sorted(unknown)}") criteria.update(selection_criteria) for key, value in criteria.items(): if not np.isfinite(value): raise ValueError(f"selection criterion '{key}' must be finite") if not (0.0 < criteria["min_frac_matched"] <= 1.0): raise ValueError("selection criterion 'min_frac_matched' must lie in (0, 1]") positive_keys = set(criteria) - {"min_frac_matched"} for key in positive_keys: if criteria[key] <= 0: raise ValueError(f"selection criterion '{key}' must be > 0") return criteria def _prepare_ranking_frame_pandas( summary_df, per_peak_df, *, min_noise_db: float, min_delta_snr_db: float, selection_criteria: Optional[Dict[str, float]] = None, ) -> pd.DataFrame: """Add dimensionless ranking columns and pre-specified pass/fail gates.""" criteria = _resolve_selection_criteria(selection_criteria) s = _to_pandas_df(summary_df) peak_df = _to_pandas_df(per_peak_df) numeric_cols = ( "frac_matched", "mz_shift_med", "mz_shift_iqr", "pct_area_med", "pct_area_iqr", "pct_height_med", "pct_height_iqr", "pct_fwhm_med", "pct_fwhm_iqr", "noise_reduction_db", "delta_snr_db_med", "hf_power_reduction_db", "hf_frac_new_global", ) for col in numeric_cols: if col not in s.columns: s[col] = np.nan s[col] = pd.to_numeric(s[col], errors="coerce") for col in ("noise_reduction_db", "delta_snr_db_med", "hf_power_reduction_db", "hf_frac_new_global"): s[col] = s[col].fillna(0.0) mz_ref_median = np.nan if "mz_ref" in peak_df.columns: mz_ref = pd.to_numeric(peak_df["mz_ref"], errors="coerce").to_numpy(dtype=float) if mz_ref.size and np.isfinite(mz_ref).any(): mz_ref_median = float(np.nanmedian(mz_ref)) ppm_scale = np.nan if np.isfinite(mz_ref_median) and mz_ref_median != 0: ppm_scale = 1e6 / mz_ref_median s["mz_shift_ppm"] = s["mz_shift_med"].abs() * ppm_scale s["mz_shift_iqr_ppm"] = s["mz_shift_iqr"].abs() * ppm_scale s["abs_area"] = s["pct_area_med"].abs() s["abs_height"] = s["pct_height_med"].abs() s["abs_fwhm"] = s["pct_fwhm_med"].abs() s["abs_area_iqr"] = s["pct_area_iqr"].abs() s["abs_height_iqr"] = s["pct_height_iqr"].abs() s["abs_fwhm_iqr"] = s["pct_fwhm_iqr"].abs() s["scaled_mz_shift"] = s["mz_shift_ppm"] / criteria["max_mz_shift_ppm"] s["scaled_mz_shift_iqr"] = s["mz_shift_iqr_ppm"] / criteria["max_mz_shift_iqr_ppm"] s["scaled_abs_area"] = s["abs_area"] / criteria["max_pct_area_med"] s["scaled_abs_height"] = s["abs_height"] / criteria["max_pct_height_med"] s["scaled_abs_fwhm"] = s["abs_fwhm"] / criteria["max_pct_fwhm_med"] s["scaled_abs_area_iqr"] = s["abs_area_iqr"] / criteria["max_pct_area_iqr"] s["scaled_abs_height_iqr"] = s["abs_height_iqr"] / criteria["max_pct_height_iqr"] s["scaled_abs_fwhm_iqr"] = s["abs_fwhm_iqr"] / criteria["max_pct_fwhm_iqr"] s["spread_sum_scaled"] = s[ [ "scaled_mz_shift_iqr", "scaled_abs_area_iqr", "scaled_abs_height_iqr", "scaled_abs_fwhm_iqr", ] ].sum(axis=1, min_count=1) s["scaled_noise_reduction"] = s["noise_reduction_db"] / criteria["target_noise_db"] s["scaled_delta_snr"] = s["delta_snr_db_med"] / criteria["target_delta_snr_db"] s["scaled_hf_power_reduction"] = ( s["hf_power_reduction_db"] / criteria["target_hf_power_reduction_db"] ) s["scaled_hf_frac"] = s["hf_frac_new_global"] / criteria["target_hf_frac_new_global"] pass_checks = { "pass_frac_matched": s["frac_matched"] >= criteria["min_frac_matched"], "pass_mz_shift_ppm": s["mz_shift_ppm"] <= criteria["max_mz_shift_ppm"], "pass_mz_shift_iqr_ppm": s["mz_shift_iqr_ppm"] <= criteria["max_mz_shift_iqr_ppm"], "pass_abs_area": s["abs_area"] <= criteria["max_pct_area_med"], "pass_abs_height": s["abs_height"] <= criteria["max_pct_height_med"], "pass_abs_fwhm": s["abs_fwhm"] <= criteria["max_pct_fwhm_med"], "pass_abs_area_iqr": s["abs_area_iqr"] <= criteria["max_pct_area_iqr"], "pass_abs_height_iqr": s["abs_height_iqr"] <= criteria["max_pct_height_iqr"], "pass_abs_fwhm_iqr": s["abs_fwhm_iqr"] <= criteria["max_pct_fwhm_iqr"], "pass_noise_reduction": s["noise_reduction_db"] >= float(min_noise_db), "pass_delta_snr": s["delta_snr_db_med"] >= float(min_delta_snr_db), } for col, mask in pass_checks.items(): s[col] = mask s["passes_peak_preservation"] = ( s["pass_frac_matched"] & s["pass_mz_shift_ppm"] & s["pass_mz_shift_iqr_ppm"] & s["pass_abs_area"] & s["pass_abs_height"] & s["pass_abs_fwhm"] & s["pass_abs_area_iqr"] & s["pass_abs_height_iqr"] & s["pass_abs_fwhm_iqr"] ) s["passes_min_denoise"] = s["pass_noise_reduction"] & s["pass_delta_snr"] s["passes_selection_criteria"] = s["passes_peak_preservation"] & s["passes_min_denoise"] fail_cols = [col for col in pass_checks if col.startswith("pass_")] s["failed_criteria_count"] = (~s[fail_cols]).sum(axis=1) return s
[docs] def rank_methods_pandas(summary_df, per_peak_df, w_match=3.0, w_mz=2.0, w_area=2.0, w_height=1.5, w_fwhm=1.0, w_spread=1.0, w_noise_db=2.0, w_delta_snr_db=1.5, w_hf_db=1.5, w_hf_frac=1.0, min_noise_db=0.5, min_delta_snr_db=1.0, selection_criteria: Optional[Dict[str, float]] = None): """Rank methods using explicit gates plus a dimensionless tie-break score. The primary scientific selection rule is: 1. Require peak-preservation and minimum-denoising criteria to pass. 2. Use a dimensionless score only as a secondary tie-break, not as the primary scientific claim. """ s = _prepare_ranking_frame_pandas( summary_df, per_peak_df, min_noise_db=min_noise_db, min_delta_snr_db=min_delta_snr_db, selection_criteria=selection_criteria, ) penalty_fill = 10.0 penalty = ( w_mz * s["scaled_mz_shift"].fillna(penalty_fill) + w_area * s["scaled_abs_area"].fillna(penalty_fill) + w_height * s["scaled_abs_height"].fillna(penalty_fill) + w_fwhm * s["scaled_abs_fwhm"].fillna(penalty_fill) + w_spread * s["spread_sum_scaled"].fillna(penalty_fill) + w_hf_frac * s["scaled_hf_frac"].fillna(penalty_fill) ) reward = ( w_match * s["frac_matched"].fillna(0.0) + w_noise_db * s["scaled_noise_reduction"].fillna(0.0) + w_delta_snr_db * s["scaled_delta_snr"].fillna(0.0) + w_hf_db * s["scaled_hf_power_reduction"].fillna(0.0) ) s["selection_penalty"] = penalty s["selection_reward"] = reward s["selection_score"] = penalty - reward s["score"] = s["selection_score"] fail_mask = ~s["passes_selection_criteria"] s.loc[fail_mask, "score"] = s.loc[fail_mask, "score"] + 1_000_000.0 s = s.sort_values( ["score", "delta_snr_db_med", "frac_matched"], ascending=[True, False, False], ).reset_index(drop=True) return s
[docs] def rank_methods_polars(summary_df: "pl.DataFrame", per_peak_df: "pl.DataFrame", w_match=3.0, w_mz=2.0, w_area=2.0, w_height=1.5, w_fwhm=1.0, w_spread=1.0, w_noise_db=2.0, w_delta_snr_db=1.5, w_hf_db=1.5, w_hf_frac=1.0, min_noise_db: float = 0.5, min_delta_snr_db: float = 1.0, selection_criteria: Optional[Dict[str, float]] = None) -> "pl.DataFrame": """Rank methods (polars) using the pandas implementation for identical semantics.""" if pl is None: raise ImportError("polars is not installed. Install it or use rank_methods_pandas instead.") ranked = rank_methods_pandas( summary_df.to_pandas(), per_peak_df.to_pandas(), w_match=w_match, w_mz=w_mz, w_area=w_area, w_height=w_height, w_fwhm=w_fwhm, w_spread=w_spread, w_noise_db=w_noise_db, w_delta_snr_db=w_delta_snr_db, w_hf_db=w_hf_db, w_hf_frac=w_hf_frac, min_noise_db=min_noise_db, min_delta_snr_db=min_delta_snr_db, selection_criteria=selection_criteria, ) return pl.DataFrame(ranked)
[docs] def rank_method(input_format, summary_df, per_peak_df, w_match=3.0, w_mz=2.0, w_area=2.0, w_height=1.5, w_fwhm=1.0, w_spread=1.0, w_noise_db=2.0, w_delta_snr_db=1.5, w_hf_db=1.5, w_hf_frac=1.0, min_noise_db: float = 0.5, min_delta_snr_db: float = 1.0, selection_criteria: Optional[Dict[str, float]] = None): """Dispatch ranking to pandas or polars implementation with identical semantics. Returns a DataFrame (pandas or polars) sorted by ascending `score` and includes explicit pass/fail flags for denoising and peak-preservation criteria. """ if input_format == 'pandas': summary = rank_methods_pandas(summary_df, per_peak_df, w_match=w_match, w_mz=w_mz, w_area=w_area, w_height=w_height, w_fwhm=w_fwhm, w_spread=w_spread, w_noise_db=w_noise_db, w_delta_snr_db=w_delta_snr_db, w_hf_db=w_hf_db, w_hf_frac=w_hf_frac, min_noise_db=min_noise_db, min_delta_snr_db=min_delta_snr_db, selection_criteria=selection_criteria) elif input_format == 'polars': summary = rank_methods_polars(summary_df, per_peak_df, w_match=w_match, w_mz=w_mz, w_area=w_area, w_height=w_height, w_fwhm=w_fwhm, w_spread=w_spread, w_noise_db=w_noise_db, w_delta_snr_db=w_delta_snr_db, w_hf_db=w_hf_db, w_hf_frac=w_hf_frac, min_noise_db=min_noise_db, min_delta_snr_db=min_delta_snr_db, selection_criteria=selection_criteria) else: raise ValueError("input_format must be 'pandas' or 'polars'") return summary
# ---------------- Pareto visualization -----------------
[docs] def decode_method_label(label: str) -> dict: """Translate a compact label back into ``noise_filtering`` parameters.""" family, *parts = label.split(":") if family == "wavelet": thresh, sigma, wavelet, spins, vst = parts return { "method": "wavelet", "threshold_strategy": thresh, "sigma_strategy": sigma, "wavelet": wavelet, "cycle_spins": int(spins), "variance_stabilize": vst, # defaults the grid used (see Xpektrion/denoise/denoise_select.py:708-724) "threshold_mode": "soft", "pywt_mode": "periodization", "clip_nonnegative": True, "preserve_tic": False, } if family == "savitzky_golay": return { "method": "savitzky_golay", **{k: int(v) for k, v in (p.split("_", 1) for p in parts)} } if family == "gaussian": return { "method": "gaussian", "window_length": int(parts[0].split("_", 1)[1]), "gaussian_order": int(parts[1].split("_", 1)[1]), # sigma is derived as window/6 inside the evaluator } if family == "median": return {"method": "median", "window_length": int(parts[0].split("_", 1)[1])} if family == "none": return {"method": "none"} raise ValueError(f"Unknown label '{label}'")
def _to_pandas_df(summary): """Coerce common tabular inputs into a pandas ``DataFrame`` copy.""" try: if isinstance(summary, pl.DataFrame): return summary.to_pandas() except Exception: pass return summary.copy() if hasattr(summary, "copy") else pd.DataFrame(summary) def _normalize_and_filter(summary, require_pass=True, require_finite_metrics=True): """ Convert summary to pandas, coerce numerics, and apply the *same* filters. """ df = _to_pandas_df(summary) # Required columns for your use cases req = {"method"} missing = req - set(df.columns) if missing: raise ValueError(f"summary missing required columns: {sorted(missing)}") # Coerce numerics that the plot and ranking use if "pct_height_med" in df.columns: df["pct_height_med"] = pd.to_numeric(df["pct_height_med"], errors="coerce") df["abs_height"] = df["pct_height_med"].abs() if "frac_matched" in df.columns: df["frac_matched"] = pd.to_numeric(df["frac_matched"], errors="coerce") if "delta_snr_db_med" in df.columns: df["delta_snr_db_med"] = pd.to_numeric(df["delta_snr_db_med"], errors="coerce") if "score" in df.columns: df["score"] = pd.to_numeric(df["score"], errors="coerce") if "selection_score" in df.columns: df["selection_score"] = pd.to_numeric(df["selection_score"], errors="coerce") # Apply *identical* row filters everywhere mask = pd.Series(True, index=df.index) if require_pass: if "passes_selection_criteria" in df.columns: mask &= (df["passes_selection_criteria"] == True) # noqa: E712 elif "passes_min_denoise" in df.columns: mask &= (df["passes_min_denoise"] == True) # noqa: E712 if require_finite_metrics: # These are needed for Pareto and usually for quality ranking needed = [] if "abs_height" in df.columns: needed.append(np.isfinite(df["abs_height"])) if "delta_snr_db_med" in df.columns: needed.append(np.isfinite(df["delta_snr_db_med"])) if needed: m = needed[0] for n in needed[1:]: m &= n mask &= m df = df[mask].reset_index(drop=True) if df.empty: raise ValueError("No rows left after normalization/filters") return df def _compute_pareto_mask(df): """ Non-dominated w.r.t minimize x=abs_height and maximize y=delta_snr_db_med. Returns boolean mask of length len(df). """ if not {"abs_height", "delta_snr_db_med"} <= set(df.columns): raise ValueError("Pareto needs abs_height and delta_snr_db_med") x = df["abs_height"].to_numpy() y = df["delta_snr_db_med"].to_numpy() n = len(df) if n == 0: return np.ones(0, dtype=bool) # Vectorized: for each point i, check if any other point j dominates it # j dominates i iff: y[j] >= y[i] AND x[j] <= x[i] AND (y[j] > y[i] OR x[j] < x[i]) # Use broadcasting: (n, 1) vs (1, n) y_ge = y[:, np.newaxis] >= y[np.newaxis, :] # (j, i): y[j] >= y[i] x_le = x[:, np.newaxis] <= x[np.newaxis, :] # (j, i): x[j] <= x[i] strict = (y[:, np.newaxis] > y[np.newaxis, :]) | (x[:, np.newaxis] < x[np.newaxis, :]) dominated_by = y_ge & x_le & strict # (j, i): j dominates i # Zero out self-domination (diagonal) np.fill_diagonal(dominated_by, False) # Point i is non-dominated if no j dominates it nd = ~dominated_by.any(axis=0) return nd
[docs] def select_methods(summary, basis="constrained_pareto_then_snr", top_k=12, require_pass=True, require_finite_metrics=True): """ Returns: filtered_df: the post-filter DataFrame (shared!) frontier_df: DataFrame of Pareto points (or None if basis='score' and Pareto not computed) selected_df: the DataFrame of selected rows to annotate/return (top_k) """ df = _normalize_and_filter(summary, require_pass, require_finite_metrics) frontier_df = None if basis == "score": if "score" not in df.columns: raise ValueError("basis='score' requires a 'score' column") selected_df = df.sort_values(["score", "delta_snr_db_med"], ascending=[True, False]).head(top_k).copy() elif basis in {"constrained_pareto_then_snr", "pareto_then_snr"}: nd_mask = _compute_pareto_mask(df) frontier_df = df[nd_mask].copy() frontier_df = frontier_df.sort_values(["abs_height", "delta_snr_db_med"], ascending=[True, False]) frontier_df = frontier_df.drop_duplicates(subset=["abs_height"], keep="first") order = ["delta_snr_db_med"] ascending = [False] if "frac_matched" in frontier_df.columns: order.append("frac_matched") ascending.append(False) if "selection_score" in frontier_df.columns: order.append("selection_score") ascending.append(True) elif "score" in frontier_df.columns: order.append("score") ascending.append(True) selected_df = frontier_df.sort_values(order, ascending=ascending).head(top_k).copy() elif basis == "pareto_then_score": nd_mask = _compute_pareto_mask(df) frontier_df = df[nd_mask].copy() # Tidy frontier ordering for plotting frontier_df = frontier_df.sort_values(["abs_height", "delta_snr_db_med"], ascending=[True, False]) frontier_df = frontier_df.drop_duplicates(subset=["abs_height"], keep="first") # Pick what to annotate/”best” from inside the frontier if "score" in df.columns: selected_df = frontier_df.sort_values(["score", "delta_snr_db_med"], ascending=[True, False]).head(top_k).copy() else: selected_df = frontier_df.sort_values("delta_snr_db_med", ascending=False).head(top_k).copy() else: raise ValueError( "basis must be one of " "'constrained_pareto_then_snr', 'pareto_then_snr', 'pareto_then_score', or 'score'" ) return df, frontier_df, selected_df
[docs] def plot_pareto_delta_snr_vs_height(summary, annotate=True, top_k=12, out_path=None, ax=None, basis="constrained_pareto_then_snr", require_pass=True, require_finite_metrics=True, save_plot=True, save_pareto=True): """Render ΔSNR vs. |%height| with Pareto annotations. Parameters mirror :func:`select_methods`; see :class:`DenoisingMethods.plot` for additional discussion. The helper creates the Matplotlib figure when ``ax`` is omitted and optionally saves both the chart and frontier table. """ import matplotlib.pyplot as plt relaxed_for_plot = False try: df, frontier_df, selected_df = select_methods( summary, basis=basis, top_k=top_k, require_pass=require_pass, require_finite_metrics=require_finite_metrics ) except ValueError as exc: if require_pass and "No rows left after normalization/filters" in str(exc): df, frontier_df, selected_df = select_methods( summary, basis=basis, top_k=top_k, require_pass=False, require_finite_metrics=require_finite_metrics, ) relaxed_for_plot = True else: raise created_fig = False if ax is None: fig, ax = plt.subplots(figsize=(6, 5)) created_fig = True ax.scatter(df["abs_height"], df["delta_snr_db_med"], alpha=0.6, label="All candidates") if frontier_df is not None: ax.plot(frontier_df["abs_height"], frontier_df["delta_snr_db_med"], linewidth=2, marker="o", label="Pareto front") ax.set_xlabel("|% height change| (median)") ax.set_ylabel("ΔSNR (dB, median)") title = "Pareto: ΔSNR vs. |%height| (lower x, higher y is better)" if relaxed_for_plot: title += "\nNo methods passed the current selection criteria; showing all finite candidates" ax.set_title(title) ax.grid(True, alpha=0.2) ax.legend() if annotate and not selected_df.empty: for _, r in selected_df.iterrows(): ax.annotate(str(r["method"]), (r["abs_height"], r["delta_snr_db_med"]), xytext=(4, 4), textcoords="offset points") if save_plot: out_path = OUTPUT_DIR / f"Pareto_plot_top_{top_k}.pdf" ax.figure.savefig(out_path, bbox_inches="tight") if save_pareto and frontier_df is not None: out_path = OUTPUT_DIR / f"Pareto_front_{top_k}.xlsx" if _POLARS_AVAILABLE and isinstance(frontier_df, pl.DataFrame): frontier_df.write_excel(out_path) else: frontier_df.to_excel(out_path) return ax