Source code for mioXpektron.normalization.normalization_eval

"""
Normalization method evaluation for ToF-SIMS data.

Evaluates multiple normalization strategies on a set of labelled spectra
using unsupervised, supervised and spectral-quality metrics, then ranks
them with composite scores — following the approach established in
`xpectrass` for FTIR data but adapted to the specifics of ToF-SIMS
(Poisson counting statistics, high dynamic range, ion-yield variation).

Usage
-----
>>> from mioXpektron.normalization import NormalizationEvaluator
>>> evaluator = NormalizationEvaluator(files=["spectra/*.txt"])
>>> summary = evaluator.evaluate()
>>> evaluator.plot()
"""

from __future__ import annotations

import json
import logging
import time
import warnings
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

try:
    import polars as pl
    _POLARS_AVAILABLE = True
except ImportError:
    pl = None  # type: ignore[assignment]
    _POLARS_AVAILABLE = False

try:
    import matplotlib.pyplot as plt
    _MPL_AVAILABLE = True
except ImportError:
    plt = None  # type: ignore[assignment]
    _MPL_AVAILABLE = False

from .normalization import normalize, normalization_method_names, tic_normalization
from ..utils.file_management import import_data

OUTPUT_DIR = Path("output_files")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


# ---------------------------------------------------------------------------
#  Helpers
# ---------------------------------------------------------------------------

[docs] def spectral_angle(a: np.ndarray, b: np.ndarray, eps: float = 1e-12) -> float: """Spectral Angle Mapper (SAM) in radians; lower => more similar shape.""" a = np.asarray(a, dtype=float) b = np.asarray(b, dtype=float) na = np.linalg.norm(a) + eps nb = np.linalg.norm(b) + eps cos = np.clip(np.dot(a, b) / (na * nb), -1.0, 1.0) return float(np.arccos(cos))
[docs] def within_group_mean_sam(X: np.ndarray, groups: np.ndarray) -> float: """Mean SAM across all pairs within each group (technical replicates).""" groups = np.asarray(groups) vals: List[float] = [] for g in np.unique(groups): idx = np.flatnonzero(groups == g) if idx.size < 2: continue for i in range(idx.size): for j in range(i + 1, idx.size): vals.append(spectral_angle(X[idx[i]], X[idx[j]])) return float(np.mean(vals)) if vals else np.nan
def _zscore_robust(series: pd.Series) -> pd.Series: """Z-score with epsilon to avoid division by zero.""" return (series - series.mean()) / (series.std(ddof=0) + 1e-12) def _cv_of_tic(X: np.ndarray) -> float: """Coefficient of variation of row-wise TIC — lower => better normalisation.""" tics = np.nansum(X, axis=1) if np.nanmean(tics) == 0: return np.nan return float(np.nanstd(tics) / np.nanmean(tics)) def _mean_pairwise_correlation(X: np.ndarray, groups: np.ndarray) -> float: """Mean within-group Pearson correlation — higher => better consistency.""" groups = np.asarray(groups) vals: List[float] = [] for g in np.unique(groups): idx = np.flatnonzero(groups == g) if idx.size < 2: continue for i in range(idx.size): for j in range(i + 1, idx.size): r = np.corrcoef(X[idx[i]], X[idx[j]])[0, 1] if np.isfinite(r): vals.append(r) return float(np.mean(vals)) if vals else np.nan def _dynamic_range(X: np.ndarray) -> float: """Median dynamic range (log10(max/min of positive values)) across samples.""" drs: List[float] = [] for row in X: pos = row[row > 0] if pos.size > 1: drs.append(np.log10(np.max(pos) / np.min(pos))) return float(np.median(drs)) if drs else np.nan def _negative_fraction(X: np.ndarray) -> float: """Fraction of values < 0 across the entire matrix.""" return float(np.sum(X < 0) / X.size) if X.size > 0 else 0.0 def _has_glob_chars(s: str) -> bool: return any(ch in s for ch in "*?[") # --------------------------------------------------------------------------- # Core single-method evaluation # ---------------------------------------------------------------------------
[docs] def evaluate_one_method( X_raw: np.ndarray, groups: np.ndarray, mz_values: np.ndarray, method: str, method_kwargs: Optional[Dict[str, Any]] = None, n_clusters: Optional[int] = None, cluster_bootstrap_rounds: int = 30, cluster_bootstrap_frac: float = 0.8, random_state: int = 0, compute_supervised: bool = True, ) -> Dict[str, Any]: """Evaluate a single normalisation method on the spectra matrix. Parameters ---------- X_raw : np.ndarray (n_samples, n_channels) raw intensity matrix. groups : np.ndarray (n_samples,) label per sample. mz_values : np.ndarray (n_channels,) m/z axis shared by all spectra. method : str Normalization method name. method_kwargs : dict, optional Extra keyword arguments forwarded to :func:`normalize`. n_clusters : int, optional Number of clusters. Defaults to number of unique groups. cluster_bootstrap_rounds : int Bootstrap rounds for cluster stability. cluster_bootstrap_frac : float Fraction of samples per bootstrap round. random_state : int RNG seed. compute_supervised : bool If True and scikit-learn is available, run supervised CV. Returns ------- dict Keys include ``method``, all metric values, and ``compute_time_sec``. """ method_kwargs = method_kwargs or {} n_samples, n_channels = X_raw.shape n_clusters_actual = n_clusters or int(np.unique(groups).size) # ---------- normalise ---------- t0 = time.perf_counter() # Methods that need a dataset-level reference ref_needed = method in ("pqn", "median_of_ratios", "pareto", "mass_stratified_pqn") if ref_needed: if method == "pqn": reference = np.nanmedian(X_raw, axis=0) method_kwargs = {**method_kwargs, "reference": reference} elif method == "median_of_ratios": # Geometric mean across samples (add pseudo-count for zeros) log_means = np.nanmean(np.log(X_raw + 1.0), axis=0) reference = np.exp(log_means) - 1.0 method_kwargs = {**method_kwargs, "reference": reference} elif method == "pareto": mean = np.nanmean(X_raw, axis=0) std = np.nanstd(X_raw, axis=0, ddof=0) method_kwargs = {**method_kwargs, "mean": mean, "std": std} elif method == "mass_stratified_pqn": reference = np.nanmedian(X_raw, axis=0) method_kwargs = { **method_kwargs, "reference": reference, "mz_values": mz_values, } X_norm = np.empty_like(X_raw, dtype=float) for i in range(n_samples): X_norm[i] = normalize(X_raw[i], method=method, **method_kwargs) compute_time = time.perf_counter() - t0 # ---------- unsupervised metrics ---------- cv_tic = _cv_of_tic(X_norm) within_sam = within_group_mean_sam(X_norm, groups) within_corr = _mean_pairwise_correlation(X_norm, groups) dyn_range = _dynamic_range(X_norm) neg_frac = _negative_fraction(X_norm) # Clustering evaluation ari_km = nmi_km = cluster_stability = sil = np.nan try: from sklearn.cluster import KMeans from sklearn.metrics import ( adjusted_rand_score, normalized_mutual_info_score, silhouette_score, ) km = KMeans(n_clusters=n_clusters_actual, n_init="auto", random_state=random_state) lab_ref = km.fit_predict(X_norm) ari_km = adjusted_rand_score(groups, lab_ref) nmi_km = normalized_mutual_info_score(groups, lab_ref) if n_samples > n_clusters_actual: try: sil = float(silhouette_score(X_norm, lab_ref, metric="cosine")) except Exception: pass # Stability rng = np.random.default_rng(random_state) stab: List[float] = [] m = min(n_samples, max(2, int(round(cluster_bootstrap_frac * n_samples)))) for _ in range(cluster_bootstrap_rounds): idx = rng.choice(n_samples, size=m, replace=False) km_sub = KMeans(n_clusters=n_clusters_actual, n_init="auto", random_state=int(rng.integers(0, 2**31 - 1))) lab_sub = km_sub.fit_predict(X_norm[idx]) stab.append(adjusted_rand_score(lab_ref[idx], lab_sub)) cluster_stability = float(np.mean(stab)) except ImportError: logger.debug("scikit-learn not available; skipping clustering metrics.") # ---------- supervised metrics ---------- sup_f1 = sup_bal = np.nan if compute_supervised: try: from sklearn.linear_model import LogisticRegression from sklearn.model_selection import StratifiedKFold from sklearn.metrics import f1_score, balanced_accuracy_score unique_classes = np.unique(groups) if len(unique_classes) >= 2: n_splits = min(5, min(Counter(groups).values())) if n_splits >= 2: skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state) f1s, bals = [], [] for tr, te in skf.split(X_norm, groups): clf = LogisticRegression(max_iter=2000, n_jobs=None) with warnings.catch_warnings(): warnings.simplefilter("ignore") clf.fit(X_norm[tr], groups[tr]) pred = clf.predict(X_norm[te]) f1s.append(f1_score(groups[te], pred, average="macro", zero_division=0)) bals.append(balanced_accuracy_score(groups[te], pred)) sup_f1 = float(np.mean(f1s)) sup_bal = float(np.mean(bals)) except ImportError: logger.debug("scikit-learn not available; skipping supervised metrics.") return { "method": method, "cv_tic": cv_tic, "within_group_SAM": within_sam, "within_group_corr": within_corr, "dynamic_range": dyn_range, "negative_fraction": neg_frac, "cluster_ARI": float(ari_km), "cluster_NMI": float(nmi_km), "cluster_stability": float(cluster_stability), "silhouette_cosine": float(sil), "supervised_macro_f1": sup_f1, "supervised_bal_acc": sup_bal, "compute_time_sec": compute_time, }
# --------------------------------------------------------------------------- # Evaluator class (mirrors BaselineMethodEvaluator pattern) # ---------------------------------------------------------------------------
[docs] @dataclass class NormalizationEvaluator: """Evaluate normalization methods on labelled ToF-SIMS spectra. Parameters ---------- files : list of str or Path Paths or glob patterns expanding to spectrum text files. methods : list of str, optional Normalization method names. Defaults to a sensible subset. method_kwargs_map : dict, optional ``{method_name: {kwarg: value, ...}}`` for method-specific params. mz_min, mz_max : float, optional m/z range to import. n_clusters : int, optional Number of clusters for KMeans evaluation. Auto-detected if omitted. cluster_bootstrap_rounds : int Bootstrap rounds for stability metric. random_state : int RNG seed for reproducibility. compute_supervised : bool Run supervised classification (requires scikit-learn + >=2 groups). n_jobs : int Parallel workers (joblib). ``-1`` = all CPUs, ``1`` = sequential. Examples -------- >>> evaluator = NormalizationEvaluator(files=["data/*.txt"]) >>> summary = evaluator.evaluate() >>> evaluator.plot() """ files: List[Union[str, Path]] = field(default_factory=list) methods: Optional[List[str]] = None method_kwargs_map: Optional[Dict[str, Dict[str, Any]]] = None mz_min: Optional[float] = None mz_max: Optional[float] = None n_clusters: Optional[int] = None cluster_bootstrap_rounds: int = 30 cluster_bootstrap_frac: float = 0.8 random_state: int = 0 compute_supervised: bool = True n_jobs: int = -1 group_patterns: Optional[Dict[str, str]] = None group_fn: Optional[Any] = None # Callable[[str], str] # internal state _resolved_files: List[Path] = field(default_factory=list, init=False, repr=False) _X_raw: Optional[np.ndarray] = field(default=None, init=False, repr=False) _groups: Optional[np.ndarray] = field(default=None, init=False, repr=False) _mz: Optional[np.ndarray] = field(default=None, init=False, repr=False) _results: Optional[pd.DataFrame] = field(default=None, init=False, repr=False) def __post_init__(self): self._resolved_files = self._expand_files(self.files) if not self._resolved_files: raise FileNotFoundError( "No input files found; verify the provided file paths or " "glob patterns." ) if self.methods is None: # Sensible defaults for ToF-SIMS self.methods = [ "tic", "median", "rms", "max", "vector", "poisson", "sqrt", "log", "vsn", "minmax", ] self.method_kwargs_map = self.method_kwargs_map or {} self.files = self._resolved_files # type: ignore[assignment] # -- file resolution --------------------------------------------------- @staticmethod def _expand_files(candidates) -> List[Path]: paths: List[Path] = [] for item in candidates: s = str(item).strip() if not s: continue if _has_glob_chars(s): for hit in sorted(Path().glob(s)): if hit.is_file(): paths.append(hit.resolve()) continue p = Path(s) if p.is_file(): paths.append(p.resolve()) elif p.is_dir(): for pattern in ("*.txt", "*.csv"): for hit in sorted(p.rglob(pattern)): if hit.is_file(): paths.append(hit.resolve()) return sorted(set(paths)) # -- data loading ------------------------------------------------------ def _load_spectra(self): """Read all spectrum files into a (samples x channels) matrix.""" spectra: List[Tuple[str, np.ndarray]] = [] groups: List[str] = [] ref_mz: Optional[np.ndarray] = None for fp in self._resolved_files: try: mz, intensity, sample_name, group = import_data( str(fp), self.mz_min, self.mz_max, group_patterns=self.group_patterns, group_fn=self.group_fn, ) if ref_mz is None: ref_mz = mz elif len(mz) != len(ref_mz): logger.warning( "Skipping %s: channel count %d != %d", fp.name, len(mz), len(ref_mz), ) continue elif not np.allclose(mz, ref_mz, atol=1e-6): logger.warning( "Skipping %s: m/z grid mismatch (max delta %.4e)", fp.name, float(np.max(np.abs(mz - ref_mz))), ) continue spectra.append((sample_name, intensity)) groups.append(group) except Exception as e: logger.error("Error loading %s: %s", fp.name, e) if not spectra: raise RuntimeError("No spectra could be loaded.") self._X_raw = np.vstack([s[1] for s in spectra]) self._groups = np.array(groups) self._mz = ref_mz self._sample_names = [s[0] for s in spectra] logger.info( "Loaded %d spectra (%d channels) with groups: %s", len(spectra), ref_mz.shape[0], dict(Counter(groups)), ) # -- evaluation --------------------------------------------------------
[docs] def evaluate(self) -> pd.DataFrame: """Evaluate all methods and return a scored DataFrame. Returns ------- pd.DataFrame One row per method, sorted by ``score_combined`` (descending). Includes raw metrics, z-scored metrics, and four composite scores. """ if self._X_raw is None: self._load_spectra() methods = self.methods or normalization_method_names() rows: List[Dict[str, Any]] = [] try: from joblib import Parallel, delayed def _eval(m): return evaluate_one_method( self._X_raw, self._groups, self._mz, method=m, method_kwargs=self.method_kwargs_map.get(m, {}), n_clusters=self.n_clusters, cluster_bootstrap_rounds=self.cluster_bootstrap_rounds, cluster_bootstrap_frac=self.cluster_bootstrap_frac, random_state=self.random_state, compute_supervised=self.compute_supervised, ) if self.n_jobs == 1: try: from tqdm import tqdm it = tqdm(methods, desc="Evaluating normalization") except ImportError: it = methods rows = [_eval(m) for m in it] else: try: from tqdm import tqdm rows = Parallel(n_jobs=self.n_jobs, backend="loky")( delayed(_eval)(m) for m in tqdm(methods, desc="Evaluating normalization") ) except ImportError: rows = Parallel(n_jobs=self.n_jobs, backend="loky")( delayed(_eval)(m) for m in methods ) except ImportError: # No joblib — sequential fallback for m in methods: rows.append(evaluate_one_method( self._X_raw, self._groups, self._mz, method=m, method_kwargs=self.method_kwargs_map.get(m, {}), n_clusters=self.n_clusters, cluster_bootstrap_rounds=self.cluster_bootstrap_rounds, cluster_bootstrap_frac=self.cluster_bootstrap_frac, random_state=self.random_state, compute_supervised=self.compute_supervised, )) res = pd.DataFrame(rows) res = self._compute_scores(res) self._results = res return res
@staticmethod def _compute_scores(res: pd.DataFrame) -> pd.DataFrame: """Add z-scored metrics and composite scores to the results table. Scoring follows the xpectrass convention: all z-scores are oriented so that *higher = better*, then combined into weighted composites. """ # --- Z-scores (higher = better for all) --- # Lower is better → negate before z-scoring res["z_cv_tic"] = _zscore_robust(-res["cv_tic"]) res["z_sam"] = _zscore_robust(-res["within_group_SAM"]) res["z_neg_frac"] = _zscore_robust(-res["negative_fraction"]) res["z_compute_time"] = _zscore_robust(-res["compute_time_sec"]) # Higher is better res["z_corr"] = _zscore_robust(res["within_group_corr"]) res["z_dyn_range"] = _zscore_robust(res["dynamic_range"]) res["z_ari"] = _zscore_robust(res["cluster_ARI"]) res["z_nmi"] = _zscore_robust(res["cluster_NMI"]) res["z_stability"] = _zscore_robust(res["cluster_stability"]) res["z_sil"] = _zscore_robust(res["silhouette_cosine"]) res["z_f1"] = _zscore_robust(res["supervised_macro_f1"]) res["z_bal_acc"] = _zscore_robust(res["supervised_bal_acc"]) # --- Composite scores --- # Check whether supervised metrics are available (not all NaN) has_supervised = ( res["supervised_macro_f1"].notna().any() and res["supervised_bal_acc"].notna().any() ) # Replace NaN z-scores with 0 so they don't poison weighted sums def _z(col: str) -> pd.Series: return res[col].fillna(0.0) if has_supervised: # 1. Combined (balanced) # 40% spectral quality, 30% clustering, 20% supervised, 10% practical res["score_combined"] = ( 0.15 * _z("z_cv_tic") + 0.10 * _z("z_sam") + 0.10 * _z("z_corr") + 0.05 * _z("z_dyn_range") + 0.10 * _z("z_ari") + 0.10 * _z("z_nmi") + 0.10 * _z("z_stability") + 0.10 * _z("z_f1") + 0.10 * _z("z_bal_acc") + 0.05 * _z("z_neg_frac") + 0.05 * _z("z_sil") ) else: # Redistribute supervised weight (20%) to unsupervised components logger.info( "Supervised metrics unavailable; using unsupervised-only " "weights for score_combined." ) res["score_combined"] = ( 0.20 * _z("z_cv_tic") + 0.15 * _z("z_sam") + 0.15 * _z("z_corr") + 0.10 * _z("z_dyn_range") + 0.15 * _z("z_ari") + 0.10 * _z("z_nmi") + 0.10 * _z("z_stability") + 0.05 * _z("z_neg_frac") ) # 2. Unsupervised (no labels needed — quality-focused) res["score_unsupervised"] = ( 0.25 * _z("z_cv_tic") + 0.20 * _z("z_sam") + 0.20 * _z("z_corr") + 0.15 * _z("z_stability") + 0.10 * _z("z_sil") + 0.10 * _z("z_dyn_range") ) # 3. Supervised-focused (NaN if supervised metrics are absent) if has_supervised: res["score_supervised"] = ( 0.25 * _z("z_f1") + 0.25 * _z("z_bal_acc") + 0.15 * _z("z_ari") + 0.15 * _z("z_nmi") + 0.10 * _z("z_stability") + 0.10 * _z("z_cv_tic") ) else: res["score_supervised"] = np.nan # 4. Efficiency-aware res["score_efficient"] = ( 0.85 * res["score_combined"] + 0.15 * _z("z_compute_time") ) return res.sort_values("score_combined", ascending=False).reset_index(drop=True) # -- plotting ----------------------------------------------------------
[docs] def plot( self, out_dir: Union[str, Path] = "normalization_selection_output", save: bool = True, ) -> List[Path]: """Generate evaluation plots (box plots, bar charts, radar). Parameters ---------- out_dir : str or Path Sub-folder inside ``OUTPUT_DIR`` for saved figures. save : bool Persist plots as PNG + PDF. Returns ------- list of Path Saved file paths. """ if self._results is None: raise RuntimeError("Call evaluate() before plotting.") if not _MPL_AVAILABLE: raise ImportError("matplotlib is required for plotting.") res = self._results out = OUTPUT_DIR / Path(out_dir) out.mkdir(parents=True, exist_ok=True) saved: List[Path] = [] self._pub_style() # --- 1. Composite score comparison --- fig, ax = plt.subplots(figsize=(10, 5)) methods = res["method"].tolist() x = np.arange(len(methods)) width = 0.2 ax.bar(x - 1.5 * width, res["score_combined"], width, label="Combined") ax.bar(x - 0.5 * width, res["score_unsupervised"], width, label="Unsupervised") ax.bar(x + 0.5 * width, res["score_supervised"], width, label="Supervised") ax.bar(x + 1.5 * width, res["score_efficient"], width, label="Efficient") ax.set_xticks(x) ax.set_xticklabels(methods, rotation=45, ha="right") ax.set_ylabel("Composite score (higher = better)") ax.set_title("Normalization Method Comparison") ax.legend(frameon=False) ax.axhline(0, color="grey", lw=0.5, ls="--") fig.tight_layout() saved.extend(self._save_fig(fig, out, "composite_scores", save)) # --- 2. Metric box/bar plots --- metric_info = [ ("cv_tic", "CV of TIC (lower = better)", True), ("within_group_SAM", "Within-group SAM (lower = better)", True), ("within_group_corr", "Within-group correlation (higher = better)", False), ("cluster_ARI", "Adjusted Rand Index (higher = better)", False), ("cluster_stability", "Cluster stability (higher = better)", False), ("negative_fraction", "Negative fraction (lower = better)", True), ] for col, title, lower_better in metric_info: if col not in res.columns: continue fig, ax = plt.subplots(figsize=(9, 4.5)) vals = res[col].values x = np.arange(len(methods)) colors = ["#2ca02c" if not lower_better else "#d62728"] * len(vals) best_idx = int(np.nanargmin(vals) if lower_better else np.nanargmax(vals)) colors[best_idx] = "#1f77b4" ax.bar(x, vals, color=colors) ax.set_title(title) ax.set_ylabel(col) ax.set_xticks(x) ax.set_xticklabels(methods, rotation=45, ha="right") fig.tight_layout() saved.extend(self._save_fig(fig, out, col, save)) # --- 3. Overall ranking bar --- fig, ax = plt.subplots(figsize=(9, 4.5)) ax.barh(methods[::-1], res["score_combined"].values[::-1]) ax.set_xlabel("Combined score (higher = better)") ax.set_title("Overall Normalization Ranking") fig.tight_layout() saved.extend(self._save_fig(fig, out, "overall_ranking", save)) # --- 4. Export CSVs --- res.to_csv(out / "normalization_eval.csv", index=False) summary = { "best_method": res.iloc[0]["method"], "n_spectra": int(self._X_raw.shape[0]) if self._X_raw is not None else 0, "n_channels": int(self._X_raw.shape[1]) if self._X_raw is not None else 0, "methods_evaluated": methods, "scores": { row["method"]: round(row["score_combined"], 4) for _, row in res.iterrows() }, } with open(out / "summary.json", "w") as f: json.dump(summary, f, indent=2) logger.info("Saved %d files to %s", len(saved) + 2, out) return saved
[docs] def print_summary(self, top_n: int = 5) -> None: """Print a ranked summary of evaluation results. Parameters ---------- top_n : int, default 5 Number of top methods to display per score variant. """ if self._results is None: raise RuntimeError("Call evaluate() before print_summary().") res = self._results logger.info("=" * 80) logger.info("NORMALIZATION EVALUATION SUMMARY (Top %d Methods)", top_n) logger.info("=" * 80) logger.info("") logger.info("--- COMBINED SCORE (Recommended) ---") logger.info( "Weights: 40%% spectral quality, 30%% clustering, " "20%% supervised, 10%% practical" ) for i, row in res.head(top_n).iterrows(): logger.info( " %d. %-20s | Score: %6.3f | CV-TIC: %.3f | SAM: %.4f | " "ARI: %.3f | F1: %.3f", i + 1, row["method"], row["score_combined"], row["cv_tic"], row["within_group_SAM"], row["cluster_ARI"], row["supervised_macro_f1"], ) logger.info("") logger.info("--- UNSUPERVISED SCORE ---") for i, row in res.nlargest(top_n, "score_unsupervised").iterrows(): logger.info( " %d. %-20s | Score: %6.3f", i + 1, row["method"], row["score_unsupervised"], ) logger.info("") logger.info("--- SUPERVISED SCORE ---") for i, row in res.nlargest(top_n, "score_supervised").iterrows(): logger.info( " %d. %-20s | Score: %6.3f | F1: %.3f | Balanced Acc: %.3f", i + 1, row["method"], row["score_supervised"], row["supervised_macro_f1"], row["supervised_bal_acc"], ) logger.info("") logger.info("--- EFFICIENCY-AWARE SCORE ---") for i, row in res.nlargest(top_n, "score_efficient").iterrows(): logger.info( " %d. %-20s | Score: %6.3f | Time: %.4fs", i + 1, row["method"], row["score_efficient"], row["compute_time_sec"], ) logger.info("=" * 80)
[docs] def preview_overlay( self, file: Union[str, Path], methods: Optional[List[str]] = None, max_methods: int = 5, mz_min: Optional[float] = None, mz_max: Optional[float] = None, save_to: Optional[Union[str, Path]] = "normalization_selection_output", ) -> None: """Plot raw vs normalised overlays for quick visual comparison. Parameters ---------- file : str or Path Single spectrum file to visualise. methods : list of str, optional Methods to overlay. Defaults to top methods from evaluation. max_methods : int Cap on the number of overlays. mz_min, mz_max : float, optional m/z window for the plot. save_to : str, Path, or None Save directory (relative to OUTPUT_DIR). ``None`` skips saving. """ if not _MPL_AVAILABLE: raise ImportError("matplotlib is required for plotting.") mz, intensity, name, group = import_data( str(file), mz_min or self.mz_min, mz_max or self.mz_max, group_patterns=self.group_patterns, group_fn=self.group_fn, ) if methods is None: if self._results is not None: methods = self._results["method"].tolist()[:max_methods] else: methods = (self.methods or normalization_method_names())[:max_methods] self._pub_style() fig, axes = plt.subplots(len(methods) + 1, 1, figsize=(12, 2.5 * (len(methods) + 1)), sharex=True) axes[0].plot(mz, intensity, lw=0.5, color="grey") axes[0].set_title(f"Raw: {name}") axes[0].set_ylabel("Intensity") for i, m in enumerate(methods[:max_methods]): kwargs = self.method_kwargs_map.get(m, {}) if self.method_kwargs_map else {} def _project_dataset_vector(values: np.ndarray) -> np.ndarray: values = np.asarray(values, dtype=float) if self._mz is None or values.shape != np.asarray(self._mz).shape: return values if len(self._mz) == len(mz) and np.allclose(self._mz, mz, atol=1e-6): return values return np.interp(mz, self._mz, values, left=0.0, right=0.0) if "reference_idx" in kwargs and self._mz is not None: ref_idx = int(kwargs["reference_idx"]) if 0 <= ref_idx < len(self._mz): ref_mz = float(self._mz[ref_idx]) kwargs = { **kwargs, "reference_idx": int(np.argmin(np.abs(mz - ref_mz))), } if "reference_indices" in kwargs and self._mz is not None: ref_indices = np.asarray(kwargs["reference_indices"], dtype=int) mapped = [] for ref_idx in ref_indices: if 0 <= ref_idx < len(self._mz): ref_mz = float(self._mz[ref_idx]) mapped.append(int(np.argmin(np.abs(mz - ref_mz)))) kwargs = { **kwargs, "reference_indices": mapped, } # Build reference for dataset-level methods if m in ("pqn", "median_of_ratios") and self._X_raw is not None: if m == "pqn": kwargs = { **kwargs, "reference": _project_dataset_vector(np.nanmedian(self._X_raw, axis=0)), } elif m == "median_of_ratios": log_means = np.nanmean(np.log(self._X_raw + 1.0), axis=0) kwargs = { **kwargs, "reference": _project_dataset_vector(np.exp(log_means) - 1.0), } elif m == "pareto" and self._X_raw is not None: kwargs = { **kwargs, "mean": _project_dataset_vector(np.nanmean(self._X_raw, axis=0)), "std": _project_dataset_vector(np.nanstd(self._X_raw, axis=0, ddof=0)), } elif m == "mass_stratified_pqn" and self._X_raw is not None: kwargs = { **kwargs, "reference": _project_dataset_vector(np.nanmedian(self._X_raw, axis=0)), "mz_values": mz, } try: norm = normalize(intensity, method=m, **kwargs) axes[i + 1].plot(mz, norm, lw=0.5) axes[i + 1].set_title(m) axes[i + 1].set_ylabel("Norm. Int.") except Exception as e: axes[i + 1].set_title(f"{m} (failed: {e})") axes[-1].set_xlabel("m/z") fig.tight_layout() if save_to: save_dir = OUTPUT_DIR / Path(save_to) save_dir.mkdir(parents=True, exist_ok=True) for ext in (".png", ".pdf"): fig.savefig(save_dir / f"preview_{name}{ext}", bbox_inches="tight", dpi=300) plt.show()
# -- helpers ----------------------------------------------------------- @staticmethod def _pub_style(): if not _MPL_AVAILABLE: return plt.rcParams.update({ "figure.dpi": 120, "savefig.dpi": 300, "font.size": 10, "axes.labelsize": 10, "axes.titlesize": 11, "xtick.labelsize": 9, "ytick.labelsize": 9, "legend.fontsize": 9, "axes.grid": True, "grid.alpha": 0.25, }) @staticmethod def _save_fig(fig, out_dir: Path, stem: str, save: bool) -> List[Path]: saved = [] if save: for ext in (".png", ".pdf"): p = out_dir / f"{stem}{ext}" fig.savefig(p, bbox_inches="tight") saved.append(p) plt.close(fig) return saved