Source code for mioXpektron.recalibrate.peak_picker_uncal

"""
peak_picker.py
Identify the most intense point within ±0.5 Da of eight reference m/z
values in every calibrated ToF-SIMS spectrum and return the results
as a nested Python dictionary.

Author: Julhash Kazi, Lund University, 2025-06-15
"""
import logging
import os
from glob import glob
from pathlib import Path
from typing import Sequence, Dict, List, Optional

logger = logging.getLogger(__name__)

import numpy as np
import pandas as pd
from tqdm import tqdm

# --------------------------------------------------------------------
# 1) USER SETTINGS
# --------------------------------------------------------------------
TOL_DA_DEFAULT = 0.2          # e.g., ±0.2 Da (fix the comment!)
TOL_PPM_DEFAULT = None        # e.g., 150  (use PPM if not None)
TARGET_MASSES = np.array([
    39.0983,       # "
    41.0265        # "
], dtype=float)

# Optional: integer labels if you need nominal m/z columns as well.
TARGET_NOMINAL = np.rint(TARGET_MASSES).astype(int).tolist()

OUTPUT_DIR = Path("output_files")

# --------------------------------------------------------------------
# 2) CORE PICKER
# --------------------------------------------------------------------
def _ppm_to_da(mz: float, ppm: float) -> float:
    return mz * (ppm * 1e-6)

def _parabolic_vertex(x: np.ndarray, y: np.ndarray) -> Optional[float]:
    """
    Fit a quadratic y = ax^2 + bx + c through three points around the apex
    and return the x coordinate of the vertex (-b / (2a)) if stable.
    """
    if len(x) != 3 or len(y) != 3:
        return None
    # Guard against degenerate fits
    if not np.isfinite(y).all() or np.isclose(np.ptp(x), 0.0):
        return None
    # Fit quadratic in x
    coeffs = np.polyfit(x, y, 2)
    a, b, _ = coeffs
    if np.isclose(a, 0.0):
        return None
    xv = -b / (2.0 * a)
    # Only accept if inside the local span
    if xv < x.min() or xv > x.max():
        return None
    return float(xv)


def _local_peak_bounds(
    x: np.ndarray,
    y: np.ndarray,
    peak_idx: int,
    *,
    min_fraction: float = 0.25,
    min_points: int = 3,
) -> tuple[int, int]:
    """Return a contiguous local support region around the apex."""
    if len(x) == 0:
        return 0, -1

    left = peak_idx
    right = peak_idx
    peak_y = float(y[peak_idx])
    baseline = float(np.nanmin(y))
    rel_height = max(peak_y - baseline, 0.0)
    threshold = baseline + min_fraction * rel_height

    while left > 0 and y[left - 1] >= threshold:
        left -= 1
    while right < len(x) - 1 and y[right + 1] >= threshold:
        right += 1

    while (right - left + 1) < min_points:
        expanded = False
        if left > 0:
            left -= 1
            expanded = True
        if (right - left + 1) >= min_points:
            break
        if right < len(x) - 1:
            right += 1
            expanded = True
        if not expanded:
            break

    return left, right


def _centroid_peak_center(
    x: np.ndarray,
    y: np.ndarray,
    peak_idx: int,
    *,
    min_points: int = 3,
    subtract_baseline: bool = True,
    apex_fraction: Optional[float] = 0.75,
) -> Optional[float]:
    """Return a local centroid center with optional baseline-aware apex support."""
    if len(x) == 0:
        return None

    left, right = _local_peak_bounds(x, y, peak_idx, min_points=min_points)
    x_local = np.asarray(x[left:right + 1], dtype=np.float64)
    y_local = np.asarray(y[left:right + 1], dtype=np.float64)
    if len(x_local) == 0 or not np.isfinite(y_local).all():
        return None

    if subtract_baseline:
        baseline = float(np.nanmin(y))
        support_profile = np.clip(y_local - baseline, 0.0, None)
    else:
        support_profile = np.clip(y_local, 0.0, None)

    if not np.any(support_profile > 0):
        return None

    peak_local = int(peak_idx - left)
    if apex_fraction is not None and 0.0 < apex_fraction < 1.0:
        threshold = float(np.nanmax(support_profile)) * apex_fraction
        apex_left = peak_local
        apex_right = peak_local
        while apex_left > 0 and support_profile[apex_left - 1] >= threshold:
            apex_left -= 1
        while apex_right < len(support_profile) - 1 and support_profile[apex_right + 1] >= threshold:
            apex_right += 1

        while (apex_right - apex_left + 1) < min_points:
            expanded = False
            if apex_left > 0:
                apex_left -= 1
                expanded = True
            if (apex_right - apex_left + 1) >= min_points:
                break
            if apex_right < len(support_profile) - 1:
                apex_right += 1
                expanded = True
            if not expanded:
                break

        x_local = x_local[apex_left:apex_right+1]

    weights = np.clip(y_local, 0.0, None)
    if apex_fraction is not None and 0.0 < apex_fraction < 1.0:
        weights = weights[apex_left:apex_right+1]

    if not np.any(weights > 0):
        return None
    weight_sum = float(np.sum(weights))
    if weight_sum <= 0.0:
        return None
    return float(np.sum(x_local * weights) / weight_sum)


def _interpolate_channel_for_mz(
    mz_values: np.ndarray,
    channel_values: np.ndarray,
    refined_mz: float,
) -> Optional[float]:
    """Map a refined m/z center back to a floating-point channel position."""
    if not np.isfinite(refined_mz):
        return None

    order = np.argsort(mz_values)
    mz_sorted = np.asarray(mz_values, dtype=np.float64)[order]
    ch_sorted = np.asarray(channel_values, dtype=np.float64)[order]

    unique_mz, unique_idx = np.unique(mz_sorted, return_index=True)
    if len(unique_mz) < 2:
        return None

    unique_ch = ch_sorted[unique_idx]
    if refined_mz < unique_mz[0] or refined_mz > unique_mz[-1]:
        return None

    return float(np.interp(refined_mz, unique_mz, unique_ch))

[docs] def pick_peaks( df: pd.DataFrame, targets: Sequence[float] = TARGET_MASSES, tol_da: Optional[float] = TOL_DA_DEFAULT, tol_ppm: Optional[float] = TOL_PPM_DEFAULT, method: str = "centroid", # "max" | "centroid" | "centroid_raw" | "parabolic" min_points: int = 3, min_intensity: float = 0.0 ) -> List[Dict]: """ Select a representative (m/z, intensity, channel) near each target. - 'max': highest point within window - 'centroid': baseline-subtracted, apex-focused centroid within the local peak - 'centroid_raw': raw intensity-weighted centroid over the local peak support - 'parabolic': pick apex by 'max', then refine m/z by a local quadratic fit using apex±1 bins (falls back to 'max' if not feasible) """ # Use float64 for precision in calibration context mz = df["m/z"].astype("float64").to_numpy() I = df["Intensity"].astype("float64").to_numpy() ch = df["Channel"].to_numpy() # ensure finite finite = np.isfinite(mz) & np.isfinite(I) mz, I, ch = mz[finite], I[finite], ch[finite] out = [] for xi in targets: # decide window tol = _ppm_to_da(xi, tol_ppm) if tol_ppm is not None else float(tol_da) left, right = xi - tol, xi + tol mask = (mz >= left) & (mz <= right) & (I >= min_intensity) if not mask.any(): out.append({ "Target_m/z": float(xi), "Matched_m/z": np.nan, "Channel": np.nan, "Intensity": 0.0, "n_points": 0, "edge": False, "method_used": "none" }) continue idxs = np.flatnonzero(mask) mzw = mz[idxs] Iw = I[idxs] # Default pick: max k_local = int(np.nanargmax(Iw)) k_global = int(idxs[k_local]) if method == "max": mz_pick = mz[k_global] I_pick = I[k_global] ch_pick = ch[k_global] method_used = "max" elif method == "centroid": mz_c = _centroid_peak_center( mzw, Iw, k_local, min_points=3, subtract_baseline=True, apex_fraction=0.75, ) if mz_c is not None: ch_interp = _interpolate_channel_for_mz(mzw, ch[idxs], mz_c) if ch_interp is not None: mz_pick, I_pick, ch_pick = mz_c, float(I[k_global]), float(ch_interp) method_used = "centroid" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" elif method == "centroid_raw": mz_c = _centroid_peak_center( mzw, Iw, k_local, min_points=3, subtract_baseline=False, apex_fraction=None, ) if mz_c is not None: ch_interp = _interpolate_channel_for_mz(mzw, ch[idxs], mz_c) if ch_interp is not None: mz_pick, I_pick, ch_pick = mz_c, float(I[k_global]), float(ch_interp) method_used = "centroid_raw" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" elif method == "parabolic": # Fit around apex using apex±1 if available if len(mzw) >= 3 and 0 < k_local < len(mzw)-1: trip_x = mzw[k_local-1:k_local+2] trip_y = Iw[k_local-1:k_local+2] xv = _parabolic_vertex(trip_x, trip_y) if xv is not None and np.isfinite(xv): ch_interp = _interpolate_channel_for_mz(mzw, ch[idxs], xv) if ch_interp is not None: mz_pick, I_pick, ch_pick = float(xv), float(I[k_global]), float(ch_interp) method_used = "parabolic" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" else: mz_pick, I_pick, ch_pick = mz[k_global], I[k_global], float(ch[k_global]) method_used = "max_fallback" else: raise ValueError(f"Unknown method: {method}") edge = np.isclose(mz_pick, left) or np.isclose(mz_pick, right) out.append({ "Target_m/z": float(xi), "Matched_m/z": float(mz_pick), "Channel": float(ch_pick), "Intensity": float(I_pick), "n_points": int(mask.sum()), "edge": bool(edge), "method_used": method_used }) return out
# -------------------------------------------------------------------- # 3) BATCH PROCESSOR # --------------------------------------------------------------------
[docs] def batch_peak_process_uncal( files: Sequence[str], tol_da: Optional[float] = TOL_DA_DEFAULT, tol_ppm: Optional[float] = TOL_PPM_DEFAULT, method: str = "centroid" ): """ Process multiple spectra and write: - peak_summary.tsv (long format, per spectrum x per target) - channel_summary_exact.tsv (wide, columns = exact target masses) - channel_summary_nominal.tsv (wide, columns = nominal integer m/z) Returns: - calib_channels_dict_exact: {spectrum: [channels in TARGET_MASSES order]} - calib_channels_dict_nominal:{spectrum: [channels in TARGET_NOMINAL order]} """ OUTPUT_DIR.mkdir(parents=True, exist_ok=True) peak_dict: Dict[str, List[Dict]] = {} for path in tqdm(files, desc="Processing spectra"): try: df = pd.read_csv( path, sep="\t", header=0, comment="#", usecols=["Channel", "m/z", "Intensity"], dtype={"Channel": np.int32, "m/z": np.float64, "Intensity": np.float64} ) except ValueError as e: raise RuntimeError(f"{os.path.basename(path)} – bad format: {e}") peak_dict[os.path.basename(path)] = pick_peaks( df, targets=TARGET_MASSES, tol_da=tol_da, tol_ppm=tol_ppm, method=method ) # 4) Persist long table long_df = ( pd.concat({k: pd.DataFrame(v) for k, v in peak_dict.items()}) .reset_index(level=1, drop=True) .rename_axis("Spectrum") .reset_index() ) peak_summary_path = OUTPUT_DIR / "peak_summary_uncal.tsv" long_df.to_csv(peak_summary_path, sep="\t", index=False) logger.info(f"Saved → {peak_summary_path}") # 5) Wide tables (exact vs nominal) # exact masses as columns exact_map = { spec: {row["Target_m/z"]: row["Channel"] for row in rows} for spec, rows in peak_dict.items() } summary_exact = (pd.DataFrame.from_dict(exact_map, orient="index") .reindex(columns=list(TARGET_MASSES))) summary_exact.index.name = "Spectrum" summary_exact_csv = summary_exact.reset_index() summary_exact_path = OUTPUT_DIR / "channel_summary_exact_uncal.tsv" summary_exact_csv.to_csv(summary_exact_path, sep="\t", index=False) logger.info(f"Saved → {summary_exact_path}") # nominal integers as columns (optional) nominal_map = {} for spec, rows in peak_dict.items(): d = {} for t, n in zip(TARGET_MASSES, TARGET_NOMINAL): # find the row for exact target t r = next((r for r in rows if np.isclose(r["Target_m/z"], t)), None) d[n] = (None if r is None or pd.isna(r["Channel"]) else int(r["Channel"])) nominal_map[spec] = d summary_nominal = (pd.DataFrame.from_dict(nominal_map, orient="index") .reindex(columns=TARGET_NOMINAL)) summary_nominal.index.name = "Spectrum" summary_nominal_csv = summary_nominal.reset_index() summary_nominal_path = OUTPUT_DIR / "channel_summary_nominal_uncal.tsv" summary_nominal_csv.to_csv(summary_nominal_path, sep="\t", index=False) logger.info(f"Saved → {summary_nominal_path}") # 6) Dicts in memory (channel lists in fixed order) calib_channels_dict_exact = { spectrum: [ (None if pd.isna(ch) else int(ch)) for ch in summary_exact.loc[spectrum, list(TARGET_MASSES)] ] for spectrum in summary_exact.index } calib_channels_dict_nominal = { spectrum: [ (None if pd.isna(ch) else int(ch)) for ch in summary_nominal.loc[spectrum, TARGET_NOMINAL] ] for spectrum in summary_nominal.index } # Show sample logger.info("Dictionary sample (first 2):") for k in list(calib_channels_dict_exact)[:2]: logger.info("%s → exact: %s | nominal: %s", k, calib_channels_dict_exact[k], calib_channels_dict_nominal[k]) return calib_channels_dict_exact, calib_channels_dict_nominal
''' How to call files = glob("path/to/spectra/*.txt") calib_exact, calib_nominal = batch_peak_process( files, tol_da=0.2, # or use tol_ppm=150, tol_da=None tol_ppm=None, method="parabolic" # "max" | "centroid" | "parabolic" ) '''