Source code for mioXpektron.recalibrate.auto_calibrator

"""
AutoCalibrator — automatic multi-model calibration for mass spectrometry.

Fits all requested calibration models, picks the best one per file, and
applies the winning model.  Model fitting, inversion, and peak-detection
live in the shared ``_models`` backend.

Author: Data Analysis Team @KaziLab.se
Version: 0.0.1
"""

import os
import logging
import warnings
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np
import numpy.typing as npt
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm

# --- shared backend ---
from ._models import (
    MODEL_REGISTRY,
    _MODEL_ALIASES,
    _ModelMeta,
    _ppm_error,
    _ppm_to_da,
    _detect_outliers_huber,
    _estimate_noise_level,
    _fit_gaussian_peak,
    _fit_voigt_peak,
    _parabolic_peak_center,
    _enhanced_pick_channels,
    _enhanced_bootstrap_channels,
    _robust_initial_params_quad_sqrt,
    _fit_quad_sqrt_robust,
    _fit_reflectron,
    _fit_multisegment,
    _fit_spline_model,
    _fit_linear_sqrt,
    _fit_poly2,
    _invert_quad_sqrt,
    _invert_reflectron,
    _invert_linear_sqrt,
    _invert_poly2,
    _invert_spline,
    _invert_physical,
    apply_model_to_spectrum,
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

warnings.filterwarnings('ignore', category=RuntimeWarning)


[docs] class CalibrationModel(Enum): """Enumeration of available calibration models.""" QUAD_SQRT = "quad_sqrt" LINEAR_SQRT = "linear_sqrt" POLY2 = "poly2" REFLECTRON = "reflectron" MULTISEGMENT = "multisegment" SPLINE = "spline" PHYSICAL = "physical"
[docs] class PeakDetectionMethod(Enum): """Enumeration of peak detection methods.""" MAX = "max" CENTROID = "centroid" CENTROID_RAW = "centroid_raw" PARABOLIC = "parabolic" GAUSSIAN = "gaussian" VOIGT = "voigt"
# --------------------------------------------------------------------------- # Configuration # ---------------------------------------------------------------------------
[docs] @dataclass class AutoCalibConfig: """Universal calibration configuration with robust options. Parameters ---------- reference_masses : list of float Known calibrant ion masses (m/z). model : str, optional Convenience shortcut — a single model name (or common alias like ``'quadratic'``, ``'tof'``, ``'linear'``). Resolved into *models_to_try* during ``__post_init__``. Ignored when *models_to_try* is explicitly provided. models_to_try : list of str, optional Explicit list of model names to fit. Default: all production-ready models (excludes experimental ones such as ``multisegment`` and ``physical``). """ reference_masses: List[float] output_folder: str = "calibrated_spectra" max_workers: Optional[int] = None # Peak detection parameters autodetect_tol_da: Optional[float] = None autodetect_tol_ppm: Optional[float] = None autodetect_method: str = "gaussian" autodetect_fallback_policy: str = "max" autodetect_strategy: str = "mz" prefer_recompute_from_channel: bool = False # Robust fitting parameters outlier_threshold: float = 3.0 use_outlier_rejection: bool = True max_iterations: int = 3 # Model selection parameters model: Optional[str] = None # convenience alias -> models_to_try models_to_try: Optional[List[str]] = None prefer_physical_models: bool = True # Quality control min_calibrants: int = 3 max_ppm_warning: float = 100.0 max_ppm_error: float = 500.0 # Advanced parameters use_bootstrap_init: bool = True spline_smoothing: Optional[float] = None multisegment_breakpoints: List[float] = field(default_factory=lambda: [50, 200, 500]) # Instrument parameters (for physical model) instrument_params: Dict[str, float] = field(default_factory=dict) def __post_init__(self): if self.autodetect_fallback_policy not in {"max", "nan", "raise"}: raise ValueError( "autodetect_fallback_policy must be 'max', 'nan', or 'raise'" ) # Resolve the convenience ``model`` alias if self.models_to_try is None: if self.model is not None: canonical = _MODEL_ALIASES.get(self.model.lower(), self.model.lower()) if canonical not in MODEL_REGISTRY: raise ValueError( f"Unknown model '{self.model}'. " f"Valid names: {sorted(MODEL_REGISTRY)}; " f"aliases: {sorted(_MODEL_ALIASES)}" ) self.models_to_try = [canonical] else: # Default: all production-ready models (selection_allowed=True) self.models_to_try = [ name for name, meta in MODEL_REGISTRY.items() if meta.selection_allowed ] # Warn if any requested model is experimental for name in self.models_to_try: meta = MODEL_REGISTRY.get(name) if meta and meta.experimental: logger.warning( "Model '%s' is experimental: %s", name, meta.description )
# --------------------------------------------------------------------------- # Fit-and-score: tries all requested models on one file # --------------------------------------------------------------------------- def _fit_and_score_models_enhanced( filename: str, ref_masses: npt.NDArray[np.float64], calib_channels_dict: Dict[str, Sequence[float]], config: AutoCalibConfig, ) -> Optional[Tuple[str, Dict]]: """Fit all requested models and pick the best one for *filename*.""" if filename not in calib_channels_dict: logger.warning(f"{filename}: Not found in calibration channels dict") return None t_meas = np.asarray(calib_channels_dict[filename], dtype=float) m_ref = np.asarray(ref_masses, dtype=float) valid = np.isfinite(t_meas) n_valid = valid.sum() if n_valid < config.min_calibrants: logger.warning(f"{filename}: Only {n_valid} valid calibrants (need >={config.min_calibrants})") return None t_meas = t_meas[valid] m_ref = m_ref[valid] logger.info(f"{filename}: Fitting with {n_valid} calibrants") out: Dict[str, Dict] = {} for model_name in config.models_to_try: try: if model_name == "quad_sqrt": params = _fit_quad_sqrt_robust( m_ref, t_meas, config.outlier_threshold, config.max_iterations, ) if params: m_est = _invert_quad_sqrt(t_meas, *params) ppm = _ppm_error(m_ref, m_est) out[model_name] = {"params": params, "ppm": ppm, "n_calibrants": n_valid} elif model_name == "reflectron": params = _fit_reflectron(m_ref, t_meas) if params: m_est = _invert_reflectron(t_meas, *params) ppm = _ppm_error(m_ref, m_est) out[model_name] = {"params": params, "ppm": ppm, "n_calibrants": n_valid} elif model_name == "multisegment": segment_data = _fit_multisegment(m_ref, t_meas, config.multisegment_breakpoints) if segment_data: # Evaluate multisegment ppm by applying each segment to its calibrants m_est_ms = np.full_like(m_ref, np.nan) for seg_info in segment_data["segments"].values(): low, high = seg_info["range"] seg_mask = (m_ref >= low) & (m_ref < high) if seg_mask.any(): m_est_ms[seg_mask] = _invert_quad_sqrt( t_meas[seg_mask], *seg_info["params"] ) ppm = _ppm_error(m_ref, m_est_ms) out[model_name] = {"params": segment_data, "ppm": ppm, "n_calibrants": n_valid} elif model_name == "spline": spline = _fit_spline_model(m_ref, t_meas, config.spline_smoothing) if spline: m_est = _invert_spline(t_meas, spline) ppm = _ppm_error(m_ref, m_est) out[model_name] = {"params": spline, "ppm": ppm, "n_calibrants": n_valid} elif model_name == "linear_sqrt": params = _fit_linear_sqrt(m_ref, t_meas) if params: m_est = _invert_linear_sqrt(t_meas, *params) ppm = _ppm_error(m_ref, m_est) out[model_name] = {"params": params, "ppm": ppm, "n_calibrants": n_valid} elif model_name == "poly2": params = _fit_poly2(m_ref, t_meas) if params: m_est = _invert_poly2(t_meas, *params) ppm = _ppm_error(m_ref, m_est) out[model_name] = {"params": params, "ppm": ppm, "n_calibrants": n_valid} else: logger.debug(f"{filename}: No handler for model '{model_name}'; skipping") except Exception as e: logger.debug(f"{filename}: Failed to fit {model_name}: {e}") continue if not out: logger.warning(f"{filename}: All models failed to fit") return None # Select best model — only among models whose metadata allows auto-selection. selectable = { k: v for k, v in out.items() if MODEL_REGISTRY.get(k, _ModelMeta(k, True, False, True, "")).selection_allowed } if not selectable: selectable = out if config.prefer_physical_models: physical_models = ["quad_sqrt", "reflectron"] for model in physical_models: if model in selectable and selectable[model]["ppm"] < config.max_ppm_warning: best_name = model break else: best_name = min(selectable.keys(), key=lambda k: selectable[k]["ppm"]) else: best_name = min(selectable.keys(), key=lambda k: selectable[k]["ppm"]) out["best_model"] = best_name best_ppm = out[best_name]["ppm"] if best_ppm > config.max_ppm_error: logger.error(f"{filename}: Best model {best_name} exceeds max PPM ({best_ppm:.1f} > {config.max_ppm_error})") return None elif best_ppm > config.max_ppm_warning: logger.warning(f"{filename}: High PPM error ({best_ppm:.1f})") logger.info(f"{filename}: Best model={best_name}, ppm={best_ppm:.1f}") return filename, out # --------------------------------------------------------------------------- # Apply calibration to a single file # --------------------------------------------------------------------------- def _apply_model_to_file_enhanced(args: Tuple[str, Dict[str, Dict], str]) -> Optional[str]: """Apply the best calibration model to a file.""" file_path, best_map, out_folder = args fname = os.path.basename(file_path) if fname not in best_map: return None rec = best_map[fname] model = rec["best_model"] params = rec[model]["params"] try: df = pd.read_csv(file_path, sep="\t", header=0, comment="#") except Exception as e: logger.error(f"Failed to read {fname}: {e}") return None if "Channel" not in df.columns: logger.error(f"{fname}: Missing 'Channel' column") return None t = df["Channel"].astype(np.float64).to_numpy() mz_cal = apply_model_to_spectrum(t, model, params) out_df = pd.DataFrame({ "m/z": mz_cal, "Intensity": df["Intensity"].to_numpy(), "Channel": t }) os.makedirs(out_folder, exist_ok=True) out_fp = os.path.join(out_folder, fname.replace(".txt", "_calibrated.txt")) with open(out_fp, 'w') as f: f.write(f"# Calibration model: {model}\n") f.write(f"# Calibration error: {rec[model]['ppm']:.1f} ppm\n") f.write(f"# Number of calibrants: {rec[model]['n_calibrants']}\n") out_df.to_csv(f, sep="\t", index=False) return fname # --------------------------------------------------------------------------- # Main calibrator class # ---------------------------------------------------------------------------
[docs] class AutoCalibrator: """Automatic multi-model calibrator. Fits all requested models, selects the best one per file, and writes calibrated spectra. """ def __init__(self, config: Optional[AutoCalibConfig] = None): default_masses = [ 1.00782503224, # H+ 22.9897692820, # Na+ 38.9637064864, # K+ 58.065674, # Organic fragment 86.096974, # Organic fragment 104.107539, # Organic fragment 184.073871, # Organic fragment 224.105171 # Organic fragment ] self.config = config or AutoCalibConfig(reference_masses=default_masses) self.last_autodetect_methods: Dict[str, List[str]] = {} if len(self.config.reference_masses) < 3: raise ValueError("At least 3 reference masses required") logger.info(f"AutoCalibrator initialized with {len(self.config.models_to_try)} models")
[docs] def calibrate( self, files: Sequence[str], calib_channels_dict: Optional[Dict[str, Sequence[float]]] = None, ) -> pd.DataFrame: """Calibrate all files with automatic model selection.""" os.makedirs(self.config.output_folder, exist_ok=True) ref_masses = np.asarray(self.config.reference_masses, dtype=float) if calib_channels_dict is None: logger.info("Autodetecting calibrant channels...") calib_channels_dict = self._autodetect_channels(files, ref_masses) best_records: Dict[str, Dict] = {} max_workers = self.config.max_workers or os.cpu_count() or 4 with ProcessPoolExecutor(max_workers=max_workers) as ex: futs = { ex.submit( _fit_and_score_models_enhanced, os.path.basename(fp), ref_masses, calib_channels_dict, self.config, ): fp for fp in files } for fut in tqdm(as_completed(futs), total=len(futs), desc="Fitting models"): res = fut.result() if res is not None: fname, rec = res best_records[fname] = rec if not best_records: raise RuntimeError("No models could be fitted for any files") # Create summary rows = [] for fname, rec in best_records.items(): row = { "file_name": fname, "best_model": rec["best_model"], "best_ppm": rec[rec["best_model"]]["ppm"], "n_calibrants": rec[rec["best_model"]]["n_calibrants"], } for model in self.config.models_to_try: if model in rec: row[f"{model}_ppm"] = rec[model]["ppm"] rows.append(row) summary = pd.DataFrame(rows).sort_values("file_name") summary.to_csv( os.path.join(self.config.output_folder, "calibration_summary.tsv"), sep="\t", index=False, ) # Apply calibration with ProcessPoolExecutor(max_workers=max_workers) as ex: args = ((fp, best_records, self.config.output_folder) for fp in files) for _ in tqdm(ex.map(_apply_model_to_file_enhanced, args), total=len(files), desc="Applying calibration"): pass mean_ppm = summary["best_ppm"].mean() median_ppm = summary["best_ppm"].median() logger.info(f"Calibration complete: Mean PPM = {mean_ppm:.1f}, Median PPM = {median_ppm:.1f}") return summary
def _autodetect_channels( self, files: Sequence[str], ref_masses: npt.NDArray[np.float64], ) -> Dict[str, Sequence[float]]: """Autodetect calibrant channels from files.""" autodetected: Dict[str, Sequence[float]] = {} self.last_autodetect_methods = {} for fp in tqdm(files, desc="Autodetecting channels"): try: df = pd.read_csv(fp, sep="\t", header=0, comment="#") except Exception as e: logger.warning(f"Failed to read {os.path.basename(fp)}: {e}") continue fname = os.path.basename(fp) if "Channel" not in df.columns: logger.warning(f"{fname}: No Channel column") continue if (self.config.prefer_recompute_from_channel or self.config.autodetect_strategy == "bootstrap" or "m/z" not in df.columns): ch = df["Channel"].to_numpy() y = df["Intensity"].astype("float64").to_numpy() autodetected[fname] = _enhanced_bootstrap_channels(ch, y, ref_masses) self.last_autodetect_methods[fname] = ["bootstrap"] * len(ref_masses) else: channels, methods_used = _enhanced_pick_channels( df, ref_masses, self.config.autodetect_tol_da, self.config.autodetect_tol_ppm, self.config.autodetect_method, fallback_policy=self.config.autodetect_fallback_policy, return_details=True, ) autodetected[fname] = channels self.last_autodetect_methods[fname] = methods_used method_counts = pd.Series(methods_used).value_counts() non_requested = method_counts.drop( labels=[self.config.autodetect_method, "none"], errors="ignore", ) if not non_requested.empty: logger.info( "%s: autodetect requested '%s' with fallback_policy='%s'; actual methods=%s", fname, self.config.autodetect_method, self.config.autodetect_fallback_policy, ", ".join( f"{method_name} x{count}" for method_name, count in non_requested.items() ), ) if not autodetected: raise RuntimeError("Autodetection failed: no suitable files") logger.info(f"Autodetected channels for {len(autodetected)}/{len(files)} files") return autodetected
# --------------------------------------------------------------------------- # Convenience functions # ---------------------------------------------------------------------------
[docs] def quick_calibrate( files: Sequence[str], reference_masses: Optional[List[float]] = None, output_folder: str = "calibrated_spectra", models: Optional[List[str]] = None, **kwargs, ) -> pd.DataFrame: """Quick calibration with sensible defaults.""" if reference_masses is None: reference_masses = [1.00782503224, 22.9897692820, 38.9637064864] if models is None: models = ["quad_sqrt", "reflectron", "linear_sqrt", "poly2"] config = AutoCalibConfig( reference_masses=reference_masses, output_folder=output_folder, models_to_try=models, **kwargs, ) calibrator = AutoCalibrator(config) return calibrator.calibrate(files)
[docs] def diagnose_calibration( files: Sequence[str], reference_masses: List[float], output_folder: str = "calibration_diagnostics", calib_channels_dict: Optional[Dict[str, Sequence[float]]] = None, ) -> Dict[str, pd.DataFrame]: """Run comprehensive calibration diagnostics. Tests multiple models and peak detection methods to find optimal settings. """ results = {} os.makedirs(output_folder, exist_ok=True) all_models = ["quad_sqrt", "reflectron", "linear_sqrt", "poly2", "spline"] model_results = [] logger.info("=" * 60) logger.info("CALIBRATION DIAGNOSTICS") logger.info("=" * 60) for models_to_test in [ ["quad_sqrt"], ["reflectron"], ["linear_sqrt"], ["poly2"], ["spline"], ["quad_sqrt", "reflectron"], all_models, ]: logger.info(f"Testing models: {', '.join(models_to_test)}") config = AutoCalibConfig( reference_masses=reference_masses, output_folder=os.path.join(output_folder, "_".join(models_to_test)), models_to_try=models_to_test, use_outlier_rejection=True, ) try: calibrator = AutoCalibrator(config) summary = calibrator.calibrate(files[:min(5, len(files))], calib_channels_dict) model_results.append({ "models": ", ".join(models_to_test), "mean_ppm": summary["best_ppm"].mean(), "median_ppm": summary["best_ppm"].median(), "max_ppm": summary["best_ppm"].max(), "files_processed": len(summary), }) logger.info(f" Mean PPM: {summary['best_ppm'].mean():.1f}") except Exception as e: logger.error(f" Failed: {e}") model_results.append({ "models": ", ".join(models_to_test), "mean_ppm": np.nan, "median_ppm": np.nan, "max_ppm": np.nan, "files_processed": 0, }) results['model_comparison'] = pd.DataFrame(model_results) # Test different peak detection methods peak_methods = ["max", "centroid", "parabolic", "gaussian", "voigt"] peak_results = [] logger.info("-" * 60) logger.info("Testing peak detection methods...") logger.info("-" * 60) for method in peak_methods: logger.info(f"Testing {method} peak detection") config = AutoCalibConfig( reference_masses=reference_masses, output_folder=os.path.join(output_folder, f"peak_{method}"), models_to_try=["quad_sqrt"], autodetect_method=method, ) try: calibrator = AutoCalibrator(config) summary = calibrator.calibrate(files[:min(5, len(files))]) peak_results.append({ "method": method, "mean_ppm": summary["best_ppm"].mean(), "median_ppm": summary["best_ppm"].median(), "files_processed": len(summary), }) logger.info(f" Mean PPM: {summary['best_ppm'].mean():.1f}") except Exception as e: logger.error(f" Failed: {e}") peak_results.append({ "method": method, "mean_ppm": np.nan, "median_ppm": np.nan, "files_processed": 0, }) results['peak_method_comparison'] = pd.DataFrame(peak_results) for name, df in results.items(): df.to_csv(os.path.join(output_folder, f"{name}.tsv"), sep="\t", index=False) logger.info("=" * 60) logger.info("RECOMMENDATIONS") logger.info("=" * 60) if not results['model_comparison'].empty: best_model_idx = results['model_comparison']['mean_ppm'].idxmin() if not pd.isna(best_model_idx): best_models = results['model_comparison'].loc[best_model_idx, 'models'] best_ppm = results['model_comparison'].loc[best_model_idx, 'mean_ppm'] logger.info("Best model configuration: %s", best_models) logger.info(" Mean PPM error: %.1f", best_ppm) if not results['peak_method_comparison'].empty: best_peak_idx = results['peak_method_comparison']['mean_ppm'].idxmin() if not pd.isna(best_peak_idx): best_peak = results['peak_method_comparison'].loc[best_peak_idx, 'method'] best_peak_ppm = results['peak_method_comparison'].loc[best_peak_idx, 'mean_ppm'] logger.info("Best peak detection method: %s", best_peak) logger.info(" Mean PPM error: %.1f", best_peak_ppm) logger.info("Diagnostic results saved to: %s", output_folder) logger.info("=" * 60) return results
[docs] def validate_calibration( original_files: Sequence[str], calibrated_files: Sequence[str], known_masses: Optional[List[float]] = None, ) -> pd.DataFrame: """Validate calibration quality by checking known masses.""" if known_masses is None: known_masses = [ 1.0072764666, # H+ 15.0229265168, # CH3+ 22.9892207021, # Na+ 27.0229265168, # C2H3+ 29.0385765812, # C2H5+ 38.9631579065, # K+ 41.0385765812, # C3H5+ 43.0542266457, # C3H7+ 57.0698767102, # C4H9+ 58.065674, # C3H8N+ 67.0548, # C5H7+ 71.0855267746, # C5H11+ 86.096976, # C5H12N+ 91.0542266457, # C7H7+ (tropylium) 104.107539, # C5H14NO+ 184.073320, # C5H15NO4P+ (phosphocholine) 224.105171, # C8H19NO4P+ 369.351600, # C27H45+ (cholesterol) ] validation_results = [] for orig_file, cal_file in zip(original_files, calibrated_files): try: cal_df = pd.read_csv(cal_file, sep="\t", comment="#") mz = cal_df["m/z"].to_numpy() intensity = cal_df["Intensity"].to_numpy() for known_mz in known_masses: mask = np.abs(mz - known_mz) < 0.5 if mask.any(): idx_max = np.argmax(intensity[mask]) found_mz = mz[mask][idx_max] found_int = intensity[mask][idx_max] error_ppm = (found_mz - known_mz) / known_mz * 1e6 validation_results.append({ "file": os.path.basename(cal_file), "known_mz": known_mz, "found_mz": found_mz, "intensity": found_int, "error_ppm": error_ppm, }) except Exception as e: logger.warning(f"Failed to validate {cal_file}: {e}") if not validation_results: return pd.DataFrame() df = pd.DataFrame(validation_results) logger.info("=" * 60) logger.info("CALIBRATION VALIDATION SUMMARY") logger.info("=" * 60) for known_mz in known_masses: mz_data = df[df["known_mz"] == known_mz]["error_ppm"] if not mz_data.empty: mean_error = mz_data.mean() std_error = mz_data.std() logger.info("m/z %7.3f: %+6.1f +/- %5.1f ppm", known_mz, mean_error, std_error) overall_mean = df["error_ppm"].abs().mean() overall_median = df["error_ppm"].abs().median() logger.info("Overall mean absolute error: %.1f ppm", overall_mean) logger.info("Overall median absolute error: %.1f ppm", overall_median) logger.info("=" * 60) return df
[docs] def batch_process_directory( input_dir: str, output_dir: str = "calibrated_output", pattern: str = "*.txt", reference_masses: Optional[List[float]] = None, config: Optional[AutoCalibConfig] = None, recursive: bool = False, ) -> pd.DataFrame: """Process all matching files in a directory.""" import glob if recursive: files = glob.glob(os.path.join(input_dir, "**", pattern), recursive=True) else: files = glob.glob(os.path.join(input_dir, pattern)) if not files: raise ValueError(f"No files matching '{pattern}' found in {input_dir}") logger.info("Found %d files to process", len(files)) if config is None: if reference_masses is None: reference_masses = [1.00782503224, 22.9897692820, 38.9637064864] config = AutoCalibConfig( reference_masses=reference_masses, output_folder=output_dir, ) calibrator = AutoCalibrator(config) return calibrator.calibrate(files)
[docs] def create_calibration_report( summary_df: pd.DataFrame, output_file: str = "calibration_report.html", ) -> None: """Create an HTML report from calibration results.""" html_content = """ <!DOCTYPE html> <html> <head> <title>Calibration Report</title> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} h1 {{ color: #333; }} h2 {{ color: #666; border-bottom: 2px solid #ddd; padding-bottom: 5px; }} table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }} th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} th {{ background-color: #f2f2f2; }} tr:nth-child(even) {{ background-color: #f9f9f9; }} .good {{ color: green; font-weight: bold; }} .warning {{ color: orange; font-weight: bold; }} .bad {{ color: red; font-weight: bold; }} .summary {{ background-color: #e8f4f8; padding: 15px; border-radius: 5px; margin: 20px 0; }} </style> </head> <body> <h1>Mass Spectrometry Calibration Report</h1> <div class="summary"> <h2>Summary Statistics</h2> <p><strong>Files Processed:</strong> {n_files}</p> <p><strong>Mean PPM Error:</strong> <span class="{mean_class}">{mean_ppm:.1f}</span></p> <p><strong>Median PPM Error:</strong> <span class="{median_class}">{median_ppm:.1f}</span></p> <p><strong>Max PPM Error:</strong> <span class="{max_class}">{max_ppm:.1f}</span></p> </div> <h2>Model Usage</h2> <table> <tr> <th>Model</th> <th>Files</th> <th>Percentage</th> </tr> {model_rows} </table> <h2>File Details</h2> <table> <tr> <th>File</th> <th>Best Model</th> <th>PPM Error</th> <th>Calibrants</th> </tr> {file_rows} </table> <p><small>Report generated: {timestamp}</small></p> </body> </html> """ mean_ppm = summary_df["best_ppm"].mean() median_ppm = summary_df["best_ppm"].median() max_ppm = summary_df["best_ppm"].max() def get_class(value): if value < 50: return "good" elif value < 100: return "warning" else: return "bad" model_counts = summary_df["best_model"].value_counts() model_rows = "" for model, count in model_counts.items(): pct = count / len(summary_df) * 100 model_rows += f"<tr><td>{model}</td><td>{count}</td><td>{pct:.1f}%</td></tr>\n" file_rows = "" for _, row in summary_df.iterrows(): ppm_class = get_class(row["best_ppm"]) file_rows += ( f'<tr><td>{row["file_name"]}</td><td>{row["best_model"]}</td>' f'<td class="{ppm_class}">{row["best_ppm"]:.1f}</td>' f'<td>{row["n_calibrants"]}</td></tr>\n' ) from datetime import datetime html = html_content.format( n_files=len(summary_df), mean_ppm=mean_ppm, mean_class=get_class(mean_ppm), median_ppm=median_ppm, median_class=get_class(median_ppm), max_ppm=max_ppm, max_class=get_class(max_ppm), model_rows=model_rows, file_rows=file_rows, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) with open(output_file, 'w') as f: f.write(html) logger.info("Report saved to: %s", output_file)
# --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Enhanced Universal Mass Calibrator") parser.add_argument("files", nargs="+", help="Input files to calibrate") parser.add_argument("-o", "--output", default="calibrated_spectra", help="Output directory") parser.add_argument("-r", "--reference", nargs="+", type=float, help="Reference masses for calibration") parser.add_argument("-m", "--models", nargs="+", choices=["quad_sqrt", "reflectron", "linear_sqrt", "poly2", "spline"], help="Models to try") parser.add_argument("--diagnose", action="store_true", help="Run diagnostic mode") parser.add_argument("--report", action="store_true", help="Generate HTML report") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) if args.diagnose: ref_masses = args.reference or [1.00782503224, 22.9897692820, 38.9637064864] diagnostics = diagnose_calibration(args.files, ref_masses) else: config = AutoCalibConfig( reference_masses=args.reference or [1.00782503224, 22.9897692820, 38.9637064864], output_folder=args.output, models_to_try=args.models or ["quad_sqrt", "reflectron", "linear_sqrt", "poly2"], ) calibrator = AutoCalibrator(config) summary = calibrator.calibrate(args.files) logger.info("Calibration complete!") logger.info("Mean PPM error: %.1f", summary['best_ppm'].mean()) logger.info("Files processed: %d", len(summary))