"""
Debug version of FlexibleCalibrator with diagnostic logging.
Inherits all functionality from ``flexible_calibrator`` and overrides only the
peak-detection helpers that benefit from verbose diagnostics. This eliminates
code duplication while preserving the diagnostic output needed during
development and validation.
Version: 0.0.1
"""
import logging
from typing import Dict, List, Optional
import numpy as np
import numpy.typing as npt
import pandas as pd
# Re-export everything from the production modules so that existing imports
# keep working.
from .flexible_calibrator import ( # noqa: F401 – re-exports
FlexibleCalibConfig,
FlexibleCalibrator,
CalibrationMethod,
)
from ._models import (
_ppm_to_da,
_fit_gaussian_peak,
_fit_voigt_peak,
_parabolic_peak_center as _parabolic_peak_center_base,
_enhanced_pick_channels as _enhanced_pick_channels_base,
_enhanced_bootstrap_channels,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Diagnostic overrides
# ---------------------------------------------------------------------------
def _parabolic_peak_center(
x: npt.NDArray[np.float64],
y: npt.NDArray[np.float64],
peak_idx: int,
) -> Optional[float]:
"""Parabolic interpolation with debug logging at each decision point."""
if peak_idx == 0 or peak_idx == len(x) - 1:
logger.debug(
"Parabolic fit failed: peak at edge (peak_idx=%d, len=%d)",
peak_idx, len(x),
)
return None
x1, x2, x3 = x[peak_idx - 1], x[peak_idx], x[peak_idx + 1]
y1, y2, y3 = y[peak_idx - 1], y[peak_idx], y[peak_idx + 1]
denom = (x1 - x2) * (x1 - x3) * (x2 - x3)
if abs(denom) < 1e-10:
logger.debug(
"Parabolic fit failed: denominator too small (%.2e)", abs(denom),
)
return None
A = (x3 * (y2 - y1) + x2 * (y1 - y3) + x1 * (y3 - y2)) / denom
B = (x3**2 * (y1 - y2) + x2**2 * (y3 - y1) + x1**2 * (y2 - y3)) / denom
if abs(A) < 1e-10:
logger.debug(
"Parabolic fit failed: A coefficient too small (%.2e)", abs(A),
)
return None
xc = -B / (2 * A)
if xc < x1 or xc > x3:
logger.debug(
"Parabolic fit failed: center out of range "
"(xc=%.4f, range=[%.4f, %.4f])",
xc, x1, x3,
)
return None
logger.debug(
"Parabolic fit success: peak_mz=%.4f -> refined_mz=%.4f (shift=%.4f)",
x2, xc, xc - x2,
)
return float(xc)
def _enhanced_pick_channels(
df: pd.DataFrame,
targets: npt.NDArray[np.float64],
tol_da: Optional[float],
tol_ppm: Optional[float],
method: str = "gaussian",
) -> List[int]:
"""Enhanced peak picking with diagnostic counters and logging."""
mz = df["m/z"].astype("float64").to_numpy()
I = df["Intensity"].astype("float64").to_numpy()
ch = df["Channel"].to_numpy()
out: List[int] = []
# Diagnostic counters
parabolic_success = 0
parabolic_fallback = 0
parabolic_same_as_max = 0
parabolic_different_from_max = 0
logger.info("[DIAGNOSTIC] Peak picking method: %s", method)
for target_idx, xi in enumerate(targets):
if tol_ppm is not None:
tol = _ppm_to_da(xi, tol_ppm)
elif tol_da is not None:
from ._models import _ppm_to_da as _p2d
max_tol = _p2d(xi, 500.0)
tol = max(0.05, min(tol_da, max_tol))
else:
tol = _ppm_to_da(xi, 200.0)
left, right = xi - tol, xi + tol
mask = (mz >= left) & (mz <= right)
if not mask.any():
logger.debug(
"[DIAGNOSTIC] Target %.4f: No peaks found in window", xi,
)
out.append(np.nan)
continue
idxs = np.flatnonzero(mask)
mzw = mz[idxs]
Iw = I[idxs]
chw = ch[idxs]
k_local = int(np.nanargmax(Iw))
peak_mz = float(mzw[k_local])
max_ch = chw[k_local]
logger.debug(
"[DIAGNOSTIC] Target %.4f: Found %d points in window, "
"max at mz=%.4f, ch=%s",
xi, len(mzw), peak_mz, max_ch,
)
if method == "max":
final_ch = chw[k_local]
logger.debug(
"[DIAGNOSTIC] Target %.4f: Using MAX method -> ch=%s",
xi, final_ch,
)
elif method == "centroid":
wsum = Iw.sum()
if wsum > 0:
mz_c = float((mzw * Iw).sum() / wsum)
nearest = int(np.argmin(np.abs(mzw - mz_c)))
final_ch = chw[nearest]
logger.debug(
"[DIAGNOSTIC] Target %.4f: CENTROID mz=%.4f -> ch=%s "
"(vs max ch=%s)",
xi, mz_c, final_ch, max_ch,
)
else:
final_ch = chw[k_local]
logger.debug(
"[DIAGNOSTIC] Target %.4f: CENTROID fallback to MAX "
"-> ch=%s",
xi, final_ch,
)
elif method == "parabolic":
center = _parabolic_peak_center(mzw, Iw, k_local)
if center is not None:
parabolic_success += 1
nearest = int(np.argmin(np.abs(mzw - center)))
final_ch = chw[nearest]
if final_ch == max_ch:
parabolic_same_as_max += 1
else:
parabolic_different_from_max += 1
logger.info(
"[DIAGNOSTIC] Target %.4f: PARABOLIC refined_mz=%.4f "
"-> ch=%s (max_ch=%s, SAME=%s)",
xi, center, final_ch, max_ch, final_ch == max_ch,
)
else:
parabolic_fallback += 1
final_ch = chw[k_local]
logger.info(
"[DIAGNOSTIC] Target %.4f: PARABOLIC FALLBACK to MAX "
"-> ch=%s",
xi, final_ch,
)
elif method == "gaussian":
center = _fit_gaussian_peak(mzw, Iw, peak_mz)
if center is not None:
nearest = int(np.argmin(np.abs(mzw - center)))
final_ch = chw[nearest]
else:
wsum = Iw.sum()
if wsum > 0:
mz_c = float((mzw * Iw).sum() / wsum)
nearest = int(np.argmin(np.abs(mzw - mz_c)))
final_ch = chw[nearest]
else:
final_ch = chw[k_local]
elif method == "voigt":
center = _fit_voigt_peak(mzw, Iw, peak_mz)
if center is not None:
nearest = int(np.argmin(np.abs(mzw - center)))
final_ch = chw[nearest]
else:
center = _fit_gaussian_peak(mzw, Iw, peak_mz)
if center is not None:
nearest = int(np.argmin(np.abs(mzw - center)))
final_ch = chw[nearest]
else:
final_ch = chw[k_local]
else:
final_ch = chw[k_local]
out.append(int(final_ch))
# Print summary statistics for parabolic method
if method == "parabolic":
total_targets = len(targets)
logger.info("[DIAGNOSTIC SUMMARY] " + "=" * 42)
logger.info(
"[DIAGNOSTIC SUMMARY] Total targets: %d", total_targets,
)
logger.info(
"[DIAGNOSTIC SUMMARY] Parabolic fit succeeded: %d/%d (%.1f%%)",
parabolic_success, total_targets,
100 * parabolic_success / max(1, total_targets),
)
logger.info(
"[DIAGNOSTIC SUMMARY] Parabolic fit failed (fallback to max): "
"%d/%d (%.1f%%)",
parabolic_fallback, total_targets,
100 * parabolic_fallback / max(1, total_targets),
)
if parabolic_success > 0:
logger.info("[DIAGNOSTIC SUMMARY] When parabolic succeeded:")
logger.info(
"[DIAGNOSTIC SUMMARY] - Same channel as max: %d/%d (%.1f%%)",
parabolic_same_as_max, parabolic_success,
100 * parabolic_same_as_max / max(1, parabolic_success),
)
logger.info(
"[DIAGNOSTIC SUMMARY] - Different channel from max: "
"%d/%d (%.1f%%)",
parabolic_different_from_max, parabolic_success,
100 * parabolic_different_from_max / max(1, parabolic_success),
)
logger.info("[DIAGNOSTIC SUMMARY] " + "=" * 42)
return out
# ---------------------------------------------------------------------------
# FlexibleCalibratorDebug – thin subclass that wires in the diagnostic helpers
# ---------------------------------------------------------------------------
[docs]
class FlexibleCalibratorDebug(FlexibleCalibrator):
"""Debug variant of :class:`FlexibleCalibrator` with verbose peak-picking.
Inherits all calibration logic and overrides only ``_autodetect_channels``
to route through the diagnostic versions of ``_enhanced_pick_channels``
and ``_parabolic_peak_center``.
"""
def _autodetect_channels(
self,
files,
ref_masses: npt.NDArray[np.float64],
) -> Dict[str, list]:
"""Autodetect calibrant channels using diagnostic peak picking."""
import os
from tqdm import tqdm
autodetected: Dict[str, list] = {}
for fp in tqdm(files, desc="Autodetecting channels (debug)"):
try:
df = pd.read_csv(fp, sep="\t", header=0, comment="#")
except Exception as e:
logger.warning(
"%s: Failed to read - %s", os.path.basename(fp), e,
)
continue
fname = os.path.basename(fp)
if "Channel" not in df.columns:
continue
if (
self.config.prefer_recompute_from_channel
or self.config.autodetect_strategy == "bootstrap"
or "m/z" not in df.columns
):
ch = df["Channel"].to_numpy()
y = df["Intensity"].astype("float64").to_numpy()
autodetected[fname] = _enhanced_bootstrap_channels(
ch, y, ref_masses,
)
else:
# Use the diagnostic version defined in this module
autodetected[fname] = _enhanced_pick_channels(
df,
ref_masses,
self.config.autodetect_tol_da,
self.config.autodetect_tol_ppm,
self.config.autodetect_method,
)
if not autodetected:
raise RuntimeError("Autodetection failed")
logger.info(
"Autodetected channels for %d/%d files",
len(autodetected), len(files),
)
return autodetected