#!/usr/bin/env python3
"""
Comprehensive analysis for Cancer vs Control ToF-SIMS-like intensity tables.
Input CSV requirements
----------------------
- Columns:
- 'SampleName' : sample ID (string)
- 'Group' : class label ('Cancer' or 'Control')
- Remaining cols : numeric features (m/z intensities); header names are m/z values
- File should already be imputed and non-negative if you enable cNMF.
What this script produces
-------------------------
In the output directory (default: ./analysis_outputs), it writes:
- label_counts.csv
- univariate_results.csv (Welch t-test per feature, log2 fold-change, BH-FDR q-values)
- volcano.png
- pca.png
- (optional) umap.png -- if --umap flag set and umap-learn is installed
- roc_logistic.png, roc_random_forest.png
- model_performance.csv
- importance_lr_l1.png, importance_rf.png
- heatmap_top{N}.png -- heatmap of top-N features by FDR
- embeddings.csv -- PCA (and UMAP if requested)
- If --cnmf is provided:
- cnmf_summary_k{K}.txt
- cnmf_consensus_k{K}.npy
- cnmf_PAC_vs_k.csv
- cnmf_consensus_best.png
- cnmf_W_best.npy, cnmf_H_best.npy
- cnmf_factor_{j}_top_features.csv (top m/z contributors per factor)
- cnmf_factor_{j}_bar.png (bar plot of top contributors)
Usage
-----
python analyze_breast_spectra.py \
--input aligned_peaks_intensity_breast_new_imputed_rf.csv \
--outdir analysis_outputs \
--topn 25 \
--umap \
--cnmf --k_list 3 4 5 6 7 --cnmf_reps 30 --cnmf_beta KL
Notes
-----
- Welch t-tests (unequal variances) + Benjamini–Hochberg FDR control.
- PCA on log1p-standardized intensities.
- Classifiers: Logistic Regression (L1, saga) and Random Forest.
- cNMF implements multiple NMF runs per k, aligns factors (Hungarian matching),
builds a consensus co-clustering matrix, computes PAC stability, and selects k.
References (methods; general, not version-specific)
---------------------------------------------------
- Welch’s t-test: Welch, 1947; BH-FDR: Benjamini & Hochberg, 1995.
- PCA: Pearson, 1901; Hotelling, 1933.
- Logistic regression & L1 regularization: Tibshirani, 1996 (Lasso).
- Random Forest: Breiman, 2001.
- UMAP: McInnes et al., 2018.
- NMF (MU updates): Lee & Seung, 2001; cNMF: Brunet et al., 2004; survey in Berry et al., 2007.
Author’s note
-------------
- Where I recommend KL loss for count-like data, that is a common practice in
mass-spec intensity modeling, consistent with Poisson-like noise assumptions
and NMF literature (opinion grounded in cited works above).
"""
import logging
import os
import math
import argparse
import numpy as np
logger = logging.getLogger(__name__)
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Optional
from scipy import stats
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.metrics import roc_curve, auc
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
try:
import umap # optional
_HAVE_UMAP = True
except Exception:
_HAVE_UMAP = False
# ----------------------------- I/O --------------------------------
[docs]
def ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
# ---------------------- basic statistics --------------------------
[docs]
def bh_fdr(pvals: np.ndarray) -> np.ndarray:
"""Benjamini–Hochberg FDR for a 1D array of p-values."""
p = np.asarray(pvals, dtype=float)
n = p.size
if n == 0:
return p.copy()
order = np.argsort(p)
ranks = np.arange(1, n + 1, dtype=float)
# Compute adjusted p-values: p[order] * n / rank
adjusted = p[order] * n / ranks
# Enforce monotonicity from the bottom up via cumulative minimum on reversed array
adjusted = np.minimum.accumulate(adjusted[::-1])[::-1]
# Place back into original order
q = np.empty_like(p, dtype=float)
q[order] = adjusted
return np.clip(q, 0, 1)
[docs]
def compute_univariate_tests(X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
"""Welch t-test per feature and log2 fold-change (Cancer / Control)."""
y = y.astype(str).str.strip().str.capitalize()
mask_c = (y == "Cancer").values
mask_n = (y == "Control").values
means_c = X[mask_c].mean(axis=0).values
means_n = X[mask_n].mean(axis=0).values
eps = 1e-12
log2_fc = np.log2((means_c + eps) / (means_n + eps))
# Vectorized Welch t-test across all features at once
Xc = X[mask_c].values # (n_cancer, n_features)
Xn = X[mask_n].values # (n_control, n_features)
nc, nn = Xc.shape[0], Xn.shape[0]
mean_c = np.nanmean(Xc, axis=0)
mean_n = np.nanmean(Xn, axis=0)
var_c = np.nanvar(Xc, axis=0, ddof=1)
var_n = np.nanvar(Xn, axis=0, ddof=1)
se = np.sqrt(var_c / nc + var_n / nn)
t_stat = np.where(se > 0, (mean_c - mean_n) / se, 0.0)
# Welch-Satterthwaite degrees of freedom
num = (var_c / nc + var_n / nn) ** 2
denom = (var_c / nc) ** 2 / (nc - 1) + (var_n / nn) ** 2 / (nn - 1)
df_welch = np.where(denom > 0, num / denom, 1.0)
pvals = 2.0 * stats.t.sf(np.abs(t_stat), df_welch)
pvals = np.where(np.isfinite(pvals), pvals, 1.0)
qvals = bh_fdr(pvals)
res = pd.DataFrame({
"feature": X.columns,
"mean_Cancer": means_c,
"mean_Control": means_n,
"log2_FC": log2_fc,
"p_value": pvals,
"q_value": qvals
}).sort_values("q_value", ascending=True).reset_index(drop=True)
return res
# --------------------------- plots --------------------------------
[docs]
def plot_volcano(res: pd.DataFrame, outpath: str, q_thresh: float = 0.05, fc_thresh: float = 1.0):
x = res["log2_FC"].values
y = -np.log10(res["p_value"].values + 1e-300)
plt.figure(figsize=(7, 6))
plt.scatter(x, y, s=16, alpha=0.7)
plt.axvline(fc_thresh, linestyle="--")
plt.axvline(-fc_thresh, linestyle="--")
p_proxy = res.loc[res["q_value"] <= q_thresh, "p_value"].max()
if isinstance(p_proxy, float) and np.isfinite(p_proxy) and p_proxy > 0:
plt.axhline(-math.log10(p_proxy), linestyle="--")
plt.xlabel("log2 Fold Change (Cancer / Control)")
plt.ylabel("-log10(p-value)")
plt.title("Volcano plot")
plt.tight_layout()
plt.savefig(outpath, dpi=200)
plt.close()
[docs]
def plot_pca(X_scaled: np.ndarray, y: pd.Series, outpath: str) -> Tuple[np.ndarray, np.ndarray]:
pca = PCA(n_components=2, random_state=0)
Z = pca.fit_transform(X_scaled)
y = y.astype(str)
plt.figure(figsize=(7, 6))
for lab in sorted(y.unique()):
mask = (y == lab).values
plt.scatter(Z[mask, 0], Z[mask, 1], s=20, alpha=0.8, label=str(lab))
plt.legend()
plt.xlabel(f"PC1 ({pca.explained_variance_ratio_[0]*100:.1f}%)")
plt.ylabel(f"PC2 ({pca.explained_variance_ratio_[1]*100:.1f}%)")
plt.title("PCA (standardized features)")
plt.tight_layout()
plt.savefig(outpath, dpi=200)
plt.close()
return Z, pca.explained_variance_ratio_
[docs]
def plot_umap(X_scaled: np.ndarray, y: pd.Series, outpath: str, n_neighbors: int = 15, min_dist: float = 0.1):
if not _HAVE_UMAP:
return None
reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, random_state=0)
Z = reducer.fit_transform(X_scaled)
y = y.astype(str)
plt.figure(figsize=(7, 6))
for lab in sorted(y.unique()):
mask = (y == lab).values
plt.scatter(Z[mask, 0], Z[mask, 1], s=20, alpha=0.8, label=str(lab))
plt.legend()
plt.xlabel("UMAP1")
plt.ylabel("UMAP2")
plt.title("UMAP (standardized features)")
plt.tight_layout()
plt.savefig(outpath, dpi=200)
plt.close()
return Z
[docs]
def plot_heatmap_top_features(X: pd.DataFrame, y: pd.Series, res: pd.DataFrame, outpath: str, top_n: int = 25):
top_feats = res.sort_values("q_value", ascending=True).head(top_n)["feature"].tolist()
X_sel = X[top_feats].copy()
X_z = (X_sel - X_sel.mean(axis=0)) / (X_sel.std(axis=0) + 1e-12)
X_mat = X_z.values
order = np.argsort(y.values.astype(str))
X_ord = X_mat[order, :]
y_ord = y.values.astype(str)[order]
plt.figure(figsize=(max(6, top_n * 0.25), 6))
plt.imshow(X_ord.T, aspect="auto", interpolation="nearest")
plt.yticks(range(len(top_feats)), top_feats)
# boundary between groups
unique_labels, counts = np.unique(y_ord, return_counts=True)
boundary = counts[0] if len(counts) > 1 else None
if boundary is not None and boundary < X_ord.shape[0]:
plt.axvline(boundary - 0.5)
plt.xlabel("Samples (ordered by Group)")
plt.ylabel("Top features (z-scored)")
plt.title("Heatmap of top differential features")
plt.tight_layout()
plt.savefig(outpath, dpi=200)
plt.close()
# -------------------------- models --------------------------------
[docs]
def run_models(X_scaled: np.ndarray, y01: np.ndarray, features: List[str], outdir: str, seed: int = 0):
X_tr, X_te, y_tr, y_te = train_test_split(X_scaled, y01, test_size=0.2, stratify=y01, random_state=seed)
# Logistic Regression with L1
lr = LogisticRegression(penalty="l1", solver="saga", max_iter=2000, C=1.0, n_jobs=-1, random_state=seed)
lr.fit(X_tr, y_tr)
y_proba_lr = lr.predict_proba(X_te)[:, 1]
fpr_lr, tpr_lr, _ = roc_curve(y_te, y_proba_lr)
auc_lr = auc(fpr_lr, tpr_lr)
plt.figure(figsize=(6, 5))
plt.plot(fpr_lr, tpr_lr, linewidth=2)
plt.plot([0, 1], [0, 1], linestyle="--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title(f"ROC - Logistic Regression (AUC={auc_lr:.3f})")
plt.tight_layout()
plt.savefig(os.path.join(outdir, "roc_logistic.png"), dpi=200)
plt.close()
# Random Forest
rf = RandomForestClassifier(n_estimators=500, max_features="sqrt", random_state=seed, n_jobs=-1)
rf.fit(X_tr, y_tr)
y_proba_rf = rf.predict_proba(X_te)[:, 1]
fpr_rf, tpr_rf, _ = roc_curve(y_te, y_proba_rf)
auc_rf = auc(fpr_rf, tpr_rf)
plt.figure(figsize=(6, 5))
plt.plot(fpr_rf, tpr_rf, linewidth=2)
plt.plot([0, 1], [0, 1], linestyle="--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title(f"ROC - Random Forest (AUC={auc_rf:.3f})")
plt.tight_layout()
plt.savefig(os.path.join(outdir, "roc_random_forest.png"), dpi=200)
plt.close()
# Cross-validated AUCs
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
auc_cv_lr = cross_val_score(lr, X_scaled, y01, cv=cv, scoring="roc_auc", n_jobs=-1)
auc_cv_rf = cross_val_score(rf, X_scaled, y01, cv=cv, scoring="roc_auc", n_jobs=-1)
metrics = pd.DataFrame({
"model": ["Logistic_L1", "RandomForest"],
"holdout_AUC": [auc_lr, auc_rf],
"cv_AUC_mean": [auc_cv_lr.mean(), auc_cv_rf.mean()],
"cv_AUC_std": [auc_cv_lr.std(), auc_cv_rf.std()],
})
metrics.to_csv(os.path.join(outdir, "model_performance.csv"), index=False)
# Feature importances
coef = np.abs(lr.coef_.ravel())
top_idx_lr = np.argsort(coef)[::-1][:15]
plt.figure(figsize=(8, 5))
plt.bar(range(len(top_idx_lr)), coef[top_idx_lr])
plt.xticks(range(len(top_idx_lr)), [features[i] for i in top_idx_lr], rotation=90)
plt.title("Top LR (L1) Feature Coefficients (abs)")
plt.tight_layout()
plt.savefig(os.path.join(outdir, "importance_lr_l1.png"), dpi=200)
plt.close()
imp = rf.feature_importances_
top_idx_rf = np.argsort(imp)[::-1][:15]
plt.figure(figsize=(8, 5))
plt.bar(range(len(top_idx_rf)), imp[top_idx_rf])
plt.xticks(range(len(top_idx_rf)), [features[i] for i in top_idx_rf], rotation=90)
plt.title("Top Random Forest Feature Importances")
plt.tight_layout()
plt.savefig(os.path.join(outdir, "importance_rf.png"), dpi=200)
plt.close()
# -------------------------- cNMF ----------------------------------
from sklearn.decomposition import NMF
from sklearn.preprocessing import normalize
from scipy.optimize import linear_sum_assignment
def _cosine_similarity_columns(A: np.ndarray, B: np.ndarray) -> np.ndarray:
A_n = normalize(A, axis=0) + 0.0
B_n = normalize(B, axis=0) + 0.0
return A_n.T @ B_n
def _align_components(W_list: List[np.ndarray], H_list: List[np.ndarray]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
W0, H0 = W_list[0], H_list[0]
aligned_W, aligned_H = [W0], [H0]
ref = H0.copy()
for r in range(1, len(H_list)):
sim = _cosine_similarity_columns(ref.T, H_list[r].T) # (k x k)
row_ind, col_ind = linear_sum_assignment(1.0 - sim) # maximize similarity
W_r = W_list[r][:, col_ind]
H_r = H_list[r][col_ind, :]
aligned_W.append(W_r)
aligned_H.append(H_r)
return aligned_W, aligned_H
def _consensus_matrix(W: np.ndarray) -> np.ndarray:
labels = np.argmax(W, axis=1)
# Vectorized: samples with the same label get co-occurrence = 1
C = (labels[:, np.newaxis] == labels[np.newaxis, :]).astype(float)
return C
def _pac_score(consensus: np.ndarray, lower: float = 0.1, upper: float = 0.9) -> float:
vals = consensus[np.triu_indices_from(consensus, k=1)]
return np.mean((vals > lower) & (vals < upper)) if vals.size else 1.0
[docs]
def run_cnmf(
X_pos: np.ndarray,
k_list: List[int],
R: int = 30,
max_iter: int = 1000,
beta: str = "frobenius", # "frobenius", "kullback-leibler"
random_seeds: Optional[List[int]] = None,
outdir: Optional[str] = None
) -> Dict[int, Dict[str, object]]:
"""
Consensus NMF across k values.
Returns dict[k] with W_mean, H_mean, consensus, PAC, W_list, H_list.
"""
n, p = X_pos.shape
if random_seeds is None:
random_seeds = list(range(R))
elif len(random_seeds) < R:
random_seeds = (random_seeds * ((R + len(random_seeds) - 1) // len(random_seeds)))[:R]
results: Dict[int, Dict[str, object]] = {}
for k in k_list:
W_runs, H_runs = [], []
for r in range(R):
model = NMF(
n_components=k,
init="nndsvda",
random_state=random_seeds[r],
max_iter=max_iter,
solver=("mu" if beta != "frobenius" else "cd"),
beta_loss=("kullback-leibler" if beta == "KL" or beta == "kullback-leibler" else "frobenius"),
)
W = model.fit_transform(X_pos)
H = model.components_
W_runs.append(W)
H_runs.append(H)
W_aligned, H_aligned = _align_components(W_runs, H_runs)
# consensus
C_sum = np.zeros((n, n), dtype=float)
for W_r in W_aligned:
C_sum += _consensus_matrix(W_r)
consensus = C_sum / R
pac = _pac_score(consensus)
W_mean = np.mean(np.stack(W_aligned, axis=2), axis=2)
H_mean = np.mean(np.stack(H_aligned, axis=2), axis=2)
results[k] = dict(
W_mean=W_mean,
H_mean=H_mean,
consensus=consensus,
PAC=float(pac),
W_list=W_aligned,
H_list=H_aligned
)
if outdir:
with open(os.path.join(outdir, f"cnmf_summary_k{k}.txt"), "w") as f:
f.write(f"k={k}\nPAC={pac:.6f}\n")
np.save(os.path.join(outdir, f"cnmf_consensus_k{k}.npy"), consensus)
return results
[docs]
def choose_k_by_pac(results: Dict[int, Dict[str, object]]) -> int:
ks = sorted(results.keys())
pacs = [(k, float(results[k]["PAC"])) for k in ks]
pacs.sort(key=lambda t: (t[1], t[0]))
return pacs[0][0]
[docs]
def save_consensus_heatmap(consensus: np.ndarray, labels: pd.Series, outpath: str):
order = np.argsort(labels.values.astype(str))
C_ord = consensus[order][:, order]
plt.figure(figsize=(6, 5))
plt.imshow(C_ord, aspect="auto", interpolation="nearest")
plt.title("Consensus matrix (samples ordered by Group)")
plt.colorbar()
plt.tight_layout()
plt.savefig(outpath, dpi=200)
plt.close()
[docs]
def save_factor_bars(H: np.ndarray, feature_names: List[str], outdir: str, topm: int = 15):
k = H.shape[0]
for j in range(k):
row = H[j, :]
idx = np.argsort(row)[::-1][:topm]
feats = [feature_names[i] for i in idx]
vals = row[idx]
pd.DataFrame({"feature": feats, "loading": vals}).to_csv(
os.path.join(outdir, f"cnmf_factor_{j+1}_top_features.csv"), index=False
)
plt.figure(figsize=(8, 5))
plt.bar(range(len(idx)), vals)
plt.xticks(range(len(idx)), feats, rotation=90)
plt.title(f"cNMF factor {j+1}: top {topm} features")
plt.tight_layout()
plt.savefig(os.path.join(outdir, f"cnmf_factor_{j+1}_bar.png"), dpi=200)
plt.close()
# ---------------------------- main -------------------------------
[docs]
def main(input_file, outdir, topn=25, umap=False, cnmf=False, k_list=None, cnmf_reps=30, cnmf_beta="frobenius"):
# Load data
df = pd.read_csv(input_file)
# Basic columns
if "Group" not in df.columns or "SampleName" not in df.columns:
raise ValueError("Input must contain 'SampleName' and 'Group' columns.")
meta_cols = ["SampleName", "Group"]
feature_cols = [c for c in df.columns if c not in meta_cols]
# Coerce numeric features
X_df = df[feature_cols].apply(pd.to_numeric, errors="coerce")
y_sr = df["Group"].astype(str).str.strip().str.capitalize()
# Save label counts
y_sr.value_counts().to_frame("count").to_csv(os.path.join(outdir, "label_counts.csv"))
# --- Univariate tests ---
uni = compute_univariate_tests(X_df, y_sr)
uni.to_csv(os.path.join(outdir, "univariate_results.csv"), index=False)
plot_volcano(uni, os.path.join(outdir, "volcano.png"))
# --- PCA / UMAP embeddings ---
X_log = np.log1p(X_df.values) # variance stabilization (keeps non-negativity)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_log) # standardized for PCA/UMAP
Z_pca, var_ratio = plot_pca(X_scaled, y_sr, os.path.join(outdir, "pca.png"))
coords = pd.DataFrame({
"SampleName": df["SampleName"],
"Group": y_sr,
"PCA1": Z_pca[:, 0],
"PCA2": Z_pca[:, 1],
})
if umap and _HAVE_UMAP:
Z_umap = plot_umap(X_scaled, y_sr, os.path.join(outdir, "umap.png"))
if Z_umap is not None:
coords["UMAP1"] = Z_umap[:, 0]
coords["UMAP2"] = Z_umap[:, 1]
coords.to_csv(os.path.join(outdir, "embeddings.csv"), index=False)
# --- Supervised models ---
y01 = (y_sr == "Cancer").astype(int).values
run_models(X_scaled, y01, feature_cols, outdir=outdir, seed=0)
# --- Heatmap of top features ---
plot_heatmap_top_features(X_df, y_sr, uni, os.path.join(outdir, f"heatmap_top{topn}.png"), top_n=topn)
# --- cNMF (optional) ---
if cnmf:
# Check non-negativity (NMF requirement)
if (X_df.values < 0).any():
raise ValueError("cNMF requires non-negative features. Found negative values in the input.")
results = run_cnmf(
X_pos=X_df.values.astype(float, copy=False),
k_list=k_list,
R=cnmf_reps,
max_iter=1000,
beta=cnmf_beta,
outdir=outdir
)
# Save PAC over k
pac_table = pd.DataFrame({"k": sorted(results.keys()),
"PAC": [results[k]["PAC"] for k in sorted(results.keys())]})
pac_table.to_csv(os.path.join(outdir, "cnmf_PAC_vs_k.csv"), index=False)
best_k = min(results.keys(), key=lambda kk: (results[kk]["PAC"], kk))
W = results[best_k]["W_mean"]
H = results[best_k]["H_mean"]
np.save(os.path.join(outdir, "cnmf_W_best.npy"), W)
np.save(os.path.join(outdir, "cnmf_H_best.npy"), H)
# Consensus heatmap (ordered by label)
save_consensus_heatmap(results[best_k]["consensus"], y_sr, os.path.join(outdir, "cnmf_consensus_best.png"))
# Save factor bars and top features per factor
save_factor_bars(H, feature_cols, outdir, topm=15)
with open(os.path.join(outdir, f"cnmf_summary_k{best_k}.txt"), "a") as f:
f.write("\nSelected as best k by minimal PAC.\n")
logger.info(f"Done. Outputs written to: {outdir}")