Source code for mioXpektron.detection.detection

# Data import function
import logging
import os

import pandas as pd
import numpy as np

logger = logging.getLogger(__name__)
from scipy import signal, optimize, special, integrate
from pybaselines import Baseline

from ..baseline.baseline_base import baseline_correction
from ..denoise.denoise_main import noise_filtering
from ..normalization.normalization import tic_normalization
from ..utils.file_management import _resolve_group
from ..utils.file_management import import_data

_PEAK_RESULT_COLUMNS = [
    "PeakCenter",
    "PeakWidth",
    "Prominences",
    "Amplitude",
    "PeakArea",
    "SampleName",
    "Group",
    "DetectedBy",
    "Deconvoluted",
    "FitModel",
    "AreaDefinition",
    "WidthDefinition",
    "IntegrationMethod",
]


def _empty_peak_properties_df():
    """Return an empty peak table with the canonical result schema."""

    return pd.DataFrame(columns=_PEAK_RESULT_COLUMNS)


def _log_peak_fit_failure_summary(
        sample_name,
        method,
        *,
        single_attempts=0,
        single_failures=0,
        deconv_attempts=0,
        deconv_failures=0,
        ):
    """Emit one warning summarizing analytic fit failures for a spectrum."""
    if single_failures <= 0 and deconv_failures <= 0:
        return

    parts = []
    if single_attempts > 0:
        parts.append(f"{single_failures}/{single_attempts} single-peak fits failed")
    if deconv_attempts > 0:
        parts.append(f"{deconv_failures}/{deconv_attempts} deconvolution fits raised exceptions")

    logger.warning(
        "%s: %s detection skipped some analytic fits (%s)",
        sample_name,
        method,
        "; ".join(parts),
    )


[docs] def handle_missing_values(mz_values, intensities, method="interpolation"): """Fill missing intensity values using the requested strategy.""" missing_indices = np.where(np.isnan(intensities))[0] if missing_indices.size == 0: return mz_values, intensities fixed_intensities = intensities.astype(float, copy=True) if method == "interpolation": valid_indices = np.where(~np.isnan(intensities))[0] if valid_indices.size == 0: fixed_intensities[missing_indices] = 0.0 else: fixed_intensities[missing_indices] = np.interp( mz_values[missing_indices], mz_values[valid_indices], intensities[valid_indices], ) elif method == "zero": fixed_intensities[missing_indices] = 0.0 elif method == "mean": mean_intensity = np.nanmean(intensities) fixed_intensities[missing_indices] = 0.0 if np.isnan(mean_intensity) else mean_intensity else: # pragma: no cover - defensive raise ValueError(f"Unknown method for handling missing values: {method}") return mz_values, fixed_intensities
# Robust Noise Level Estimation Function def _positive_noise_stats(noise_values, fallback_values=None, empty_warning=None): """Return median and Gaussian-equivalent MAD for positive finite values.""" values = np.asarray(noise_values, dtype=float) values = values[np.isfinite(values) & (values > 0)] if values.size == 0 and fallback_values is not None: if empty_warning: logger.warning(empty_warning) values = np.asarray(fallback_values, dtype=float) values = values[np.isfinite(values) & (values > 0)] if values.size == 0: return 0.0, 0.0 median_intensity = np.median(values) mad = np.median(np.abs(values - median_intensity)) # 1.4826 converts MAD to a Gaussian-equivalent sigma. This is a # pragmatic thresholding surrogate, not a Poisson-specific noise model. robust_std = 1.4826 * mad return median_intensity, robust_std def _resolve_noise_peak_regions( intensities, peak_indices=None, peak_height=None, peak_prominence=None, min_peak_width=1, max_peak_width=75, ): """Return peak centers and width bounds used to mask likely signal.""" if peak_indices is not None: peaks = np.asarray(peak_indices, dtype=int) peaks = peaks[(peaks >= 0) & (peaks < len(intensities))] if peaks.size == 0: return peaks, np.array([], dtype=float), np.array([], dtype=float) try: _widths, _height, left_ips, right_ips = signal.peak_widths( intensities, peaks, rel_height=0.5, ) except (ValueError, RuntimeError): left_ips = peaks.astype(float) right_ips = peaks.astype(float) return peaks, np.asarray(left_ips, dtype=float), np.asarray(right_ips, dtype=float) if peak_height is None or peak_prominence is None: pos = intensities[intensities > 0] if pos.size > 0: _med, _mad = _positive_noise_stats(pos) else: _med, _mad = 0.0, 0.0 if peak_height is None: peak_height = _med if peak_prominence is None: peak_prominence = 3.0 * _mad if _mad > 0 else _med peaks, properties = signal.find_peaks( intensities, height=peak_height, prominence=peak_prominence, width=(min_peak_width, max_peak_width), ) left_ips = np.asarray(properties.get("left_ips", peaks), dtype=float) right_ips = np.asarray(properties.get("right_ips", peaks), dtype=float) return np.asarray(peaks, dtype=int), left_ips, right_ips def _noise_exclusion_mask( intensities, peak_indices=None, window=2, peak_height=None, peak_prominence=None, min_peak_width=1, max_peak_width=75, ): """Build a noise mask using measured peak widths plus an extra point margin.""" mask = np.ones_like(intensities, dtype=bool) peak_indices, left_ips, right_ips = _resolve_noise_peak_regions( intensities, peak_indices=peak_indices, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, ) if peak_indices is None or len(peak_indices) == 0: return mask n = len(intensities) lefts = np.maximum(0, np.floor(left_ips).astype(int) - int(window)) rights = np.minimum(n, np.ceil(right_ips).astype(int) + int(window) + 1) for left, right in zip(lefts, rights): if right > left: mask[left:right] = False return mask
[docs] def robust_noise_estimation( intensities, peak_indices=None, window=2, peak_height=None, peak_prominence=None, min_peak_width=1, max_peak_width=75 ): """ Robust noise estimation by excluding regions near detected peaks. Parameters ---------- intensities : np.ndarray Denoised, baseline-corrected intensities. peak_indices : np.ndarray or None Indices of detected peaks. If None, function will detect peaks automatically. window : int Extra number of data points to exclude on each side of the detected peak width. The measured peak extent is always masked first. peak_height : float or None Minimum height for peak detection. If None, defaults to the median of positive intensities (data-adaptive). peak_prominence : float or None Minimum prominence for peak detection. If None, defaults to 3x the MAD of positive intensities (data-adaptive). Returns ------- median_intensity : float Median intensity of noise region. robust_std : float Robust standard deviation (Gaussian-equivalent MAD) of noise region. """ mask = _noise_exclusion_mask( intensities, peak_indices=peak_indices, window=window, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, ) noise_values = intensities[mask] return _positive_noise_stats( noise_values, fallback_values=intensities, empty_warning="No positive noise values found; falling back to full intensity array", )
# Robust Noise Level Estimation Function
[docs] def robust_noise_estimation_mz( mz_values, intensities, min_mz, max_mz ): """ Estimate noise from a user-specified m/z baseline region. Parameters ---------- mz_values : np.ndarray m/z axis. intensities : np.ndarray Corresponding intensity values. min_mz, max_mz : float m/z window that defines the baseline region. Returns ------- median_intensity : float Median intensity of the baseline region. robust_std : float Robust standard deviation (MAD-scaled) of the baseline region. """ baseline_region = intensities[(mz_values >= min_mz) & (mz_values <= max_mz)] median_intensity, robust_std = _positive_noise_stats(baseline_region) if median_intensity == 0.0 and robust_std == 0.0: logger.warning("No positive values in m/z baseline region [%s, %s]", min_mz, max_mz) return 0.0, 0.0 return median_intensity, robust_std
[docs] def robust_noise_estimation_mz_dependent( mz_values, intensities, peak_indices=None, window=2, peak_height=None, peak_prominence=None, min_peak_width=1, max_peak_width=75, n_bins=20, min_points_per_bin=25, ): """Estimate local noise as piecewise-constant m/z bins interpolated over the spectrum. Returns ------- median_profile : np.ndarray Per-point local median noise estimate. std_profile : np.ndarray Per-point local Gaussian-equivalent robust std estimate. """ mz_values = np.asarray(mz_values, dtype=float) intensities = np.asarray(intensities, dtype=float) if mz_values.size != intensities.size: raise ValueError("mz_values and intensities must have the same length") if mz_values.size == 0: return np.array([], dtype=float), np.array([], dtype=float) if n_bins < 1: raise ValueError("n_bins must be >= 1") if min_points_per_bin < 1: raise ValueError("min_points_per_bin must be >= 1") global_median, global_std = robust_noise_estimation( intensities, peak_indices=peak_indices, window=window, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, ) if np.allclose(mz_values.min(), mz_values.max()): return ( np.full_like(intensities, global_median, dtype=float), np.full_like(intensities, global_std, dtype=float), ) mask = _noise_exclusion_mask( intensities, peak_indices=peak_indices, window=window, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, ) edges = np.linspace(mz_values.min(), mz_values.max(), n_bins + 1) centers = [] medians = [] stds = [] for i in range(n_bins): if i == n_bins - 1: bin_mask = (mz_values >= edges[i]) & (mz_values <= edges[i + 1]) else: bin_mask = (mz_values >= edges[i]) & (mz_values < edges[i + 1]) noise_values = intensities[bin_mask & mask] median_i, std_i = _positive_noise_stats(noise_values) if np.count_nonzero(np.isfinite(noise_values) & (noise_values > 0)) < min_points_per_bin: continue centers.append(0.5 * (edges[i] + edges[i + 1])) medians.append(median_i) stds.append(std_i) if not centers: return ( np.full_like(intensities, global_median, dtype=float), np.full_like(intensities, global_std, dtype=float), ) centers = np.asarray(centers, dtype=float) medians = np.asarray(medians, dtype=float) stds = np.asarray(stds, dtype=float) if centers.size == 1: return ( np.full_like(intensities, medians[0], dtype=float), np.full_like(intensities, stds[0], dtype=float), ) median_profile = np.interp(mz_values, centers, medians, left=medians[0], right=medians[-1]) std_profile = np.interp(mz_values, centers, stds, left=stds[0], right=stds[-1]) return median_profile, std_profile
def _height_threshold_from_noise_model( mz_values, intensities, min_snr, noise_model="global", window=2, peak_height=None, peak_prominence=None, min_peak_width=1, max_peak_width=75, noise_bins=20, noise_min_points=25, ): """Return (median_noise, std_noise, height_threshold) for the requested noise model.""" if noise_model == "global": median_noise, std_noise = robust_noise_estimation( intensities, peak_indices=None, window=window, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, ) elif noise_model == "mz_binned": median_noise, std_noise = robust_noise_estimation_mz_dependent( mz_values, intensities, peak_indices=None, window=window, peak_height=peak_height, peak_prominence=peak_prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, n_bins=noise_bins, min_points_per_bin=noise_min_points, ) else: raise ValueError("noise_model must be 'global' or 'mz_binned'") height_thresh = median_noise + min_snr * std_noise return median_noise, std_noise, height_thresh # --------------------------------------------------------------------------- # Shared width / area computation (trapezoid via cumulative sum) # --------------------------------------------------------------------------- def _compute_widths_and_areas(mz_values, intensities, peak_indices, rel_height): """Return (peak_widths_mz, areas) for the given peak indices. Uses cumulative-trapezoid integration, matching the logic formerly duplicated in ``detect_peaks_with_area`` and ``detect_peaks_cwt_with_area``. """ if len(peak_indices) == 0: return np.array([]), np.array([]) results_half = signal.peak_widths(intensities, peak_indices, rel_height=rel_height) left_ips = results_half[2] right_ips = results_half[3] left_bounds = np.clip(np.floor(left_ips).astype(int), 0, len(mz_values) - 1) right_bounds = np.clip(np.ceil(right_ips).astype(int), 0, len(mz_values) - 1) peak_widths = mz_values[right_bounds] - mz_values[left_bounds] dx = np.diff(mz_values) avg_y = 0.5 * (intensities[:-1] + intensities[1:]) cum_area = np.empty(len(mz_values), dtype=float) cum_area[0] = 0.0 np.cumsum(dx * avg_y, out=cum_area[1:]) areas = cum_area[right_bounds] - cum_area[left_bounds] return peak_widths, areas # Detect peak and measure width and area
[docs] def detect_peaks_with_area( mz_values, intensities, sample_name, group, min_intensity=1, min_snr=3, min_distance=2, window_size=10, peak_height=50, prominence=10, min_peak_width=1, max_peak_width=75, width_rel_height=0.5, noise_model="global", noise_bins=20, noise_min_points=25, verbose=False ): """ Fast peak detection in ToF-SIMS or similar spectra, including peak area. Returns: -------- peak_indices : np.ndarray Indices of detected peaks peak_properties : dict Contains: mz, intensities, widths, prominences, heights, areas """ if not np.any(intensities > min_intensity): return _empty_peak_properties_df() # Validate m/z is monotonically increasing for correct area integration if len(mz_values) > 1 and not np.all(np.diff(mz_values) > 0): sort_idx = np.argsort(mz_values) mz_values = mz_values[sort_idx] intensities = intensities[sort_idx] # Clamp sub-threshold intensities to zero but keep the full x-grid so # that peak-width and area estimation see the true signal geometry. clamped = np.where(intensities > min_intensity, intensities, 0.0) if verbose: logger.info(f'min intensity: {np.nanmin(intensities)}, max intensity: {np.nanmax(intensities)}') median_noise, std_noise, height_thresh = _height_threshold_from_noise_model( mz_values, clamped, min_snr, noise_model=noise_model, window=window_size, peak_height=peak_height, peak_prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, noise_bins=noise_bins, noise_min_points=noise_min_points, ) if verbose: if np.ndim(median_noise) == 0: logger.info(f'median_noise: {median_noise}, std_noise: {std_noise}') else: logger.info( 'median_noise range: [%s, %s], std_noise range: [%s, %s]', float(np.nanmin(median_noise)), float(np.nanmax(median_noise)), float(np.nanmin(std_noise)), float(np.nanmax(std_noise)), ) if np.ndim(height_thresh) == 0: logger.info('height threshold: %s', height_thresh) else: logger.info( 'height threshold range: [%s, %s]', float(np.nanmin(height_thresh)), float(np.nanmax(height_thresh)), ) # Detect peaks on clamped signal peaks, props = signal.find_peaks( clamped, height=height_thresh, distance=min_distance, prominence=prominence, width=(min_peak_width, max_peak_width) ) # Calculate FWHM bounds and areas on the *original* intensities so # that tails below min_intensity still contribute to width/area. peak_widths, areas = _compute_widths_and_areas( mz_values, intensities, peaks, width_rel_height ) peak_properties = { 'PeakCenter': mz_values[peaks], 'PeakWidth': peak_widths, 'Prominences': props.get('prominences', None), 'Amplitude': props.get('peak_heights', None), 'PeakArea': areas, } peak_properties = pd.DataFrame(peak_properties) peak_properties['SampleName'] = sample_name peak_properties['Group'] = group peak_properties['DetectedBy'] = 'local_max' peak_properties['Deconvoluted'] = False peak_properties['FitModel'] = 'none' peak_properties['AreaDefinition'] = 'raw_trapezoid' peak_properties['WidthDefinition'] = f'width_at_rel_height={width_rel_height}' peak_properties['IntegrationMethod'] = 'trapezoid' return peak_properties
def _baseline_simpson(mz, y, left_ip, right_ip): """ Integrate y over [left_ip, right_ip] after subtracting a straight-line baseline through the two end-points. Returns 0.0 when the interval is degenerate (left_ip ≈ right_ip). """ if right_ip <= left_ip: return 0.0 sample_idx = np.arange(len(mz), dtype=float) x_left = np.interp(left_ip, sample_idx, mz) x_right = np.interp(right_ip, sample_idx, mz) if x_right <= x_left: return 0.0 y_left = np.interp(left_ip, sample_idx, y) y_right = np.interp(right_ip, sample_idx, y) i0 = int(np.floor(left_ip)) + 1 i1 = int(np.ceil(right_ip)) x_seg = np.concatenate(([x_left], mz[i0:i1], [x_right])) y_seg = np.concatenate(([y_left], y[i0:i1], [y_right])) y_base = np.interp(x_seg, [x_left, x_right], [y_left, y_right]) y_corr = np.clip(y_seg - y_base, 0, None) return integrate.simpson(y=y_corr, x=x_seg)
[docs] def detect_peaks_with_area_v2( mz, intens, sample_name, group, *, min_intensity=1, min_snr=3, min_distance=2, prominence=10, min_peak_width=1, max_peak_width=75, rel_height=0.5, noise_model="global", noise_bins=20, noise_min_points=25, noise_window=10, verbose=False): # 0. basic sanity check mask = intens > min_intensity if not np.any(mask): return _empty_peak_properties_df() mz, intens = mz[mask], intens[mask] # 1. noise estimate median_noise, std_noise, hthr = _height_threshold_from_noise_model( mz, intens, min_snr, noise_model=noise_model, window=noise_window, peak_height=None, peak_prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, noise_bins=noise_bins, noise_min_points=noise_min_points, ) if verbose: if np.ndim(hthr) == 0: logger.info( "median=%.2f, robust_std=%.2f, height threshold=%.2f", median_noise, std_noise, hthr, ) else: logger.info( "median range=[%.2f, %.2f], robust_std range=[%.2f, %.2f], threshold range=[%.2f, %.2f]", float(np.nanmin(median_noise)), float(np.nanmax(median_noise)), float(np.nanmin(std_noise)), float(np.nanmax(std_noise)), float(np.nanmin(hthr)), float(np.nanmax(hthr)), ) # 2. peak picking peaks, props = signal.find_peaks(intens, height=hthr, distance=min_distance, prominence=prominence, width=(min_peak_width, max_peak_width)) if peaks.size == 0: return _empty_peak_properties_df() # 3. sub-sample widths (FWHM by default) widths, h_eval, left_ips, right_ips = signal.peak_widths( intens, peaks, rel_height=rel_height) # 4. area under each peak (baseline-corrected Simpson) areas = np.fromiter((_baseline_simpson(mz, intens, l, r) for l, r in zip(left_ips, right_ips)), dtype=float, count=peaks.size) # 5. width in m/z units (sub-sample) mz_interp = np.interp widths_mz = mz_interp(right_ips, np.arange(mz.size), mz) - \ mz_interp(left_ips, np.arange(mz.size), mz) df = pd.DataFrame({ 'PeakCenter' : mz[peaks], 'PeakWidth' : widths_mz, 'Prominences' : props['prominences'], 'Amplitude' : props['peak_heights'], 'PeakArea' : areas, 'SampleName' : sample_name, 'Group' : group, 'DetectedBy' : 'find_peaks+widths', 'Deconvoluted' : False, 'FitModel' : 'none', 'AreaDefinition' : 'baseline_corrected_simpson', 'WidthDefinition' : f'width_at_rel_height={rel_height}', 'IntegrationMethod': 'simpson', }) return df
[docs] def detect_peaks_cwt_with_area( mz_values, intensities, sample_name, group, min_intensity=1, min_snr=3, min_distance=2, window_size=10, peak_height=50, prominence=10, min_peak_width=1, max_peak_width=75, width_rel_height=0.5, noise_model="global", noise_bins=20, noise_min_points=25, verbose=False ): """ Peak detection using Continuous Wavelet Transform (CWT) for ToF-SIMS spectra. Returns: -------- peak_properties : pd.DataFrame Contains: mz, intensities, widths (approx), amplitudes, areas """ if not np.any(intensities > min_intensity): return _empty_peak_properties_df() # Clamp sub-threshold intensities but keep the full x-grid clamped = np.where(intensities > min_intensity, intensities, 0.0) if verbose: logger.info(f'min intensity: {np.nanmin(intensities)}, max intensity: {np.nanmax(intensities)}') median_noise, std_noise, height_thresh = _height_threshold_from_noise_model( mz_values, clamped, min_snr, noise_model=noise_model, window=window_size, peak_height=peak_height, peak_prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, noise_bins=noise_bins, noise_min_points=noise_min_points, ) if verbose: if np.ndim(median_noise) == 0: logger.info(f'median noise: {median_noise}, std noise: {std_noise}') else: logger.info( 'median noise range: [%s, %s], std noise range: [%s, %s]', float(np.nanmin(median_noise)), float(np.nanmax(median_noise)), float(np.nanmin(std_noise)), float(np.nanmax(std_noise)), ) if np.ndim(height_thresh) == 0: logger.info('height threshold: %s', height_thresh) else: logger.info( 'height threshold range: [%s, %s]', float(np.nanmin(height_thresh)), float(np.nanmax(height_thresh)), ) # Prepare widths array for CWT (must be integers) widths = np.arange(min_peak_width, max_peak_width + 1) # CWT peak detection on clamped signal cwt_peaks = signal.find_peaks_cwt(clamped, widths, min_snr=min_snr) # Only retain peaks above the estimated SNR threshold cwt_peaks = [ idx for idx in cwt_peaks if clamped[idx] > (height_thresh[idx] if np.ndim(height_thresh) > 0 else height_thresh) ] # Estimate peak properties using original intensities peak_centers = mz_values[cwt_peaks] amplitudes = intensities[cwt_peaks] # Estimate peak widths and areas on original intensities cwt_peaks_arr = np.asarray(cwt_peaks, dtype=int) peak_widths, peak_areas = _compute_widths_and_areas( mz_values, intensities, cwt_peaks_arr, width_rel_height ) # Build DataFrame peak_properties = pd.DataFrame({ 'PeakCenter': peak_centers, 'PeakWidth': peak_widths, 'Prominences': [np.nan] * len(cwt_peaks), 'Amplitude': amplitudes, 'PeakArea': peak_areas, }) peak_properties['SampleName'] = sample_name peak_properties['Group'] = group peak_properties['DetectedBy'] = 'cwt' peak_properties['Deconvoluted'] = False peak_properties['FitModel'] = 'none' peak_properties['AreaDefinition'] = 'raw_trapezoid' peak_properties['WidthDefinition'] = f'width_at_rel_height={width_rel_height}' peak_properties['IntegrationMethod'] = 'trapezoid' return peak_properties
# --------------------------------------------------------------------------- # Shared metric helpers # --------------------------------------------------------------------------- _FWHM_SIGMA = 2.0 * np.sqrt(2.0 * np.log(2.0)) # ≈ 2.35482 (sigma → FWHM) def _voigt_fwhm(sigma, gamma): """ Standard pseudo-Voigt FWHM approximation (Olivero & Longbothum, 1977). Parameters ---------- sigma : float — Gaussian standard deviation gamma : float — Lorentzian HWHM Returns ------- float : Voigt FWHM """ f_G = _FWHM_SIGMA * abs(sigma) # Gaussian FWHM f_L = 2.0 * abs(gamma) # Lorentzian FWHM return 0.5346 * f_L + np.sqrt(0.2166 * f_L**2 + f_G**2) # --------------------------------------------------------------------------- # Curve fittings # Gaussian model for curve fitting
[docs] def gaussian(x, amp, cen, sigma): """Gaussian lineshape function. amp: Peak height at *cen* cen: Peak centre (mean) sigma: standard deviation sigma of the Gaussian returns: Gaussian function evaluated at x. """ return amp * np.exp(-(x - cen)**2 / (2 * sigma**2))
# Lorentzian model for curve fitting
[docs] def lorentzian(x, A, x0, gamma): """Lorentzian lineshape function. A: area under the peak (scaling factor) x0: center gamma: half-width at half-maximum (HWHM) """ return (A / np.pi) * (gamma / ((x - x0)**2 + gamma**2))
# Voigt model for curve fitting
[docs] def voigt(x, A, x0, sigma, gamma): """ Voigt profile (convolution of Gaussian and Lorentzian). A: area under the peak (scaling factor) x0: center sigma: standard deviation of Gaussian component gamma: HWHM of Lorentzian component """ # z = ((x - x0) + 1j*gamma) / (sigma * np.sqrt(2)) # v = A * np.real(special.wofz(z)) / (sigma * np.sqrt(2*np.pi)) return A * special.voigt_profile(x - x0, sigma, gamma)
# Two Gaussian model
[docs] def two_gaussians(x, amp1, cen1, wid1, amp2, cen2, wid2): return (amp1 * np.exp(-(x - cen1)**2 / (2*wid1**2)) + amp2 * np.exp(-(x - cen2)**2 / (2*wid2**2)))
def _median_positive_spacing(x): """Return the median positive x spacing or NaN when undefined.""" x = np.asarray(x, dtype=float) if x.size < 2: return np.nan diffs = np.diff(x) pos = diffs[np.isfinite(diffs) & (diffs > 0)] if pos.size == 0: return np.nan return float(np.median(pos)) def _rss(y_true, y_pred): """Residual sum of squares with finite-value guarding.""" resid = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float) resid = resid[np.isfinite(resid)] if resid.size == 0: return np.inf return float(np.sum(resid * resid)) def _bic_score(y_true, y_pred, n_params): """Bayesian information criterion for least-squares peak models.""" n_obs = int(np.count_nonzero(np.isfinite(y_true) & np.isfinite(y_pred))) if n_obs <= max(int(n_params), 1): return np.inf rss = max(_rss(y_true, y_pred), np.finfo(float).tiny) return float(n_obs * np.log(rss / n_obs) + n_params * np.log(n_obs)) def _fwhm_points(width_mz, x_fit): """Convert an m/z-domain FWHM to approximate sample points.""" dx = _median_positive_spacing(x_fit) if not np.isfinite(dx) or dx <= 0: return np.nan return float(width_mz / dx) def _component_widths_are_reasonable(sigmas, x_fit, min_peak_width, max_peak_width): """Check fitted Gaussian component widths against detector width limits.""" sigmas = np.asarray(sigmas, dtype=float) if sigmas.size == 0 or not np.all(np.isfinite(sigmas)) or np.any(sigmas <= 0): return False fwhm_pts = np.array( [_fwhm_points(abs(sigma) * _FWHM_SIGMA, x_fit) for sigma in sigmas], dtype=float, ) if not np.all(np.isfinite(fwhm_pts)): return False min_width = max(float(min_peak_width), 1.0) max_width = max(float(max_peak_width), min_width) return bool(np.all((fwhm_pts >= min_width) & (fwhm_pts <= max_width))) def _fit_single_gaussian_window(x_fit, y_fit, center_hint, min_peak_width, max_peak_width): """Fit a single Gaussian to the deconvolution window for BIC comparison.""" dx = _median_positive_spacing(x_fit) sigma_floor = 1e-9 sigma_guess = max(np.ptp(x_fit) / (6.0 * _FWHM_SIGMA), sigma_floor) sigma_lower = sigma_floor sigma_upper = max(np.ptp(x_fit), sigma_guess * 10.0, sigma_floor * 10.0) if np.isfinite(dx) and dx > 0: sigma_lower = max((max(float(min_peak_width), 1.0) * dx) / _FWHM_SIGMA, sigma_floor) sigma_upper = max((max(float(max_peak_width), float(min_peak_width)) * dx) / _FWHM_SIGMA, sigma_lower * 1.01) sigma_guess = min(max(sigma_guess, sigma_lower), sigma_upper) amp_guess = max(float(np.nanmax(y_fit)), 1e-9) p0 = [amp_guess, float(center_hint), sigma_guess] bounds = ( [1e-9, float(np.min(x_fit)), sigma_lower], [np.inf, float(np.max(x_fit)), sigma_upper], ) return optimize.curve_fit( gaussian, x_fit, y_fit, p0=p0, maxfev=20000, bounds=bounds, )[0]
[docs] def robust_peak_detection( mz_values, intensities, sample_name, group, method='Gaussian', min_intensity=1, min_snr=3, min_distance=2, window_size=10, peak_height=50, prominence=10, min_peak_width=1, max_peak_width=75, width_rel_height=0.5, distance_threshold=0.1, combined=False, use_cwt=False, noise_model="global", noise_bins=20, noise_min_points=25, deconvolution_min_bic_delta=10.0, deconvolution_overlap_factor=0.75, deconvolution_replace_singles=True, verbose=False ): """ Fast peak detection in ToF-SIMS or similar spectra, including peak area. Returns: -------- peak_indices : np.ndarray Indices of detected peaks peak_properties : dict Contains: mz, intensities, widths, prominences, heights, areas Notes ----- Overlapping-peak deconvolution now requires both geometric overlap and a BIC improvement over a single-Gaussian window fit. Fitted component widths must also remain within the user-specified peak-width bounds. """ if not np.any(intensities > min_intensity): return _empty_peak_properties_df() # Clamp sub-threshold intensities but keep the full x-grid clamped = np.where(intensities > min_intensity, intensities, 0.0) if verbose: logger.info(f'min intensity: {np.nanmin(intensities)}, max intensity: {np.nanmax(intensities)}') median_noise, std_noise, height_thresh = _height_threshold_from_noise_model( mz_values, clamped, min_snr, noise_model=noise_model, window=window_size, peak_height=peak_height, peak_prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, noise_bins=noise_bins, noise_min_points=noise_min_points, ) if verbose: if np.ndim(median_noise) == 0: logger.info(f'median noise: {median_noise}, std noise: {std_noise}') else: logger.info( 'median noise range: [%s, %s], std noise range: [%s, %s]', float(np.nanmin(median_noise)), float(np.nanmax(median_noise)), float(np.nanmin(std_noise)), float(np.nanmax(std_noise)), ) def _find_local_peaks(): local_peaks, _props = signal.find_peaks( clamped, height=height_thresh, distance=min_distance, prominence=prominence, width=(min_peak_width, max_peak_width) ) return np.asarray(local_peaks, dtype=int) def _find_cwt_peaks(): cwt_peaks = signal.find_peaks_cwt( clamped, widths=np.arange(min_peak_width, max_peak_width+1), min_snr=min_snr ) cwt_peaks = np.asarray(cwt_peaks, dtype=int) if cwt_peaks.size == 0: return cwt_peaks cwt_peaks = cwt_peaks[(cwt_peaks >= 0) & (cwt_peaks < len(clamped))] if np.ndim(height_thresh) == 0: keep = clamped[cwt_peaks] > height_thresh else: keep = clamped[cwt_peaks] > height_thresh[cwt_peaks] return np.unique(cwt_peaks[keep]) peaks_local = np.array([], dtype=int) peaks_cwt = np.array([], dtype=int) # find peaks if combined: peaks_local = _find_local_peaks() peaks_cwt = _find_cwt_peaks() combined_indices = np.unique(np.concatenate([peaks_local, peaks_cwt])) elif use_cwt: peaks_cwt = _find_cwt_peaks() combined_indices = peaks_cwt.copy() else: peaks_local = _find_local_peaks() combined_indices = peaks_local.copy() peaks_local_set = set(peaks_local.tolist()) peaks_cwt_set = set(peaks_cwt.tolist()) if verbose: if np.ndim(height_thresh) == 0: logger.info(f"Height threshold: {height_thresh}") else: logger.info( "Height threshold range: [%s, %s]", float(np.nanmin(height_thresh)), float(np.nanmax(height_thresh)), ) if combined: logger.info(f"Local peaks found: {len(peaks_local)} at indices {peaks_local}") logger.info(f"CWT peaks found: {len(peaks_cwt)} at indices {peaks_cwt}") logger.info(f"Combined peaks used for fitting: {len(combined_indices)} at indices {combined_indices}") elif not use_cwt: logger.info(f"Local peaks found: {len(peaks_local)} at indices {peaks_local}") else: logger.info(f"CWT peaks found: {len(peaks_cwt)} at indices {peaks_cwt}") peak_width_mz_map = {} if combined_indices.size > 0: try: _, _, left_ips, right_ips = signal.peak_widths( clamped, combined_indices, rel_height=width_rel_height, ) width_mz = ( np.interp(right_ips, np.arange(len(mz_values)), mz_values) - np.interp(left_ips, np.arange(len(mz_values)), mz_values) ) peak_width_mz_map = { int(idx): float(width) for idx, width in zip(combined_indices, width_mz) if np.isfinite(width) and width > 0 } except (ValueError, RuntimeError): peak_width_mz_map = {} single_fit_attempts = int(len(combined_indices)) single_fit_failures = 0 deconv_fit_attempts = 0 deconv_fit_failures = 0 all_peak_records = [] all_deconv_records = [] deconvolved_source_indices = set() def get_two_peak_guess(idx1, idx2, mz_values, intensities): return [ intensities[idx1], mz_values[idx1], 0.2, intensities[idx2], mz_values[idx2], 0.2 ] # -- Single Peak Fitting -- for idx in combined_indices: left = max(0, idx - window_size) right = min(len(mz_values), idx + window_size + 1) x_fit = mz_values[left:right] y_fit = intensities[left:right] try: if method == 'Gaussian': popt, _ = optimize.curve_fit( gaussian, x_fit, y_fit, p0=[intensities[idx], mz_values[idx], 0.0002], maxfev=20000, bounds=([1e-9, x_fit.min(), 0], [np.inf, x_fit.max(), np.inf]) ) amp = popt[0] cen = popt[1] fwhm = abs(popt[2]) * 2.35482 # Convert sigma to FWHM, sigma = FWHM / 2 * sqrt(2 * ln2) area = popt[0] * abs(popt[2]) * np.sqrt(2 * np.pi) elif method =='Lorentzian': popt, _ = optimize.curve_fit( lorentzian, x_fit, y_fit, p0=[intensities[idx] * np.pi * 0.0001, mz_values[idx], 0.0001], maxfev=20000, bounds=([1e-9, x_fit.min(), 1e-9], [np.inf, x_fit.max(), np.inf]), method='trf', loss='soft_l1' ) area = popt[0] amp = popt[0] / (np.pi * popt[2]) # Convert area to amplitude cen = popt[1] fwhm = popt[2] * 2 # Convert HWHM to FWHM elif method == 'Voigt': popt, _ = optimize.curve_fit( voigt, x_fit, y_fit, p0=[intensities[idx] * np.pi * 0.0001, mz_values[idx], 0.0002, 0.0001], maxfev=20000, bounds=([1e-9, x_fit.min(), 1e-9, 1e-9], [np.inf, x_fit.max(), np.inf, np.inf]) ) area = popt[0] # A is the analytic area of the Voigt profile amp = popt[0] / (np.sqrt(2 * np.pi) * popt[2]) # Convert area to amplitude cen = popt[1] fwhm = _voigt_fwhm(popt[2], popt[3]) # Olivero & Longbothum approximation else: raise ValueError(f"Unknown fitting method: {method}") detected_by = [] if idx in peaks_local_set: detected_by.append("local_max") if idx in peaks_cwt_set: detected_by.append("cwt") all_peak_records.append({ "PeakCenter": cen, "PeakArea": area, "Prominences": np.nan, "Amplitude": amp, "PeakWidth": fwhm, "DetectedBy": "+".join(detected_by), "Deconvoluted": False, "FitModel": method, "AreaDefinition": "analytic_fit", "WidthDefinition": "FWHM", "IntegrationMethod": "analytic", "_SourceIndex": int(idx), }) except Exception as e: single_fit_failures += 1 if verbose: logger.debug(f"Single peak fit failed at idx={idx}: {e}") continue # -- Multi-Peak Deconvolution -- for i in range(len(combined_indices) - 1): idx1 = combined_indices[i] idx2 = combined_indices[i + 1] # Only consider if both peaks are above threshold if clamped[idx1] < 2*height_thresh or clamped[idx2] < 2*height_thresh: continue spacing = abs(mz_values[idx2] - mz_values[idx1]) width_candidates = [peak_width_mz_map.get(int(idx1)), peak_width_mz_map.get(int(idx2))] finite_widths = [width for width in width_candidates if width is not None and np.isfinite(width) and width > 0] adaptive_threshold = np.nan if finite_widths: adaptive_threshold = float(deconvolution_overlap_factor * np.mean(finite_widths)) if distance_threshold is None: effective_threshold = adaptive_threshold elif np.isfinite(adaptive_threshold): effective_threshold = max(float(distance_threshold), adaptive_threshold) else: effective_threshold = float(distance_threshold) if np.isfinite(effective_threshold) and spacing < effective_threshold: left = max(0, idx1 - window_size) right = min(len(mz_values), idx2 + window_size + 1) x_fit = mz_values[left:right] y_fit = intensities[left:right] guess = get_two_peak_guess(idx1, idx2, mz_values, intensities) deconv_fit_attempts += 1 try: popt, _ = optimize.curve_fit(two_gaussians, x_fit, y_fit, p0=guess, maxfev=20000, bounds=( [1e-9, x_fit.min(), 0, 1e-9, x_fit.min(), 0], [np.inf, x_fit.max(), np.inf, np.inf, x_fit.max(), np.inf]) ) amp1, cen1, wid1, amp2, cen2, wid2 = popt components = sorted( [ (float(amp1), float(cen1), float(wid1)), (float(amp2), float(cen2), float(wid2)), ], key=lambda item: item[1], ) amp1, cen1, wid1 = components[0] amp2, cen2, wid2 = components[1] if not _component_widths_are_reasonable( [wid1, wid2], x_fit, min_peak_width, max_peak_width, ): if verbose: logger.debug( "Rejected deconvolution at idx1=%s, idx2=%s due to implausible widths", idx1, idx2, ) continue dominant_idx = idx1 if intensities[idx1] >= intensities[idx2] else idx2 single_params = _fit_single_gaussian_window( x_fit, y_fit, mz_values[dominant_idx], min_peak_width, max_peak_width, ) bic_single = _bic_score(y_fit, gaussian(x_fit, *single_params), n_params=3) bic_double = _bic_score( y_fit, two_gaussians(x_fit, amp1, cen1, wid1, amp2, cen2, wid2), n_params=6, ) if not (bic_double < (bic_single - float(deconvolution_min_bic_delta))): if verbose: logger.debug( "Rejected deconvolution at idx1=%s, idx2=%s because BIC improvement was insufficient " "(single=%.3f, double=%.3f)", idx1, idx2, bic_single, bic_double, ) continue area1 = amp1 * abs(wid1) * np.sqrt(2 * np.pi) area2 = amp2 * abs(wid2) * np.sqrt(2 * np.pi) deconvolved_source_indices.update({int(idx1), int(idx2)}) all_deconv_records.extend([ { "PeakCenter": cen1, "PeakArea": area1, "Prominences": np.nan, "Amplitude": amp1, "PeakWidth": abs(wid1) * _FWHM_SIGMA, # was wid*sqrt(2π) — wrong "DetectedBy": "deconv", "Deconvoluted": True, "FitModel": "two_gaussian", "AreaDefinition": "analytic_fit", "WidthDefinition": "FWHM", "IntegrationMethod": "analytic", }, { "PeakCenter": cen2, "PeakArea": area2, "Prominences": np.nan, "Amplitude": amp2, "PeakWidth": abs(wid2) * _FWHM_SIGMA, # was wid*sqrt(2π) — wrong "DetectedBy": "deconv", "Deconvoluted": True, "FitModel": "two_gaussian", "AreaDefinition": "analytic_fit", "WidthDefinition": "FWHM", "IntegrationMethod": "analytic", } ]) except Exception as e: deconv_fit_failures += 1 if verbose: logger.debug(f"Two-peak fit failed at idx1={idx1}, idx2={idx2}: {e}") continue if deconvolution_replace_singles and deconvolved_source_indices: all_peak_records = [ record for record in all_peak_records if int(record.get("_SourceIndex", -1)) not in deconvolved_source_indices ] if not all_peak_records and not all_deconv_records: peak_properties = _empty_peak_properties_df() else: peak_properties = pd.DataFrame(all_peak_records + all_deconv_records) if "_SourceIndex" in peak_properties.columns: peak_properties = peak_properties.drop(columns=["_SourceIndex"]) peak_properties = peak_properties.drop_duplicates(subset=["PeakCenter", "DetectedBy"], keep='last') peak_properties['SampleName']=sample_name peak_properties['Group']=group _log_peak_fit_failure_summary( sample_name, method, single_attempts=single_fit_attempts, single_failures=single_fit_failures, deconv_attempts=deconv_fit_attempts, deconv_failures=deconv_fit_failures, ) return peak_properties
# Process all peaks to get peak properties
[docs] def collect_peak_properties_batch( files, mz_min=None, mz_max=None, baseline_method='airpls', noise_method='wavelet', missing_value_method='interpolation', normalization_target=1e8, method='Gaussian', min_intensity=1, min_snr=3, min_distance=5, window_size=10, peak_height=50, prominence=50, min_peak_width=1, max_peak_width=75, width_rel_height=0.5, distance_threshold=0.01, combined=False, noise_model="global", noise_bins=20, noise_min_points=25, deconvolution_min_bic_delta=10.0, deconvolution_overlap_factor=0.75, deconvolution_replace_singles=True, ): """ Collect peak properties from a batch of ToF-SIMS files. Parameters ---------- files : list of str List of file paths to process. mz_min, mz_max : float or None m/z window for data import (if supported). baseline_method : str Method for baseline correction. noise_method : str Noise filtering method. missing_value_method : str Method for handling missing values. normalization_target : float Target TIC normalization value. min_snr : int or float Minimum signal-to-noise ratio for peak detection. min_distance : int Minimum distance between peaks (in data points). prominence : int or float or None Minimum peak prominence for detection. width_rel_height : float Relative height for width calculation (e.g., 0.5 = FWHM). noise_model : {"global", "mz_binned"} Noise model used to derive peak thresholds. noise_bins : int Number of m/z bins for ``noise_model="mz_binned"``. noise_min_points : int Minimum positive noise points per bin before using local estimates. Returns ------- peaks_df : pd.DataFrame DataFrame with all peak properties for all files. """ all_peak_records = [] for file_path in files: try: # Import data mz, intensity, sample_name, group = import_data(file_path, mz_min, mz_max) logger.info(f"Processing Sample: {sample_name}, Group: {group}") # Baseline correction intensity_base = baseline_correction(intensity, method=baseline_method) logger.debug("Baseline corrected intensities: available") # Noise filtering intensity_base_noise = noise_filtering(intensity_base, method=noise_method) logger.debug("Noise filtered intensities: available") # Handle missing values _, intensity_base_noise_mis = handle_missing_values(mz, intensity_base_noise, method=missing_value_method) logger.debug("Missing values handled: yes") # TIC normalization intensity_base_noise_mis_norm = tic_normalization(intensity_base_noise_mis, target_tic=normalization_target) logger.debug("TIC normalized intensities: available") # Peak detection if method is None: # print("Performing peak detection using local maxima!") peak_properties = detect_peaks_with_area( mz_values=mz, intensities=intensity_base_noise_mis_norm, sample_name=sample_name, group=group, min_intensity=min_intensity, min_snr=min_snr, min_distance=min_distance, window_size=window_size, peak_height=peak_height, prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, width_rel_height=width_rel_height, noise_model=noise_model, noise_bins=noise_bins, noise_min_points=noise_min_points, ) elif method == 'cwt': # print("Performing peak detection using Continuous Wavelet Transform (CWT)!") peak_properties = detect_peaks_cwt_with_area( mz_values=mz, intensities=intensity_base_noise_mis_norm, sample_name=sample_name, group=group, min_intensity=min_intensity, min_snr=min_snr, min_distance=min_distance, window_size=window_size, peak_height=peak_height, prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, width_rel_height=width_rel_height, noise_model=noise_model, noise_bins=noise_bins, noise_min_points=noise_min_points, ) else: # print(f"Performing peak detection using {method} method!") peak_properties = robust_peak_detection( mz_values=mz, intensities=intensity_base_noise_mis_norm, sample_name=sample_name, group=group, method=method, min_intensity=min_intensity, min_snr=min_snr, min_distance=min_distance, window_size=window_size, peak_height=peak_height, prominence=prominence, min_peak_width=min_peak_width, max_peak_width=max_peak_width, width_rel_height=width_rel_height, distance_threshold=distance_threshold, combined=combined, noise_model=noise_model, noise_bins=noise_bins, noise_min_points=noise_min_points, deconvolution_min_bic_delta=deconvolution_min_bic_delta, deconvolution_overlap_factor=deconvolution_overlap_factor, deconvolution_replace_singles=deconvolution_replace_singles, ) # pd.DataFrame(peak_properties).to_csv(file_path+'.txt', sep='\t', index=False) # Collect properties — vectorized DataFrame append if isinstance(peak_properties, pd.DataFrame) and len(peak_properties) > 0: cols_to_keep = [ 'SampleName', 'Group', 'PeakCenter', 'PeakWidth', 'Prominences', 'PeakArea', 'Amplitude', 'DetectedBy', 'Deconvoluted', 'FitModel', 'AreaDefinition', 'WidthDefinition', 'IntegrationMethod', ] existing_cols = [c for c in cols_to_keep if c in peak_properties.columns] batch = peak_properties[existing_cols].copy() batch['SampleName'] = batch['SampleName'].astype(str) batch['Group'] = batch['Group'].astype(str) batch['DetectedBy'] = batch['DetectedBy'].astype(str) batch['Deconvoluted'] = batch['Deconvoluted'].astype(str) all_peak_records.extend(batch.to_dict('records')) except Exception as e: logger.error(f"Error processing file {file_path}: {e}") peaks_df = pd.DataFrame(all_peak_records) return peaks_df
def _greedy_mz_bins(mz_array: np.ndarray, tolerance: float) -> np.ndarray: """Assign m/z values to bins using a greedy sorted-bin algorithm. Guarantees that ``max(mz) - min(mz) <= tolerance`` within every bin, unlike DBSCAN which only bounds pairwise neighbour distance and can chain peaks into arbitrarily wide clusters. Parameters ---------- mz_array : np.ndarray 1-D array of m/z values (one per peak row). tolerance : float Hard upper bound on the span of each bin. Returns ------- np.ndarray Integer bin labels aligned with *mz_array*. """ order = np.argsort(mz_array) labels = np.empty(len(mz_array), dtype=int) bin_id = 0 bin_start = mz_array[order[0]] for idx in order: if mz_array[idx] - bin_start > tolerance: bin_id += 1 bin_start = mz_array[idx] labels[idx] = bin_id return labels
[docs] def align_peaks( peaks_df, mz_tolerance=0.2, mz_rounding_precision=1, output="intensity", ): """Cluster peaks by m/z and return an aligned feature matrix. Uses a greedy sorted-bin algorithm that guarantees every aligned bin spans at most *mz_tolerance* in m/z. """ if "PeakCenter" not in peaks_df.columns: raise ValueError("peaks_df must contain 'PeakCenter' column") if peaks_df.empty: return pd.DataFrame() working = peaks_df.copy() mz_values = working["PeakCenter"].to_numpy() # Greedy sorted bins — hard tolerance guarantee working["ClusterLabels"] = _greedy_mz_bins(mz_values, mz_tolerance) cluster_centers = ( working.groupby("ClusterLabels", as_index=False)["PeakCenter"].mean() .rename(columns={"PeakCenter": "AlignedPeakCenter"}) ) working = working.merge(cluster_centers, on="ClusterLabels", how="left") working["AlignedPeakCenterRounded"] = ( working["AlignedPeakCenter"].round(mz_rounding_precision).astype(str) ) output_lower = output.lower() if output_lower == "intensity": value_col = "Amplitude" elif output_lower == "area": value_col = "PeakArea" if "AreaDefinition" not in working.columns: raise ValueError( "peaks_df must contain 'AreaDefinition' when output='area'" ) area_definitions = sorted( { str(value) for value in working["AreaDefinition"].dropna().tolist() if str(value).strip() } ) if not area_definitions: raise ValueError( "No non-empty AreaDefinition values found for area alignment" ) if len(area_definitions) > 1: logger.warning( "Aligning PeakArea across mixed AreaDefinition values: %s", ", ".join(area_definitions), ) else: raise ValueError("Output must be 'intensity' or 'area'") if value_col not in working.columns: raise ValueError(f"Column '{value_col}' required for output='{output}' is missing") # Detect duplicate peaks (same sample, same bin) and warn dup_mask = working.duplicated( subset=["SampleName", "AlignedPeakCenterRounded"], keep=False ) n_dups = dup_mask.sum() if n_dups > 0: n_bins_affected = working.loc[ dup_mask, "AlignedPeakCenterRounded" ].nunique() logger.warning( "%d peaks across %d bins have >1 peak per sample per bin; " "keeping the strongest peak in each conflict.", n_dups, n_bins_affected, ) index_cols = ["SampleName"] if "Group" in working.columns: index_cols.append("Group") pivot = ( working.pivot_table( index=index_cols, columns="AlignedPeakCenterRounded", values=value_col, aggfunc="max", ) .fillna(0) .sort_index() ) pivot = pivot.reindex(sorted(pivot.columns, key=lambda x: float(x)), axis=1) return pivot
[docs] class PeakAlignIntensityArea: """ Process normalized ToF-SIMS spectra from CSV files, detect peaks, align them across samples, and calculate both intensity and area tables for each aligned m/z value. Parameters ---------- mz_tolerance : float, optional (default=0.2) Maximum distance (in m/z units) for clustering peaks across samples. mz_rounding_precision : int, optional (default=1) Number of decimal places for rounding aligned m/z values in output tables. min_intensity : float, optional (default=1) Minimum intensity threshold for considering data points. min_snr : float, optional (default=3) Minimum signal-to-noise ratio for peak detection. min_distance : int, optional (default=2) Minimum distance (in data points) between peaks. peak_height : float, optional (default=50) Minimum peak height for initial peak detection. prominence : float, optional (default=10) Minimum prominence for peak detection. min_peak_width : int, optional (default=1) Minimum peak width (in data points). max_peak_width : int, optional (default=75) Maximum peak width (in data points). width_rel_height : float, optional (default=0.5) Relative height for peak width calculation (0.5 = FWHM). noise_model : {"global", "mz_binned"}, optional (default="global") Noise model used for threshold estimation. noise_bins : int, optional (default=20) Number of m/z bins when using ``noise_model="mz_binned"``. noise_min_points : int, optional (default=25) Minimum positive noise points per bin for the local model. method : str or None, optional (default=None) Peak-detection / fitting method. None uses simple local-max detection (``detect_peaks_with_area_v2``), ``'cwt'`` uses CWT detection, and ``'Gaussian'`` / ``'Lorentzian'`` / ``'Voigt'`` use curve-fit detection via ``robust_peak_detection``. deconvolution_min_bic_delta : float, optional (default=10.0) Minimum BIC improvement required before accepting a two-Gaussian deconvolution over a single-peak fit. deconvolution_overlap_factor : float, optional (default=0.75) Scale factor applied to the mean measured peak width when deriving the adaptive deconvolution spacing gate. deconvolution_replace_singles : bool, optional (default=True) If True, replace overlapping single-peak fits with the accepted deconvoluted components in the output table. output_dir : str or None, optional Directory to save output CSV files. If None, files are not saved. verbose : bool, optional (default=False) If True, print progress information. Examples -------- >>> from mioXpektron.detection import PeakAlignIntensityArea >>> import glob >>> >>> # Get all normalized spectra >>> csv_files = glob.glob('output_files/normalized_spectra/*.csv') >>> >>> # Create analyzer instance >>> analyzer = PeakAlignIntensityArea( ... mz_tolerance=0.1, ... min_snr=3, ... output_dir='output_files/peak_analysis' ... ) >>> >>> # Process with m/z cutoff >>> intensity_table, area_table, peaks_df = analyzer.run( ... csv_files, ... mz_min=50, ... mz_max=500 ... ) >>> >>> print(f"Detected {len(peaks_df)} peaks across {len(csv_files)} samples") >>> print(f"Aligned to {intensity_table.shape[1]} unique m/z values") """
[docs] def __init__( self, mz_tolerance=0.2, mz_rounding_precision=1, min_intensity=1, min_snr=3, min_distance=2, peak_height=50, prominence=10, min_peak_width=1, max_peak_width=75, width_rel_height=0.5, noise_model="global", noise_bins=20, noise_min_points=25, method=None, deconvolution_min_bic_delta=10.0, deconvolution_overlap_factor=0.75, deconvolution_replace_singles=True, output_dir=None, verbose=False, group_patterns=None, group_fn=None, ): """Initialize the PeakAlignIntensityArea analyzer with default parameters.""" self.mz_tolerance = mz_tolerance self.mz_rounding_precision = mz_rounding_precision self.min_intensity = min_intensity self.min_snr = min_snr self.min_distance = min_distance self.peak_height = peak_height self.prominence = prominence self.min_peak_width = min_peak_width self.max_peak_width = max_peak_width self.width_rel_height = width_rel_height self.noise_model = noise_model self.noise_bins = noise_bins self.noise_min_points = noise_min_points self.method = method self.deconvolution_min_bic_delta = deconvolution_min_bic_delta self.deconvolution_overlap_factor = deconvolution_overlap_factor self.deconvolution_replace_singles = deconvolution_replace_singles self.output_dir = output_dir self.verbose = verbose self.group_patterns = group_patterns self.group_fn = group_fn
[docs] def run(self, csv_files, mz_min=None, mz_max=None): """ Process CSV files and perform peak detection, alignment, and quantification. Parameters ---------- csv_files : list of str List of paths to normalized spectrum CSV files. Each CSV should have columns: 'channel', 'mz', 'intensity' mz_min : float or None, optional Minimum m/z value to consider for peak detection. If None, use full range. mz_max : float or None, optional Maximum m/z value to consider for peak detection. If None, use full range. Returns ------- intensity_table : pd.DataFrame DataFrame with samples as rows and aligned m/z values as columns, containing peak intensities (amplitudes). Missing peaks are filled with 0. area_table : pd.DataFrame DataFrame with samples as rows and aligned m/z values as columns, containing peak areas. Missing peaks are filled with 0. peaks_df : pd.DataFrame DataFrame containing all detected peaks with their properties before alignment. """ all_peak_records = [] for file_path in csv_files: try: # Extract sample name and group from filename sample_name = os.path.basename(file_path).replace('.csv', '') group = _resolve_group( sample_name, self.group_patterns, self.group_fn ) if self.verbose: logger.info(f"Processing: {sample_name}") # Load CSV file df = pd.read_csv(file_path) # Check for required columns if 'mz' not in df.columns or 'intensity' not in df.columns: logger.warning(f"Skipping {file_path} - missing 'mz' or 'intensity' columns") continue mz = df['mz'].values intensity = df['intensity'].values # Apply m/z cutoff if specified if mz_min is not None or mz_max is not None: mask = np.ones(len(mz), dtype=bool) if mz_min is not None: mask &= (mz >= mz_min) if mz_max is not None: mask &= (mz <= mz_max) mz = mz[mask] intensity = intensity[mask] if len(mz) == 0: if self.verbose: logger.warning(f" No data in m/z range [{mz_min}, {mz_max}]") continue if self.verbose: logger.info(f" m/z range: [{mz.min():.4f}, {mz.max():.4f}]") # Detect peaks — dispatch on self.method if self.method is None: peak_properties = detect_peaks_with_area_v2( mz=mz, intens=intensity, sample_name=sample_name, group=group, min_intensity=self.min_intensity, min_snr=self.min_snr, min_distance=self.min_distance, prominence=self.prominence, min_peak_width=self.min_peak_width, max_peak_width=self.max_peak_width, rel_height=self.width_rel_height, noise_model=self.noise_model, noise_bins=self.noise_bins, noise_min_points=self.noise_min_points, verbose=self.verbose, ) elif self.method == 'cwt': peak_properties = detect_peaks_cwt_with_area( mz_values=mz, intensities=intensity, sample_name=sample_name, group=group, min_intensity=self.min_intensity, min_snr=self.min_snr, peak_height=self.peak_height, prominence=self.prominence, min_peak_width=self.min_peak_width, max_peak_width=self.max_peak_width, width_rel_height=self.width_rel_height, noise_model=self.noise_model, noise_bins=self.noise_bins, noise_min_points=self.noise_min_points, deconvolution_min_bic_delta=self.deconvolution_min_bic_delta, deconvolution_overlap_factor=self.deconvolution_overlap_factor, deconvolution_replace_singles=self.deconvolution_replace_singles, verbose=self.verbose, ) else: peak_properties = robust_peak_detection( mz_values=mz, intensities=intensity, sample_name=sample_name, group=group, method=self.method, min_intensity=self.min_intensity, min_snr=self.min_snr, min_distance=self.min_distance, peak_height=self.peak_height, prominence=self.prominence, min_peak_width=self.min_peak_width, max_peak_width=self.max_peak_width, width_rel_height=self.width_rel_height, noise_model=self.noise_model, noise_bins=self.noise_bins, noise_min_points=self.noise_min_points, deconvolution_min_bic_delta=self.deconvolution_min_bic_delta, deconvolution_overlap_factor=self.deconvolution_overlap_factor, deconvolution_replace_singles=self.deconvolution_replace_singles, verbose=self.verbose, ) if len(peak_properties) > 0: all_peak_records.append(peak_properties) if self.verbose: logger.info(f" Detected {len(peak_properties)} peaks") else: if self.verbose: logger.info(" No peaks detected") except Exception as e: logger.error(f"Error processing {file_path}: {e}", exc_info=self.verbose) continue # Combine all peak records if len(all_peak_records) == 0: logger.warning("No peaks detected in any file!") empty_table = pd.DataFrame() return empty_table, empty_table, _empty_peak_properties_df() peaks_df = pd.concat(all_peak_records, ignore_index=True) if self.verbose: logger.info(f"Total peaks detected: {len(peaks_df)}") logger.info(f"Unique samples: {peaks_df['SampleName'].nunique()}") # Align peaks and create intensity table intensity_table = align_peaks( peaks_df, mz_tolerance=self.mz_tolerance, mz_rounding_precision=self.mz_rounding_precision, output="intensity" ) # Align peaks and create area table area_table = align_peaks( peaks_df, mz_tolerance=self.mz_tolerance, mz_rounding_precision=self.mz_rounding_precision, output="area" ) if self.verbose: logger.info(f"Aligned m/z values: {intensity_table.shape[1]}") logger.info(f"Intensity table shape: {intensity_table.shape}") logger.info(f"Area table shape: {area_table.shape}") # Save output files if directory specified if self.output_dir is not None: os.makedirs(self.output_dir, exist_ok=True) # Save tables intensity_path = os.path.join(self.output_dir, 'peak_intensity_table.csv') area_path = os.path.join(self.output_dir, 'peak_area_table.csv') peaks_path = os.path.join(self.output_dir, 'all_detected_peaks.csv') intensity_table.to_csv(intensity_path) area_table.to_csv(area_path) peaks_df.to_csv(peaks_path, index=False) if self.verbose: logger.info(f"Saved outputs to {self.output_dir}:") logger.info(f" - {intensity_path}") logger.info(f" - {area_path}") logger.info(f" - {peaks_path}") return intensity_table, area_table, peaks_df