# Data import function
import logging
import os
import glob
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count
from typing import List, Optional, Tuple
import numpy as np
import polars as pl
from scipy.interpolate import Akima1DInterpolator, CubicSpline, PchipInterpolator
logger = logging.getLogger(__name__)
from ..utils.file_management import import_data
from .normalization import tic_normalization
_RESAMPLE_METHODS = ("linear", "pchip", "akima", "makima", "cubic")
def _build_akima_interpolator(mz_values, intensity_values, method):
"""Build an Akima-family interpolator across SciPy versions."""
if method == "makima":
try:
return Akima1DInterpolator(
mz_values,
intensity_values,
method="makima",
extrapolate=False,
)
except (TypeError, ValueError) as exc:
raise ValueError(
"resample method 'makima' requires scipy>=1.13.0."
) from exc
try:
return Akima1DInterpolator(
mz_values,
intensity_values,
extrapolate=False,
)
except TypeError:
return Akima1DInterpolator(mz_values, intensity_values)
[docs]
def resample_spectrum(mz_values, intensity_values, target_mz, method="linear"):
"""Resample a spectrum onto a target m/z grid.
The input axis is sorted, duplicate m/z positions are collapsed to their
first occurrence, and values outside the native m/z range are filled with
zero. Supported interpolation methods are ``linear``, ``pchip``,
``akima``, ``makima``, and ``cubic``.
"""
mz_values = np.asarray(mz_values, dtype=float).reshape(-1)
intensity_values = np.asarray(intensity_values, dtype=float).reshape(-1)
target_mz = np.asarray(target_mz, dtype=float)
if mz_values.size != intensity_values.size:
raise ValueError(
"mz_values and intensity_values must have the same length."
)
if mz_values.size == 0:
return np.zeros_like(target_mz, dtype=float)
order = np.argsort(mz_values)
mz_values = mz_values[order]
intensity_values = intensity_values[order]
mz_values, unique_idx = np.unique(mz_values, return_index=True)
intensity_values = intensity_values[unique_idx]
if method == "linear" or mz_values.size == 1:
resampled = np.interp(
target_mz,
mz_values,
intensity_values,
left=0.0,
right=0.0,
)
return np.asarray(resampled, dtype=float)
if method == "pchip":
interpolator = PchipInterpolator(
mz_values,
intensity_values,
extrapolate=False,
)
elif method in {"akima", "makima"}:
interpolator = _build_akima_interpolator(
mz_values,
intensity_values,
method,
)
elif method == "cubic":
interpolator = CubicSpline(
mz_values,
intensity_values,
extrapolate=False,
)
else:
valid_methods = ", ".join(_RESAMPLE_METHODS)
raise ValueError(
f"Unknown resample_method={method!r}. Use one of: {valid_methods}."
)
resampled = interpolator(target_mz)
resampled = np.nan_to_num(resampled, nan=0.0, posinf=0.0, neginf=0.0)
outside = (target_mz < mz_values[0]) | (target_mz > mz_values[-1])
resampled = np.where(outside, 0.0, resampled)
resampled = np.clip(resampled, 0.0, None)
return np.asarray(resampled, dtype=float)
[docs]
def data_preprocessing(
file_path,
mz_min=None,
mz_max=None,
normalization_target=1e6,
verbose=True,
return_all=False
):
"""
Import and preprocess ToF-SIMS data from a text file.
Parameters:
-----------
file_path : str
Path to the ToF-SIMS data file
mz_min, mz_max : float, optional
m/z range to import
normalization_target : float or None
Target TIC for normalization, or None to skip
verbose : bool
Print progress if True
return_all : bool
If True, return all intermediate arrays
Returns:
--------
mz_values : numpy.ndarray
normalized_intensities : numpy.ndarray
sample_name : str
group : str
(optionally: intermediate arrays)
"""
if not os.path.isfile(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
mz_values, intensity, sample_name, group = import_data(file_path, mz_min, mz_max)
if verbose: logger.info(f"Imported data: {sample_name} {mz_values.shape} {intensity.shape}")
# TIC normalization
if normalization_target:
normalized_intensities = tic_normalization(intensity, target_tic=normalization_target)
if verbose: logger.info(f"TIC normalized.")
else:
normalized_intensities = intensity
if return_all:
return (sample_name, group, mz_values,
intensity, normalized_intensities)
else:
return sample_name, group, mz_values, normalized_intensities
# Batch preprocessing helper
[docs]
def batch_tic_norm(input_pattern: str,
output_dir: str = "normalized_spectra",
mz_min: Optional[float] = None,
mz_max: Optional[float] = None,
normalization_target: Optional[float] = 1e6,
verbose: bool = False) -> List[str]:
"""
Batch‑import and preprocess multiple ToF‑SIMS spectra, then save the
(m/z, normalized_intensity) arrays for each file as a tab‑separated text
file in *output_dir*.
Parameters
----------
input_pattern : str
Glob pattern (e.g. 'spectra/*.txt') that expands to the input files.
output_dir : str
Folder where '<original‑name>_normalized.txt' will be written;
created if it does not already exist.
mz_min, mz_max, normalization_target, verbose
Passed through to :pyfunc:`data_preprocessing`.
Returns
-------
List[str]
Paths of the files written, in processing order.
"""
import glob
import numpy as np
files = sorted(glob.glob(input_pattern))
if not files:
raise FileNotFoundError(f"No files matched pattern '{input_pattern}'")
os.makedirs(output_dir, exist_ok=True)
written: List[str] = []
for fp in files:
sample_name, group, mz_vals, norm_intens = data_preprocessing(
fp,
mz_min=mz_min,
mz_max=mz_max,
normalization_target=normalization_target,
verbose=verbose,
return_all=False,
)
base = os.path.splitext(os.path.basename(fp))[0]
out_path = os.path.join(output_dir, f"{base}_normalized.txt")
np.savetxt(
out_path,
np.column_stack((mz_vals, norm_intens)),
fmt="%.6f\t%.6e",
header="m/z\tIntensity",
comments=""
)
if verbose:
logger.info(f"Wrote {out_path}")
written.append(out_path)
return written
[docs]
class BatchTicNorm:
"""
Batch TIC normalization for multiple spectra files using Polars and concurrent.futures.
Supports both CSV and TXT file formats:
- CSV: Uses 'corrected_intensity' if available, otherwise 'intensity'
- TXT: Tab-separated m/z and intensity values
Output files contain: channel, mz, intensity (normalized)
"""
[docs]
def __init__(
self,
input_pattern: str,
output_dir: str = "normalized_spectra",
normalization_target: float = 1e6,
n_workers: int = -1,
verbose: bool = True
):
"""
Initialize BatchTicNorm processor.
Parameters
----------
input_pattern : str
Glob pattern for input files (e.g., 'data/*.csv' or 'data/*.txt')
output_dir : str
Directory to save normalized files
normalization_target : float
Target TIC value for normalization (default: 1e6)
n_workers : int
Number of parallel workers (default: 16)
verbose : bool
Print progress information
"""
self.input_pattern = input_pattern
self.output_dir = Path(output_dir)
self.normalization_target = normalization_target
self.n_workers = cpu_count() if n_workers <= 0 else min(n_workers, cpu_count())
self.verbose = verbose
# Create output directory
self.output_dir.mkdir(parents=True, exist_ok=True)
# Find input files
self.input_files = sorted(glob.glob(input_pattern))
if not self.input_files:
raise FileNotFoundError(f"No files matched pattern: {input_pattern}")
if self.verbose:
logger.info(f"Found {len(self.input_files)} files to process")
logger.info(f"Using {self.n_workers} workers")
def _read_file(self, file_path: str) -> pl.DataFrame:
"""
Read a single file (CSV or TXT) and return a Polars DataFrame.
Parameters
----------
file_path : str
Path to the input file
Returns
-------
pl.DataFrame
DataFrame with columns: channel, mz, intensity
"""
file_ext = Path(file_path).suffix.lower()
if file_ext == '.csv':
# Read CSV file
df = pl.read_csv(file_path)
# Select intensity column (prefer corrected_intensity if available)
if 'corrected_intensity' in df.columns:
intensity_col = 'corrected_intensity'
elif 'intensity' in df.columns:
intensity_col = 'intensity'
else:
raise ValueError(f"No intensity column found in {file_path}")
# Check if channel column exists
if 'channel' not in df.columns:
# Create channel column if missing
df = df.with_row_index(name='channel')
# Select and rename columns
result = df.select([
pl.col('channel'),
pl.col('mz'),
pl.col(intensity_col).alias('intensity')
])
elif file_ext in ['.txt', '.tsv']:
# Read TXT/TSV file (assumes tab-separated m/z and intensity)
df = pl.read_csv(
file_path,
separator='\t',
has_header=True,
skip_rows_after_header=0
)
# If no header or only 2 columns, assume m/z and intensity
if df.shape[1] == 2:
df.columns = ['mz', 'intensity']
# Add channel column
result = df.with_row_index(name='channel').select([
pl.col('channel'),
pl.col('mz'),
pl.col('intensity')
])
else:
raise ValueError(f"Unsupported file format: {file_ext}")
return result
def _normalize_single_file(self, file_path: str) -> Tuple[str, bool]:
"""
Process and normalize a single file.
Parameters
----------
file_path : str
Path to the input file
Returns
-------
Tuple[str, bool]
(output_path, success)
"""
try:
# Read file
df = self._read_file(file_path)
# Extract intensity as numpy array for normalization
intensities = df['intensity'].to_numpy()
# Perform TIC normalization
normalized_intensities = tic_normalization(
intensities,
target_tic=self.normalization_target
)
# Create output DataFrame
output_df = df.with_columns(
pl.Series('intensity', normalized_intensities)
)
# Generate output path
base_name = Path(file_path).stem
output_path = self.output_dir / f"{base_name}_normalized.csv"
# Write to CSV
output_df.write_csv(output_path)
if self.verbose:
tic_before = np.sum(intensities)
tic_after = np.sum(normalized_intensities)
logger.info(f"✓ {Path(file_path).name}: TIC {tic_before:.2e} → {tic_after:.2e}")
return str(output_path), True
except Exception as e:
if self.verbose:
logger.error(f"✗ Error processing {Path(file_path).name}: {str(e)}")
return "", False
[docs]
def process(self) -> List[str]:
"""
Process all files using concurrent.futures.
Returns
-------
List[str]
List of output file paths that were successfully created
"""
if self.verbose:
logger.info(f"Processing {len(self.input_files)} files...")
logger.info(f"Normalization target: {self.normalization_target:.2e}")
logger.info(f"Output directory: {self.output_dir}")
# Process files in parallel
with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
results = list(executor.map(self._normalize_single_file, self.input_files))
# Collect successful outputs
output_files = [path for path, success in results if success]
if self.verbose:
logger.info(f"Completed: {len(output_files)}/{len(self.input_files)} files normalized")
logger.info(f"Output location: {self.output_dir.absolute()}")
return output_files
[docs]
def get_tic_statistics(self) -> pl.DataFrame:
"""
Calculate TIC statistics for all input files before normalization.
Returns
-------
pl.DataFrame
DataFrame with columns: filename, tic_original, tic_million
"""
stats = []
for file_path in self.input_files:
try:
df = self._read_file(file_path)
tic = df['intensity'].sum()
stats.append({
'filename': Path(file_path).name,
'tic_original': tic,
'tic_million': tic / 1e6
})
except Exception as e:
if self.verbose:
logger.error(f"Error reading {file_path}: {e}")
stats_df = pl.DataFrame(stats)
if self.verbose and len(stats) > 0:
logger.info("TIC Statistics (before normalization):")
logger.info(f" Mean TIC: {stats_df['tic_million'].mean():.2f} Million")
logger.info(f" Median TIC: {stats_df['tic_million'].median():.2f} Million")
logger.info(f" Min TIC: {stats_df['tic_million'].min():.2f} Million")
logger.info(f" Max TIC: {stats_df['tic_million'].max():.2f} Million")
return stats_df