"""Batch-oriented helpers for running denoising over many spectra files."""
from __future__ import annotations
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
import traceback, time, os
from functools import partial
from typing import Iterable, List, Dict, Any, Callable, Optional, Union
from .denoise_main import noise_filtering
import numpy as np
logger = logging.getLogger(__name__)
[docs]
# Header aliases (compared after lower-casing and removing spaces) mapped to
# the canonical column names used throughout this module.
_COLUMN_ALIASES: Dict[str, str] = {
    "m/z": "mz",
    "mz": "mz",
    "mass": "mz",
    "m": "mz",
    "intensity": "intensity",
    "inten": "intensity",
    "i": "intensity",
    "channel": "channel",
    "chan": "channel",
    "ch": "channel",
}
# Candidate delimiters; when two yield the same column count the earlier wins.
_SEPARATORS = ("\t", ",", ";", " ")
# Number of leading rows read per delimiter while sniffing the layout.
_SNIFF_ROWS = 64
# Canonical column order (also the positional order for headerless files).
_CANONICAL_ORDER = ("channel", "mz", "intensity")


def _canonical_column(name: Any) -> Optional[str]:
    """Return the canonical column name for a header token, or None if unknown."""
    return _COLUMN_ALIASES.get(str(name).strip().lower().replace(" ", ""))


def _read_table(path: Path, sep: str, has_header: bool, n_rows: Optional[int] = None):
    """Read *path* with the tolerant Polars options shared by every parse attempt."""
    import polars as pl

    return pl.read_csv(
        path,
        separator=sep,
        has_header=has_header,
        ignore_errors=True,
        truncate_ragged_lines=True,
        comment_prefix="#",
        n_rows=n_rows,
    )


def _sniff_layout(path: Path) -> Optional[tuple[str, bool]]:
    """Pick the delimiter yielding the most columns and detect a header row.

    Returns ``(separator, has_header)``, or None when no delimiter produces rows.
    """
    best: Optional[tuple[int, str, tuple]] = None
    for sep in _SEPARATORS:
        try:
            probe = _read_table(path, sep, has_header=False, n_rows=_SNIFF_ROWS)
        except Exception:  # unreadable with this delimiter (e.g. empty file): try next
            continue
        # A wrong delimiter still "succeeds" as a single column, so compare widths
        # instead of accepting the first successful parse.
        if probe.height and (best is None or probe.width > best[0]):
            best = (probe.width, sep, probe.row(0))
    if best is None:
        return None
    _, sep, first_row = best
    has_header = any(v is not None and _canonical_column(v) is not None for v in first_row)
    return sep, has_header


def _column_mapping(columns: List[str], has_header: bool) -> Dict[str, str]:
    """Map source column names to canonical names via header aliases or position."""
    mapping: Dict[str, str] = {}
    if has_header:
        for col in columns:
            target = _canonical_column(col)
            # First column matching an alias wins; later matches are ignored so
            # the output never contains duplicate column names.
            if target is not None and target not in mapping.values():
                mapping[col] = target
    if mapping:
        return mapping
    # No recognised header: assume column order Channel, m/z, Intensity.
    if len(columns) >= 3:
        targets = _CANONICAL_ORDER
    elif len(columns) == 2:
        targets = ("mz", "intensity")
    else:
        targets = ("intensity",)
    return dict(zip(columns, targets))


def load_txt_spectrum(path: Path) -> Dict[str, np.ndarray]:
    """Parse a plain-text spectrum file into NumPy arrays using Polars.

    Parameters
    ----------
    path : Path
        Location of the ASCII export produced by the instrument or
        pre-processing software. Delimiters can be tab, comma, semicolon or
        space.

    Returns
    -------
    dict
        Dictionary containing any detected columns (``channel``, ``mz`` and
        ``intensity``) as float arrays of equal length. The ``intensity`` key
        is always present (possibly empty).

    Notes
    -----
    The delimiter is chosen by reading the first rows with each candidate and
    keeping the one that yields the most columns (earlier candidates win
    ties). A first row containing a known column alias is treated as a
    header. Otherwise the file is read headerless, so no data row is lost, and
    columns are assigned positionally (Channel, m/z, Intensity). Lines
    starting with ``#`` are comments. Rows with any non-numeric or non-finite
    value are dropped from every column so the arrays stay row-aligned.
    Files that use a decimal comma are not supported.
    """
    import polars as pl

    empty = {"intensity": np.array([], dtype=float)}
    layout = _sniff_layout(path)
    if layout is None:
        return empty
    sep, has_header = layout
    try:
        df = _read_table(path, sep, has_header=has_header)
    except Exception:
        return empty

    mapping = _column_mapping(df.columns, has_header)
    if not mapping:
        return empty
    # strict=False turns unparsable cells into nulls -> NaN. The finite-mask
    # below then drops them, which also discards an unrecognised header row
    # that was read as data.
    try:
        numeric = df.select(
            [
                pl.col(src).cast(pl.Float64, strict=False).fill_null(float("nan")).alias(dst)
                for src, dst in mapping.items()
            ]
        )
    except Exception:
        return empty
    arrays = {
        name: numeric.get_column(name).to_numpy()
        for name in _CANONICAL_ORDER
        if name in numeric.columns
    }

    # Joint finite-mask so all columns keep the same rows.
    keep = np.ones(numeric.height, dtype=bool)
    for values in arrays.values():
        keep &= np.isfinite(values)
    out = {name: values[keep] for name, values in arrays.items()}
    out.setdefault("intensity", np.array([], dtype=float))
    return out
[docs]
def save_txt_spectrum(
    orig: Path,
    out_path: Path,
    arrays: Dict[str, np.ndarray],
    float_precision: Optional[int] = 6,
) -> None:
    """Write a spectrum to disk as a tab-separated table using Polars.

    Parameters
    ----------
    orig : Path
        Original source file. Currently unused; kept for API compatibility and
        potential metadata handling.
    out_path : Path
        Destination path for the denoised export. Missing parent directories
        are created.
    arrays : dict[str, np.ndarray]
        Columns to write. ``intensity`` is required (``KeyError`` otherwise);
        ``channel`` and ``mz`` are written when present. All arrays must have
        the same length.
    float_precision : int or None, default 6
        Number of decimals written for float columns. ``None`` writes full
        precision.

    Notes
    -----
    Columns are always written in the order ``Channel``, ``m/z``,
    ``Intensity`` with those header names, regardless of the input layout.
    """
    import polars as pl

    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    data: Dict[str, Any] = {}
    if "channel" in arrays:
        data["Channel"] = arrays["channel"]
    if "mz" in arrays:
        data["m/z"] = arrays["mz"]
    data["Intensity"] = arrays["intensity"]

    # The documented six-decimal precision is applied here; it was previously
    # promised in the docs but never passed to the writer.
    pl.DataFrame(data).write_csv(
        out_path,
        separator="\t",
        float_precision=float_precision,
    )
[docs]
@dataclass
class BatchResult:
    """Per-file record describing one denoising attempt.

    Instances are built by ``_process_one`` for both successful and failed
    runs.
    """

    # Path of the input spectrum, as a string.
    file: str
    # Path of the written output; None when the run failed.
    out_file: Optional[str]
    # Outcome flag: "ok" or "error".
    status: str
    # Seconds elapsed for this file, measured with time.perf_counter.
    elapsed_s: float
    # Number of intensity samples read from the input (0 on failure).
    n_points: int
    # Failure summary plus traceback; empty string on success.
    message: str = ""
def _process_one(path: Path, out_dir: Path, method: str, params: Dict[str, Any]) -> BatchResult:
    """Denoise a single spectrum file and summarise the run.

    The spectrum is loaded with :func:`load_txt_spectrum`. Its intensity
    column is passed through :func:`noise_filtering`, and the result is
    written to ``out_dir / "<stem>_denoised<suffix>"`` via
    :func:`save_txt_spectrum`.

    Parameters
    ----------
    path : Path
        Input spectrum file.
    out_dir : Path
        Directory receiving the denoised file. Inputs that share a stem and
        suffix map to the same output path.
    method : str
        Denoising strategy forwarded to :func:`noise_filtering`.
    params : dict[str, Any]
        Extra keyword arguments for :func:`noise_filtering`.

    Returns
    -------
    BatchResult
        Either ``status="ok"`` with the output path and the input point
        count, or ``status="error"`` with the exception summary and
        traceback. Exceptions raised while loading, filtering or saving are
        captured in the record rather than propagated.
    """
    started = time.perf_counter()
    try:
        spectrum = load_txt_spectrum(path)
        raw = spectrum["intensity"]
        if not raw.size:
            raise ValueError("Empty intensity array")
        spectrum["intensity"] = noise_filtering(raw, method=method, **params)
        target = out_dir / f"{path.stem}_denoised{path.suffix}"
        save_txt_spectrum(path, target, spectrum)
    except Exception as exc:
        # Report instead of raising so one bad file cannot abort the batch.
        return BatchResult(
            file=str(path),
            out_file=None,
            status="error",
            elapsed_s=time.perf_counter() - started,
            n_points=0,
            message=f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}",
        )
    else:
        return BatchResult(
            file=str(path),
            out_file=str(target),
            status="ok",
            elapsed_s=time.perf_counter() - started,
            n_points=int(raw.size),
            message="",
        )
[docs]
def batch_denoise(
    files: Union[Iterable[str], Iterable[Path]],
    output_dir: Union[str, Path],
    method: str = "wavelet",
    n_workers: Optional[int] = 0,
    backend: str = "threads",
    progress: bool = True,
    params: Optional[Dict[str, Any]] = None,
) -> List[BatchResult]:
    """Apply the configured denoising method to multiple spectrum files.

    Parameters
    ----------
    files : Iterable[str | Path] or str | Path
        Collection of filesystem paths (glob results, manual list, etc.). A
        single path is also accepted. Paths that do not exist are skipped and
        reported with a logged warning.
    output_dir : str | Path
        Directory where the denoised outputs will be written. It is created
        if missing.
    method : str, default "wavelet"
        Name of the smoothing routine forwarded to :func:`noise_filtering`.
    n_workers : int | None, default 0
        Worker count for the executor. ``0``, ``None`` or a negative value
        selects ``os.cpu_count()`` (falling back to 4). The count is capped at
        the number of input files.
    backend : {"threads", "processes"}, default "threads"
        Execution strategy for the worker pool.
    progress : bool, default True
        If True, wrap the completion iterator in ``tqdm`` when it is
        installed.
    params : dict | None
        Extra keyword arguments forwarded to :func:`noise_filtering`.

    Returns
    -------
    list[BatchResult]
        One record per existing input file, in the same order as ``files``.
        Executor-level failures (e.g. a task that cannot be pickled, or a
        broken process pool) are reported as ``status="error"`` records with
        ``elapsed_s=0.0`` instead of aborting the batch.

    Raises
    ------
    ValueError
        If an unsupported backend name is provided or no input paths exist.
    """
    # Validate cheap arguments before touching the filesystem.
    if backend not in {"threads", "processes"}:
        raise ValueError(f"Invalid backend='{backend}'. Use 'threads' or 'processes'.")

    # Normalize the file list; a bare str/Path means a single file.
    candidates = [Path(files)] if isinstance(files, (str, Path)) else [Path(p) for p in files]
    paths: List[Path] = []
    missing: List[Path] = []
    for candidate in candidates:
        (paths if candidate.exists() else missing).append(candidate)
    if missing:
        logger.warning(
            "[batch_denoise] skipping %d missing input(s); first: %s", len(missing), missing[0]
        )
    if not paths:
        raise ValueError("No input files exist. Check your 'files' iterable and working directory.")

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    if not n_workers or n_workers < 0:
        n_workers = os.cpu_count() or 4
    # Spawning more workers than files only adds startup cost.
    n_workers = min(n_workers, len(paths))

    worker = partial(_process_one, out_dir=out_dir, method=method, params=dict(params or {}))
    executor_cls = ThreadPoolExecutor if backend == "threads" else ProcessPoolExecutor
    by_index: Dict[int, BatchResult] = {}

    with executor_cls(max_workers=n_workers) as ex:
        futures = {ex.submit(worker, p): i for i, p in enumerate(paths)}
        done: Iterable = as_completed(futures)
        if progress:
            try:
                from tqdm import tqdm
            except ImportError:
                pass
            else:
                done = tqdm(done, total=len(futures), desc="Denoising")
        for fut in done:
            idx = futures[fut]
            try:
                by_index[idx] = fut.result()
            except Exception as exc:
                # _process_one traps its own errors, so this only fires for
                # executor failures (pickling errors, broken process pool, ...).
                by_index[idx] = BatchResult(
                    file=str(paths[idx]),
                    out_file=None,
                    status="error",
                    elapsed_s=0.0,
                    n_points=0,
                    message=f"{type(exc).__name__}: {exc}",
                )

    # Return results in input order rather than nondeterministic completion order.
    results = [by_index[i] for i in range(len(paths))]

    errors = [r for r in results if r.status == "error"]
    if errors:
        logger.warning(
            "[batch_denoise] %d errors encountered. First error:\n%s",
            len(errors),
            errors[0].message,
        )
    return results