"""High-level utilities for parallel peak extraction and alignment."""
import tqdm
import pandas as pd
from functools import partial
from concurrent.futures import ProcessPoolExecutor, as_completed
from ..detection.detection import (
detect_peaks_with_area,
detect_peaks_cwt_with_area,
robust_peak_detection,
align_peaks as _align_peaks,
collect_peak_properties_batch as _collect_peaks_detection,
)
from ..normalization.preprocessing import data_preprocessing
def _process_one_file(
    file_path,
    mz_min,
    mz_max,
    normalization_target,
    method,
    min_intensity,
    min_snr,
    min_distance,
    window_size,
    peak_height,
    prominence,
    min_peak_width,
    max_peak_width,
    width_rel_height,
    distance_threshold,
):
    """Return a list of peak records extracted from ``file_path``.

    The file is loaded through ``data_preprocessing`` and the resulting
    m/z values and normalized intensities are handed to one of three
    detectors, selected by ``method``:

    * ``None``  -> ``detect_peaks_with_area``
    * ``"cwt"`` -> ``detect_peaks_cwt_with_area``
    * anything else -> ``robust_peak_detection`` (which additionally
      receives ``method`` and ``distance_threshold``)

    Parameters
    ----------
    file_path
        Input file; forwarded to ``data_preprocessing`` and used as the
        prefix of the error message on failure.
    mz_min, mz_max, normalization_target
        Forwarded unchanged to ``data_preprocessing``.
    method
        Detector selector, see above.
    min_intensity, min_snr, min_distance, window_size, peak_height, \
prominence, min_peak_width, max_peak_width, width_rel_height
        Forwarded unchanged to whichever detector is selected.
    distance_threshold
        Forwarded to ``robust_peak_detection`` only.

    Returns
    -------
    list of dict
        One dict per detected peak (``DataFrame.to_dict('records')``),
        restricted to the known peak columns that the detector actually
        produced. An empty list is returned when the detector result is
        not a DataFrame or has no rows.

    Raises
    ------
    RuntimeError
        Wraps any exception raised while processing the file; the message
        is ``"<file_path>: <original message>"`` and the original
        exception is chained as ``__cause__``.
    """
    try:
        sample_name, group, mz_vals, norm_int = data_preprocessing(
            file_path=file_path,
            mz_min=mz_min,
            mz_max=mz_max,
            normalization_target=normalization_target,
            verbose=False,
            return_all=False,
        )

        # Keyword arguments shared by all three detectors.
        detection_kwargs = dict(
            mz_values=mz_vals,
            intensities=norm_int,
            sample_name=sample_name,
            group=group,
            min_intensity=min_intensity,
            min_snr=min_snr,
            min_distance=min_distance,
            window_size=window_size,
            peak_height=peak_height,
            prominence=prominence,
            min_peak_width=min_peak_width,
            max_peak_width=max_peak_width,
            width_rel_height=width_rel_height,
        )

        if method is None:
            peak_props = detect_peaks_with_area(**detection_kwargs)
        elif method == "cwt":
            peak_props = detect_peaks_cwt_with_area(**detection_kwargs)
        else:
            peak_props = robust_peak_detection(
                method=method,
                distance_threshold=distance_threshold,
                **detection_kwargs,
            )

        if not isinstance(peak_props, pd.DataFrame) or len(peak_props) == 0:
            return []

        cols_to_keep = [
            'SampleName',
            'Group',
            'PeakCenter',
            'PeakWidth',
            'Prominences',
            'PeakArea',
            'Amplitude',
            'DetectedBy',
            'Deconvoluted',
            'FitModel',
            'AreaDefinition',
            'WidthDefinition',
            'IntegrationMethod',
        ]
        existing_cols = [c for c in cols_to_keep if c in peak_props.columns]
        batch = peak_props[existing_cols].copy()

        # Only cast columns that survived the filter above: a detector that
        # does not emit e.g. 'DetectedBy' must not trigger a KeyError here.
        for col in ('SampleName', 'Group', 'DetectedBy', 'Deconvoluted'):
            if col in batch.columns:
                batch[col] = batch[col].astype(str)

        return batch.to_dict('records')
    except Exception as exc:  # pragma: no cover - defensive
        # Prefix the failing file so the caller can tell which input broke.
        raise RuntimeError(f"{file_path}: {exc}") from exc
# Public API ---------------------------------------------------------------------------
def batch_processing(
    files,
    *,
    max_workers=None,
    mz_min=None,
    mz_max=None,
    normalization_target=1e8,
    method=None,
    min_intensity=1,
    min_snr=3,
    min_distance=5,
    window_size=10,
    peak_height=50,
    prominence=50,
    min_peak_width=1,
    max_peak_width=75,
    width_rel_height=0.5,
    distance_threshold=0.01,
    mz_tolerance=0.2,
    mz_rounding_precision=1,
):
    """Parallel batch peak detection.

    Each entry of ``files`` is submitted to a ``ProcessPoolExecutor`` and
    processed by ``_process_one_file``; the per-file peak records are then
    concatenated (in the order the files were given) and aligned twice
    with ``align_peaks``.

    Parameters
    ----------
    files
        Iterable of input files. It is materialized into a list once, so
        generators are accepted.
    max_workers
        Forwarded to ``ProcessPoolExecutor``.
    mz_min, mz_max, normalization_target, method, min_intensity, min_snr, \
min_distance, window_size, peak_height, prominence, min_peak_width, \
max_peak_width, width_rel_height, distance_threshold
        Forwarded unchanged to ``_process_one_file`` for every file.
    mz_tolerance, mz_rounding_precision
        Forwarded unchanged to both ``align_peaks`` calls.

    Returns
    -------
    tuple
        ``(peaks_df, intensity_df, area_df)`` analogous to the legacy
        implementation: ``peaks_df`` holds one row per detected peak,
        ``intensity_df`` and ``area_df`` are the results of
        ``align_peaks`` with ``output="intensity"`` and ``output="area"``.

    Raises
    ------
    ValueError
        If ``files`` is empty, or if no peak was detected in any file.
    Exception
        Any exception raised inside a worker is re-raised here by
        ``Future.result()``.
    """
    file_list = list(files)
    if not file_list:
        # Fail before spawning a process pool; "no peaks detected" would be
        # a misleading diagnosis when nothing was submitted at all.
        raise ValueError("No input files provided.")

    worker = partial(
        _process_one_file,
        mz_min=mz_min,
        mz_max=mz_max,
        normalization_target=normalization_target,
        method=method,
        min_intensity=min_intensity,
        min_snr=min_snr,
        min_distance=min_distance,
        window_size=window_size,
        peak_height=peak_height,
        prominence=prominence,
        min_peak_width=min_peak_width,
        max_peak_width=max_peak_width,
        width_rel_height=width_rel_height,
        distance_threshold=distance_threshold,
    )

    # One slot per input file so results can be reassembled in input order.
    per_file_records = [[] for _ in file_list]
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(worker, fp): idx for idx, fp in enumerate(file_list)
        }
        try:
            # as_completed yields in completion order, which differs from
            # run to run; the index lookup restores a deterministic order.
            for fut in tqdm.tqdm(
                as_completed(futures),
                total=len(futures),
                desc="Processing ToF-SIMS files",
            ):
                per_file_records[futures[fut]] = fut.result()
        except BaseException:
            # Drop work that has not started yet, so a failure surfaces
            # without first processing every remaining file.
            for pending in futures:
                pending.cancel()
            raise

    all_peak_records = [
        record for records in per_file_records for record in records
    ]
    if not all_peak_records:
        raise ValueError("No peaks detected in any file. Check parameters or data quality.")

    peaks_df = pd.DataFrame(all_peak_records)
    intensity_df = _align_peaks(
        peaks_df,
        mz_tolerance=mz_tolerance,
        mz_rounding_precision=mz_rounding_precision,
        output="intensity",
    )
    area_df = _align_peaks(
        peaks_df,
        mz_tolerance=mz_tolerance,
        mz_rounding_precision=mz_rounding_precision,
        output="area",
    )
    return peaks_df, intensity_df, area_df
# Backwards-compatible re-exports -----------------------------------------------------
# The detection helpers are imported above under private aliases; rebinding them here
# exposes them under their public names (``align_peaks`` and
# ``collect_peak_properties_batch``) as attributes of this module.
align_peaks = _align_peaks
collect_peak_properties_batch = _collect_peaks_detection