Source code for sfplot.analysis.topology_extensions

from __future__ import annotations

import math
import os
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Sequence

import matplotlib as mpl

mpl.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.sparse import issparse
from sklearn.neighbors import NearestNeighbors

from .searcher_findee_score import compute_cophenetic_distances_from_df, compute_cophenetic_from_distance_matrix


def _ensure_output_dir(output_dir: Optional[str | os.PathLike[str]]) -> Optional[Path]:
    if output_dir is None:
        return None
    path = Path(output_dir)
    path.mkdir(parents=True, exist_ok=True)
    return path


def _coerce_nonnegative(series: pd.Series) -> pd.Series:
    values = pd.to_numeric(series, errors="coerce").fillna(0.0).astype(float)
    min_val = float(values.min()) if len(values) else 0.0
    if min_val < 0:
        values = values - min_val
    return values.clip(lower=0.0)


def _normalize_series(values: pd.Series) -> pd.Series:
    if values.empty:
        return values.astype(float)
    values = pd.to_numeric(values, errors="coerce").fillna(0.0).astype(float)
    min_val = float(values.min())
    max_val = float(values.max())
    if math.isclose(min_val, max_val):
        if math.isclose(max_val, 0.0):
            return pd.Series(np.zeros(len(values)), index=values.index, dtype=float)
        return pd.Series(np.ones(len(values)), index=values.index, dtype=float)
    return (values - min_val) / (max_val - min_val)


def _normalize_frame_rows(frame: pd.DataFrame) -> pd.DataFrame:
    if frame.empty:
        return frame.copy()
    values = frame.apply(pd.to_numeric, errors="coerce").fillna(0.0).astype(float)
    mins = values.min(axis=1)
    maxs = values.max(axis=1)
    spans = (maxs - mins).replace(0.0, np.nan)
    normalized = values.sub(mins, axis=0).div(spans, axis=0)
    normalized = normalized.fillna(0.0)
    constant_nonzero = spans.isna() & (maxs > 0)
    if constant_nonzero.any():
        normalized.loc[constant_nonzero] = 1.0
    return normalized


def _winsorized_minmax(values: np.ndarray, upper_quantile: float = 0.99) -> np.ndarray:
    arr = np.asarray(values, dtype=float)
    if arr.size == 0:
        return arr
    arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
    if np.allclose(arr, arr.flat[0]):
        return np.zeros_like(arr) if math.isclose(float(arr.flat[0]), 0.0) else np.ones_like(arr)
    clipped = np.clip(arr, a_min=float(np.min(arr)), a_max=float(np.quantile(arr, upper_quantile)))
    min_val = float(np.min(clipped))
    max_val = float(np.max(clipped))
    if math.isclose(min_val, max_val):
        return np.zeros_like(clipped) if math.isclose(max_val, 0.0) else np.ones_like(clipped)
    return (clipped - min_val) / (max_val - min_val)


def _winsorized_normalize_series(values: pd.Series, upper_quantile: float = 0.99) -> pd.Series:
    if values.empty:
        return values.astype(float)
    numeric = pd.to_numeric(values, errors="coerce").fillna(0.0).astype(float)
    normalized = _winsorized_minmax(numeric.to_numpy(dtype=float), upper_quantile=upper_quantile)
    return pd.Series(normalized, index=numeric.index, dtype=float)


def _winsorized_normalize_frame(frame: pd.DataFrame, upper_quantile: float = 0.99) -> pd.DataFrame:
    if frame.empty:
        return frame.copy()
    normalized = frame.copy().astype(float)
    for col in normalized.columns:
        normalized[col] = _winsorized_normalize_series(normalized[col], upper_quantile=upper_quantile)
    return normalized


def _robust_scale_columns(frame: pd.DataFrame, lower_quantile: float = 0.01, upper_quantile: float = 0.99) -> pd.DataFrame:
    if frame.empty:
        return frame.copy()
    scaled = frame.copy().astype(float)
    for col in scaled.columns:
        values = scaled[col].to_numpy(dtype=float)
        if values.size == 0:
            continue
        lo = float(np.quantile(values, lower_quantile))
        hi = float(np.quantile(values, upper_quantile))
        if math.isclose(lo, hi):
            scaled[col] = 0.0 if math.isclose(hi, 0.0) else 1.0
            continue
        clipped = np.clip(values, lo, hi)
        scaled[col] = (clipped - lo) / (hi - lo)
    return scaled.fillna(0.0)


def _weighted_quantile(values: np.ndarray, weights: np.ndarray, quantile: float) -> float:
    if len(values) == 0:
        return float("nan")
    values = np.asarray(values, dtype=float)
    weights = np.asarray(weights, dtype=float)
    valid = np.isfinite(values) & np.isfinite(weights)
    if not valid.any():
        return float("nan")
    values = values[valid]
    weights = np.clip(weights[valid], 0.0, None)
    if np.allclose(weights.sum(), 0.0):
        return float(np.quantile(values, quantile))
    order = np.argsort(values)
    values = values[order]
    weights = weights[order]
    cumulative = np.cumsum(weights)
    cutoff = float(np.clip(quantile, 0.0, 1.0)) * float(cumulative[-1])
    idx = int(np.searchsorted(cumulative, cutoff, side="left"))
    idx = min(idx, len(values) - 1)
    return float(values[idx])


def _aggregate_weighted_values(values: np.ndarray, weights: np.ndarray, method: str = "weighted_median") -> float:
    valid = np.isfinite(values) & np.isfinite(weights)
    if not valid.any():
        return float("nan")
    values = np.asarray(values[valid], dtype=float)
    weights = np.clip(np.asarray(weights[valid], dtype=float), 0.0, None)
    if method == "weighted_median":
        return _weighted_quantile(values, weights, 0.5)
    if method == "weighted_trimmed_mean":
        if np.allclose(weights.sum(), 0.0):
            return float(np.mean(values))
        low = _weighted_quantile(values, weights, 0.1)
        high = _weighted_quantile(values, weights, 0.9)
        keep = (values >= low) & (values <= high)
        if not keep.any():
            keep = np.ones(len(values), dtype=bool)
        return _weighted_average(values[keep], weights[keep])
    if method == "mean":
        return _weighted_average(values, weights)
    raise ValueError("method must be one of: weighted_median, weighted_trimmed_mean, mean")


def _safe_row_cophenetic(distance_matrix: pd.DataFrame, method: str = "average") -> pd.DataFrame:
    if distance_matrix.empty:
        return pd.DataFrame()
    if len(distance_matrix.index) == 1:
        idx = distance_matrix.index.astype(str)
        return pd.DataFrame([[0.0]], index=idx, columns=idx)
    row_coph, _ = compute_cophenetic_from_distance_matrix(distance_matrix, method=method, show_corr=False)
    return row_coph


def _weighted_average(values: np.ndarray, weights: np.ndarray) -> float:
    valid = np.isfinite(values) & np.isfinite(weights)
    if not valid.any():
        return float("nan")
    values_valid = values[valid]
    weights_valid = np.clip(weights[valid], 0.0, None)
    if np.allclose(weights_valid.sum(), 0.0):
        return float(np.mean(values_valid))
    return float(np.average(values_valid, weights=weights_valid))


def _safe_to_parquet(df: pd.DataFrame, path: Path) -> bool:
    try:
        df.to_parquet(path, index=False)
        return True
    except Exception:
        return False


def _save_heatmap(matrix: pd.DataFrame, title: str, output_prefix: Path, cmap: str = "mako") -> list[str]:
    fig_w = max(6.0, 0.45 * max(1, matrix.shape[1]))
    fig_h = max(4.0, 0.35 * max(1, matrix.shape[0]))
    fig, ax = plt.subplots(figsize=(fig_w, fig_h))
    sns.heatmap(matrix, cmap=cmap, ax=ax)
    ax.set_title(title)
    ax.set_xlabel(matrix.columns.name or "")
    ax.set_ylabel(matrix.index.name or "")
    fig.tight_layout()
    paths: list[str] = []
    for ext in ("png", "pdf"):
        out = output_prefix.with_suffix(f".{ext}")
        fig.savefig(out, dpi=300, bbox_inches="tight")
        paths.append(str(out))
    plt.close(fig)
    return paths


def _save_hotspot_overlay(
    reference_df: pd.DataFrame,
    x_col: str,
    y_col: str,
    sender_mask: pd.Series,
    receiver_mask: pd.Series,
    sender_score: pd.Series,
    receiver_score: pd.Series,
    title: str,
    output_prefix: Path,
) -> list[str]:
    fig, ax = plt.subplots(figsize=(7.0, 7.0))
    ax.scatter(
        reference_df[x_col],
        reference_df[y_col],
        s=8,
        c="#D0D0D0",
        alpha=0.45,
        linewidths=0.0,
        label="Background cells",
    )

    if sender_mask.any():
        sender_cells = reference_df.loc[sender_mask]
        ax.scatter(
            sender_cells[x_col],
            sender_cells[y_col],
            s=24 + 36 * sender_score.loc[sender_mask].to_numpy(),
            c=sender_score.loc[sender_mask],
            cmap="Reds",
            alpha=0.85,
            linewidths=0.0,
            label="Sender hotspot",
        )

    if receiver_mask.any():
        receiver_cells = reference_df.loc[receiver_mask]
        ax.scatter(
            receiver_cells[x_col],
            receiver_cells[y_col],
            s=24 + 36 * receiver_score.loc[receiver_mask].to_numpy(),
            c=receiver_score.loc[receiver_mask],
            cmap="Blues",
            alpha=0.85,
            linewidths=0.0,
            label="Receiver hotspot",
            marker="s",
        )

    ax.set_title(title)
    ax.set_aspect("equal")
    ax.invert_yaxis()
    ax.set_xticks([])
    ax.set_yticks([])
    ax.legend(frameon=False, loc="upper right")
    fig.tight_layout()
    paths: list[str] = []
    for ext in ("png", "pdf"):
        out = output_prefix.with_suffix(f".{ext}")
        fig.savefig(out, dpi=300, bbox_inches="tight")
        paths.append(str(out))
    plt.close(fig)
    return paths


def _reference_from_adata(
    adata: Any,
    *,
    cluster_col: str = "Cluster",
    cell_id_col: str = "cell_id",
    x_col: str = "x",
    y_col: str = "y",
) -> pd.DataFrame:
    if "spatial" not in adata.obsm:
        raise ValueError("adata.obsm['spatial'] is required to derive the reference table.")
    if cluster_col not in adata.obs.columns:
        raise ValueError(f"{cluster_col!r} is missing from adata.obs.")
    if cell_id_col not in adata.obs.columns:
        raise ValueError(f"{cell_id_col!r} is missing from adata.obs.")

    spatial = np.asarray(adata.obsm["spatial"])
    if spatial.shape[1] < 2:
        raise ValueError("adata.obsm['spatial'] must contain at least two dimensions.")

    reference_df = pd.DataFrame(
        {
            cell_id_col: adata.obs[cell_id_col].astype(str).to_numpy(),
            x_col: spatial[:, 0],
            y_col: spatial[:, 1],
            "celltype": adata.obs[cluster_col].astype(str).to_numpy(),
        }
    )
    return reference_df


def _expression_from_adata(
    adata: Any,
    genes: Iterable[str],
    *,
    cell_id_col: str = "cell_id",
    use_raw: bool = False,
) -> pd.DataFrame:
    genes = list(dict.fromkeys(str(g) for g in genes))
    gene_index = adata.raw.var_names if use_raw and getattr(adata, "raw", None) is not None else adata.var_names
    present = [gene for gene in genes if gene in set(gene_index)]
    if not present:
        return pd.DataFrame(index=adata.obs[cell_id_col].astype(str).to_numpy())

    if use_raw and getattr(adata, "raw", None) is not None:
        matrix = adata.raw[:, present].X
    else:
        matrix = adata[:, present].X

    if issparse(matrix):
        matrix = matrix.toarray()
    else:
        matrix = np.asarray(matrix)

    return pd.DataFrame(matrix, index=adata.obs[cell_id_col].astype(str).to_numpy(), columns=present)


def _coerce_reference_df(
    reference_df: Optional[pd.DataFrame] = None,
    *,
    adata: Any = None,
    cluster_col: str = "Cluster",
    cell_id_col: str = "cell_id",
    x_col: str = "x",
    y_col: str = "y",
    celltype_col: str = "celltype",
) -> pd.DataFrame:
    if reference_df is None:
        if adata is None:
            raise ValueError("Either reference_df or adata must be provided.")
        reference_df = _reference_from_adata(adata, cluster_col=cluster_col, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col)
    else:
        reference_df = reference_df.copy()
        if cell_id_col not in reference_df.columns:
            reference_df[cell_id_col] = [f"cell_{i}" for i in range(len(reference_df))]
        if celltype_col not in reference_df.columns:
            raise ValueError(f"reference_df must contain {celltype_col!r}.")
        rename_map = {}
        if celltype_col != "celltype":
            rename_map[celltype_col] = "celltype"
        if rename_map:
            reference_df = reference_df.rename(columns=rename_map)
    required = {cell_id_col, x_col, y_col, "celltype"}
    missing = required.difference(reference_df.columns)
    if missing:
        raise ValueError(f"reference_df is missing required columns: {sorted(missing)}")
    reference_df[cell_id_col] = reference_df[cell_id_col].astype(str)
    reference_df["celltype"] = reference_df["celltype"].astype(str)
    return reference_df[[cell_id_col, x_col, y_col, "celltype"]].copy()


def _coerce_expression_df(
    reference_df: pd.DataFrame,
    expression_df: Optional[pd.DataFrame] = None,
    *,
    adata: Any = None,
    genes: Optional[Iterable[str]] = None,
    cell_id_col: str = "cell_id",
    use_raw: bool = False,
) -> pd.DataFrame:
    if expression_df is None:
        if adata is None:
            raise ValueError("Either expression_df or adata must be provided.")
        expression_df = _expression_from_adata(adata, genes or [], cell_id_col=cell_id_col, use_raw=use_raw)
    else:
        expression_df = expression_df.copy()
        if cell_id_col in expression_df.columns:
            expression_df[cell_id_col] = expression_df[cell_id_col].astype(str)
            expression_df = expression_df.set_index(cell_id_col)
        expression_df.index = expression_df.index.astype(str)
        if genes is not None:
            present = [gene for gene in genes if gene in expression_df.columns]
            expression_df = expression_df.loc[:, present]

    aligned = expression_df.reindex(reference_df[cell_id_col]).fillna(0.0)
    aligned.index = reference_df[cell_id_col].astype(str)
    aligned = aligned.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    return aligned


def _pick_matching_file(base: Path, patterns: Sequence[str]) -> Optional[Path]:
    for pattern in patterns:
        matches = sorted(base.glob(pattern))
        if matches:
            return matches[0]
    return None


def _load_matrix_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, index_col=0)
    df.index = df.index.astype(str)
    df.columns = df.columns.astype(str)
    return df.apply(pd.to_numeric, errors="coerce")


def _resolve_precomputed_tables(
    *,
    tbc_results: Optional[str | os.PathLike[str]] = None,
    t_and_c_df: Optional[pd.DataFrame] = None,
    structure_map: Optional[pd.DataFrame] = None,
    structure_map_df: Optional[pd.DataFrame] = None,
) -> tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    topology_df = t_and_c_df.copy() if t_and_c_df is not None else None
    structure_df = structure_map_df.copy() if structure_map_df is not None else None
    if structure_df is None and structure_map is not None and isinstance(structure_map, pd.DataFrame):
        structure_df = structure_map.copy()

    if tbc_results is not None:
        base = Path(tbc_results)
        if base.is_file():
            if topology_df is None:
                topology_df = _load_matrix_csv(base)
            if structure_df is None:
                sibling = _pick_matching_file(
                    base.parent,
                    ["StructureMap_table*.csv", "*StructureMap*.csv"],
                )
                if sibling is not None:
                    structure_df = _load_matrix_csv(sibling)
        elif base.is_dir():
            if topology_df is None:
                topology_path = _pick_matching_file(
                    base,
                    ["t_and_c_result*.csv", "*t_and_c*.csv"],
                )
                if topology_path is not None:
                    topology_df = _load_matrix_csv(topology_path)
            if structure_df is None:
                structure_path = _pick_matching_file(
                    base,
                    ["StructureMap_table*.csv", "*StructureMap*.csv"],
                )
                if structure_path is not None:
                    structure_df = _load_matrix_csv(structure_path)

    if topology_df is not None:
        topology_df.index = topology_df.index.astype(str)
        topology_df.columns = topology_df.columns.astype(str)
        topology_df = topology_df.apply(pd.to_numeric, errors="coerce")
    if structure_df is not None:
        structure_df.index = structure_df.index.astype(str)
        structure_df.columns = structure_df.columns.astype(str)
        structure_df = structure_df.apply(pd.to_numeric, errors="coerce")

    return topology_df, structure_df


def _recompute_gene_topology(
    reference_df: pd.DataFrame,
    expression_df: pd.DataFrame,
    genes: Sequence[str],
    *,
    entity_points_df: Optional[pd.DataFrame] = None,
    cell_id_col: str,
    x_col: str,
    y_col: str,
    entity_min_weight: float,
    topology_method: str,
) -> pd.DataFrame:
    if not genes:
        return pd.DataFrame(columns=reference_df["celltype"].astype(str).drop_duplicates().tolist())
    present = [gene for gene in genes if gene in expression_df.columns]
    if entity_points_df is not None:
        entity_points = entity_points_df.loc[entity_points_df["entity"].astype(str).isin(genes)].copy()
    else:
        if not present:
            return pd.DataFrame(columns=reference_df["celltype"].astype(str).drop_duplicates().tolist())
        entity_points = build_entity_points_from_expression(
            reference_df,
            expression_df,
            entities=present,
            cell_id_col=cell_id_col,
            x_col=x_col,
            y_col=y_col,
            min_weight=entity_min_weight,
        )
    if entity_points.empty:
        return pd.DataFrame(index=present, columns=reference_df["celltype"].astype(str).drop_duplicates().tolist())
    return compute_entity_to_cell_topology(
        reference_df,
        entity_points,
        x_col=x_col,
        y_col=y_col,
        celltype_col="celltype",
        entity_col="entity",
        weight_col="weight",
        min_weight=entity_min_weight,
        method=topology_method,
    )


def _resolve_gene_topology_anchors(
    reference_df: pd.DataFrame,
    expression_df: pd.DataFrame,
    genes: Sequence[str],
    *,
    tbc_results: Optional[str | os.PathLike[str]] = None,
    t_and_c_df: Optional[pd.DataFrame] = None,
    structure_map: Optional[pd.DataFrame] = None,
    structure_map_df: Optional[pd.DataFrame] = None,
    anchor_mode: str = "precomputed",
    entity_points_df: Optional[pd.DataFrame] = None,
    cell_id_col: str = "cell_id",
    x_col: str = "x",
    y_col: str = "y",
    entity_min_weight: float = 0.0,
    topology_method: str = "average",
) -> tuple[pd.DataFrame, dict[str, str], Optional[pd.DataFrame], str]:
    if anchor_mode not in {"precomputed", "recompute", "hybrid"}:
        raise ValueError("anchor_mode must be one of: precomputed, recompute, hybrid")

    celltypes = list(dict.fromkeys(reference_df["celltype"].astype(str).tolist()))
    genes = list(dict.fromkeys(str(gene) for gene in genes))
    precomputed_topology, precomputed_structure = _resolve_precomputed_tables(
        tbc_results=tbc_results,
        t_and_c_df=t_and_c_df,
        structure_map=structure_map,
        structure_map_df=structure_map_df,
    )

    topology_parts: list[pd.DataFrame] = []
    source_by_gene: dict[str, str] = {}

    use_precomputed = anchor_mode in {"precomputed", "hybrid"} and precomputed_topology is not None
    if use_precomputed:
        available = [gene for gene in genes if gene in precomputed_topology.index]
        if available:
            topology_parts.append(precomputed_topology.reindex(available).reindex(columns=celltypes))
            source_by_gene.update({gene: "precomputed" for gene in available})

    needs_recompute = []
    if anchor_mode == "recompute" or precomputed_topology is None:
        needs_recompute = genes
    else:
        needs_recompute = [gene for gene in genes if gene not in source_by_gene]

    if needs_recompute:
        recomputed = _recompute_gene_topology(
            reference_df,
            expression_df,
            needs_recompute,
            entity_points_df=entity_points_df,
            cell_id_col=cell_id_col,
            x_col=x_col,
            y_col=y_col,
            entity_min_weight=entity_min_weight,
            topology_method=topology_method,
        )
        if not recomputed.empty:
            topology_parts.append(recomputed.reindex(columns=celltypes))
        source_by_gene.update({gene: "recompute" for gene in needs_recompute})

    if topology_parts:
        topology = pd.concat(topology_parts, axis=0)
        topology = topology[~topology.index.duplicated(keep="first")]
        topology = topology.reindex(genes).reindex(columns=celltypes)
    else:
        topology = pd.DataFrame(index=genes, columns=celltypes, dtype=float)

    if precomputed_structure is not None and anchor_mode in {"precomputed", "hybrid"}:
        structure_source = "precomputed"
        resolved_structure = precomputed_structure.reindex(index=celltypes, columns=celltypes)
    elif structure_map is not None and isinstance(structure_map, pd.DataFrame):
        structure_source = "provided"
        resolved_structure = structure_map.copy().reindex(index=celltypes, columns=celltypes)
    else:
        structure_source = "recompute"
        resolved_structure, _ = compute_cophenetic_distances_from_df(
            reference_df,
            x_col=x_col,
            y_col=y_col,
            celltype_col="celltype",
            method=topology_method,
        )
        resolved_structure = resolved_structure.reindex(index=celltypes, columns=celltypes)
    resolved_structure = resolved_structure.apply(pd.to_numeric, errors="coerce")
    resolved_structure = resolved_structure.fillna(1.0)
    for celltype in celltypes:
        if celltype in resolved_structure.index and celltype in resolved_structure.columns:
            resolved_structure.loc[celltype, celltype] = 0.0

    topology.index.name = "gene"
    topology.columns.name = "celltype"
    return topology, source_by_gene, resolved_structure, structure_source


[docs] def build_entity_points_from_expression( reference_df: pd.DataFrame, expression_df: pd.DataFrame, *, entities: Optional[Iterable[str]] = None, cell_id_col: str = "cell_id", x_col: str = "x", y_col: str = "y", min_weight: float = 0.0, entity_col: str = "entity", weight_col: str = "weight", ) -> pd.DataFrame: entities = list(entities) if entities is not None else list(expression_df.columns) records: list[pd.DataFrame] = [] aligned_expr = expression_df.reindex(reference_df[cell_id_col]).fillna(0.0) aligned_expr.index = aligned_expr.index.astype(str) for entity in entities: if entity not in aligned_expr.columns: continue weights = _coerce_nonnegative(aligned_expr[entity]) keep = weights > float(min_weight) if not keep.any(): continue entity_points = reference_df.loc[keep.to_numpy(), [cell_id_col, x_col, y_col, "celltype"]].copy() entity_points[entity_col] = str(entity) entity_points[weight_col] = weights.loc[keep].to_numpy() records.append(entity_points) if not records: return pd.DataFrame(columns=[cell_id_col, x_col, y_col, "celltype", entity_col, weight_col]) return pd.concat(records, ignore_index=True)
[docs] def compute_weighted_searcher_findee_distance_matrix_from_df( df: pd.DataFrame, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, group_col: str = "celltype", weight_col: Optional[str] = "weight", min_weight: float = 0.0, ) -> pd.DataFrame: """ Compute a weighted directed searcher→findee average nearest-neighbor matrix. The weighting scheme is intentionally conservative to preserve backward compatibility with the original ``t_and_c`` logic: the nearest-neighbor geometry is unchanged, while the row-wise aggregation becomes a weighted average over source/searcher points. When every point has unit weight, the result is exactly equivalent to ``compute_searcher_findee_distance_matrix_from_df``. """ required = {x_col, y_col, group_col} if z_col is not None: required.add(z_col) if weight_col is not None: required.add(weight_col) missing = required.difference(df.columns) if missing: raise ValueError(f"Input DataFrame is missing required columns: {sorted(missing)}") work = df.copy() if weight_col is None: work["__weight"] = 1.0 weight_col = "__weight" work[group_col] = work[group_col].astype("category").cat.remove_unused_categories() work[weight_col] = _coerce_nonnegative(work[weight_col]) work = work.loc[work[weight_col] > float(min_weight)].copy() if work.empty: raise ValueError("No weighted points remain after filtering; cannot compute weighted distances.") coord_cols = [x_col, y_col] + ([z_col] if z_col is not None else []) coords = work[coord_cols].to_numpy(dtype=float) groups = work[group_col].astype("category").cat.remove_unused_categories() unique_groups = list(groups.cat.categories) df_nearest = pd.DataFrame(index=work.index, columns=unique_groups, dtype=float) for target in unique_groups: target_mask = (groups == target).to_numpy() coords_target = coords[target_mask] if coords_target.shape[0] == 0: df_nearest.loc[:, target] = np.nan continue nbrs = NearestNeighbors(n_neighbors=1, algorithm="auto") nbrs.fit(coords_target) distances, _ = nbrs.kneighbors(coords) df_nearest[target] = distances[:, 0] weights = work[weight_col].to_numpy(dtype=float) result_rows: list[pd.Series] = [] for group in unique_groups: source_mask = (groups == group).to_numpy() source_values = df_nearest.loc[source_mask, unique_groups] source_weights = weights[source_mask] row = { target: _weighted_average(source_values[target].to_numpy(dtype=float), source_weights) for target in unique_groups } result_rows.append(pd.Series(row, name=str(group))) return pd.DataFrame(result_rows, index=unique_groups, columns=unique_groups)
[docs] def compute_weighted_cophenetic_distances_from_df( df: pd.DataFrame, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, group_col: str = "celltype", weight_col: Optional[str] = "weight", min_weight: float = 0.0, method: str = "average", show_corr: bool = False, ) -> tuple[pd.DataFrame, pd.DataFrame]: group_mean = compute_weighted_searcher_findee_distance_matrix_from_df( df=df, x_col=x_col, y_col=y_col, z_col=z_col, group_col=group_col, weight_col=weight_col, min_weight=min_weight, ) return compute_cophenetic_from_distance_matrix(group_mean, method=method, show_corr=show_corr)
[docs] def compute_entity_to_cell_topology( reference_df: pd.DataFrame, entity_points_df: pd.DataFrame, *, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, celltype_col: str = "celltype", entity_col: str = "entity", weight_col: str = "weight", min_weight: float = 0.0, method: str = "average", ) -> pd.DataFrame: """ Generalize transcript-by-cell topology to arbitrary weighted entities. ``reference_df`` contains the fixed cell-type template. ``entity_points_df`` contains an entity label plus spatial points and weights. For every entity we temporarily append its weighted point cloud to the reference template, compute a weighted StructureMap, and extract the entity→celltype row. """ reference = reference_df.copy() if celltype_col not in reference.columns: raise ValueError(f"reference_df must contain {celltype_col!r}.") required_entity = {x_col, y_col, entity_col, weight_col} if z_col is not None: required_entity.add(z_col) missing_entity = required_entity.difference(entity_points_df.columns) if missing_entity: raise ValueError(f"entity_points_df is missing required columns: {sorted(missing_entity)}") reference = reference.rename(columns={celltype_col: "__group"}) reference["__weight"] = 1.0 reference_cols = [x_col, y_col] + ([z_col] if z_col is not None else []) + ["__group", "__weight"] reference = reference.loc[:, reference_cols] celltypes = reference["__group"].astype(str).tolist() unique_celltypes = list(dict.fromkeys(celltypes)) entity_points = entity_points_df.copy() entity_points[entity_col] = entity_points[entity_col].astype(str) entity_points[weight_col] = _coerce_nonnegative(entity_points[weight_col]) unique_entities = list(dict.fromkeys(entity_points[entity_col].tolist())) rows: list[pd.Series] = [] for entity in unique_entities: sub = entity_points.loc[ (entity_points[entity_col] == entity) & (entity_points[weight_col] > float(min_weight)) ].copy() if sub.empty: rows.append(pd.Series(np.nan, index=unique_celltypes, name=entity)) continue sub = sub.rename(columns={entity_col: "__group", weight_col: "__weight"}) sub["__group"] = entity sub = sub.loc[:, reference_cols] combined = pd.concat([reference, sub], ignore_index=True) row_coph, _ = compute_weighted_cophenetic_distances_from_df( combined, x_col=x_col, y_col=y_col, z_col=z_col, group_col="__group", weight_col="__weight", min_weight=min_weight, method=method, ) row = row_coph.loc[entity].reindex(unique_celltypes) rows.append(pd.Series(row, name=entity)) if not rows: return pd.DataFrame(columns=unique_celltypes) out = pd.DataFrame(rows) out.index.name = entity_col out.columns.name = celltype_col return out
[docs] def compute_entity_structuremap( entity_points_df: pd.DataFrame, *, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, entity_col: str = "entity", weight_col: str = "weight", min_weight: float = 0.0, method: str = "average", ) -> pd.DataFrame: if entity_points_df.empty: return pd.DataFrame() work = entity_points_df.copy() work[entity_col] = work[entity_col].astype(str) work[weight_col] = _coerce_nonnegative(work[weight_col]) row_coph, _ = compute_weighted_cophenetic_distances_from_df( work, x_col=x_col, y_col=y_col, z_col=z_col, group_col=entity_col, weight_col=weight_col, min_weight=min_weight, method=method, ) row_coph.index.name = entity_col row_coph.columns.name = entity_col return row_coph
def _build_neighbor_index( reference_df: pd.DataFrame, *, x_col: str = "x", y_col: str = "y", k_neighbors: int = 8, radius: Optional[float] = None, ) -> list[np.ndarray]: coords = reference_df[[x_col, y_col]].to_numpy(dtype=float) if len(coords) == 0: return [] if radius is not None: model = NearestNeighbors(radius=radius, algorithm="auto") model.fit(coords) neighbors = model.radius_neighbors(coords, return_distance=False) return [arr[arr != idx] for idx, arr in enumerate(neighbors)] n_neighbors = min(len(coords), max(2, int(k_neighbors) + 1)) model = NearestNeighbors(n_neighbors=n_neighbors, algorithm="auto") model.fit(coords) indices = model.kneighbors(coords, return_distance=False) return [arr[arr != idx] for idx, arr in enumerate(indices)] def _smooth_matrix_by_neighbors( matrix: pd.DataFrame, neighbor_index: list[np.ndarray], *, include_self: bool = True, ) -> pd.DataFrame: if matrix.empty: return matrix.copy() values = matrix.to_numpy(dtype=float) smoothed = np.zeros_like(values) for idx, neighbors in enumerate(neighbor_index): if include_self: neighbors = np.unique(np.append(neighbors, idx)) if len(neighbors) == 0: smoothed[idx] = values[idx] else: smoothed[idx] = values[neighbors].mean(axis=0) return pd.DataFrame(smoothed, index=matrix.index, columns=matrix.columns) def _normalize_matrix_columns(frame: pd.DataFrame) -> pd.DataFrame: if frame.empty: return frame.copy() values = frame.copy().astype(float) mins = values.min(axis=0) maxs = values.max(axis=0) spans = (maxs - mins).replace(0.0, np.nan) normalized = values.sub(mins, axis=1).div(spans, axis=1).fillna(0.0) constant_nonzero = spans.isna() & (maxs > 0) for col in values.columns[constant_nonzero]: normalized[col] = 1.0 return normalized def summarize_expression_by_celltype( expression_df: pd.DataFrame, celltypes: pd.Series, *, detection_threshold: float = 0.0, ) -> tuple[pd.DataFrame, pd.DataFrame]: """ Return raw and normalized gene×celltype summaries using pseudobulk share × sqrt(detection fraction). """ expr = expression_df.copy() expr.index = expr.index.astype(str) celltypes = celltypes.astype(str) pseudobulk = expr.groupby(celltypes).sum() share = pseudobulk.div(pseudobulk.sum(axis=0).replace(0.0, np.nan), axis=1).fillna(0.0) detected = (expr > float(detection_threshold)).groupby(celltypes).mean() combined = share.mul(np.sqrt(detected)) raw = combined.T.astype(float) raw.index.name = "gene" raw.columns.name = "celltype" normalized = _normalize_frame_rows(raw) normalized.index.name = "gene" normalized.columns.name = "celltype" return raw, normalized def _normalize_lr_prior(lr_pairs: pd.DataFrame, prior_col: Optional[str]) -> pd.Series: if prior_col is None or prior_col not in lr_pairs.columns: return pd.Series(np.ones(len(lr_pairs)), index=lr_pairs.index, name="prior_confidence") prior = pd.to_numeric(lr_pairs[prior_col], errors="coerce").fillna(0.0).astype(float) return _normalize_series(prior).rename("prior_confidence") def _compute_local_contact_matrix( reference_df: pd.DataFrame, ligand_values: pd.Series, receptor_values: pd.Series, neighbor_index: list[np.ndarray], *, celltype_col: str = "celltype", min_cross_edges: int = 50, contact_expr_threshold: str | float = "q75_nonzero", winsor_quantile: float = 0.99, ) -> dict[str, pd.DataFrame]: celltypes = reference_df[celltype_col].astype(str).tolist() unique_celltypes = list(dict.fromkeys(celltypes)) index_by_type = {celltype: [] for celltype in unique_celltypes} for idx, celltype in enumerate(celltypes): index_by_type[celltype].append(idx) ligand_raw = pd.to_numeric(ligand_values.reindex(reference_df.index).fillna(0.0), errors="coerce").fillna(0.0).astype(float) receptor_raw = pd.to_numeric(receptor_values.reindex(reference_df.index).fillna(0.0), errors="coerce").fillna(0.0).astype(float) ligand_norm = _winsorized_normalize_series(ligand_raw, upper_quantile=winsor_quantile) receptor_norm = _winsorized_normalize_series(receptor_raw, upper_quantile=winsor_quantile) def _resolve_threshold(series: pd.Series) -> float: if isinstance(contact_expr_threshold, (int, float)): return float(contact_expr_threshold) if contact_expr_threshold == "q75_nonzero": positive = series.loc[series > 0] return float(positive.quantile(0.75)) if not positive.empty else float("inf") raise ValueError("contact_expr_threshold must be a float or 'q75_nonzero'") ligand_threshold = _resolve_threshold(ligand_raw) receptor_threshold = _resolve_threshold(receptor_raw) strength = pd.DataFrame(0.0, index=unique_celltypes, columns=unique_celltypes, dtype=float) coverage = pd.DataFrame(0.0, index=unique_celltypes, columns=unique_celltypes, dtype=float) cross_edges = pd.DataFrame(0, index=unique_celltypes, columns=unique_celltypes, dtype=int) for sender in unique_celltypes: sender_indices = index_by_type[sender] for receiver in unique_celltypes: edge_scores: list[float] = [] active_edges = 0 total_edges = 0 for idx in sender_indices: for nbr in neighbor_index[idx]: if celltypes[nbr] != receiver: continue total_edges += 1 edge_scores.append(float(ligand_norm.iloc[idx] * receptor_norm.iloc[nbr])) if ligand_raw.iloc[idx] > ligand_threshold and receptor_raw.iloc[nbr] > receptor_threshold: active_edges += 1 cross_edges.loc[sender, receiver] = total_edges strength.loc[sender, receiver] = float(np.mean(edge_scores)) if edge_scores else 0.0 coverage.loc[sender, receiver] = (active_edges / total_edges) if total_edges else 0.0 strength_norm = _winsorized_normalize_frame(strength, upper_quantile=winsor_quantile) off_diag_mask = ~np.eye(len(unique_celltypes), dtype=bool) off_diag_values = cross_edges.to_numpy(dtype=float)[off_diag_mask] off_diag_values = off_diag_values[off_diag_values > 0] if off_diag_values.size == 0: edge_scale = 1.0 else: edge_scale = float(np.quantile(off_diag_values, 0.99)) if math.isclose(edge_scale, 0.0): edge_scale = 1.0 edge_support = (cross_edges.astype(float) / edge_scale).clip(lower=0.0, upper=1.0) local_contact = pd.DataFrame(0.0, index=unique_celltypes, columns=unique_celltypes, dtype=float) enough_edges = cross_edges >= int(min_cross_edges) local_contact = (np.sqrt(strength_norm.mul(coverage)) * edge_support).where(enough_edges, other=0.0) return { "local_contact": local_contact.astype(float), "contact_strength_raw": strength.astype(float), "contact_strength_normalized": strength_norm.astype(float), "contact_coverage": coverage.astype(float), "cross_edge_count": cross_edges.astype(int), "edge_support": edge_support.astype(float), "ligand_threshold": pd.DataFrame(ligand_threshold, index=unique_celltypes, columns=unique_celltypes), "receptor_threshold": pd.DataFrame(receptor_threshold, index=unique_celltypes, columns=unique_celltypes), } def _geometric_mean(values: Iterable[float], eps: float = 1e-8) -> float: arr = np.asarray(list(values), dtype=float) arr = np.clip(arr, 0.0, None) return float(np.exp(np.mean(np.log(arr + eps)))) def _prepare_hotspot_table( reference_df: pd.DataFrame, *, sender_mask: pd.Series, receiver_mask: pd.Series, sender_score: pd.Series, receiver_score: pd.Series, ligand: str, receptor: str, sender_celltype: str, receiver_celltype: str, x_col: str, y_col: str, ) -> pd.DataFrame: sender_tbl = reference_df.loc[sender_mask, ["cell_id", x_col, y_col, "celltype"]].copy() sender_tbl["role"] = "sender" sender_tbl["feature"] = ligand sender_tbl["score"] = sender_score.loc[sender_mask].to_numpy() sender_tbl["sender_celltype"] = sender_celltype sender_tbl["receiver_celltype"] = receiver_celltype sender_tbl["ligand"] = ligand sender_tbl["receptor"] = receptor receiver_tbl = reference_df.loc[receiver_mask, ["cell_id", x_col, y_col, "celltype"]].copy() receiver_tbl["role"] = "receiver" receiver_tbl["feature"] = receptor receiver_tbl["score"] = receiver_score.loc[receiver_mask].to_numpy() receiver_tbl["sender_celltype"] = sender_celltype receiver_tbl["receiver_celltype"] = receiver_celltype receiver_tbl["ligand"] = ligand receiver_tbl["receptor"] = receptor return pd.concat([sender_tbl, receiver_tbl], ignore_index=True)
[docs] def ligand_receptor_topology_analysis( *, reference_df: Optional[pd.DataFrame] = None, expression_df: Optional[pd.DataFrame] = None, lr_pairs: pd.DataFrame, output_dir: Optional[str | os.PathLike[str]] = None, adata: Any = None, entity_points_df: Optional[pd.DataFrame] = None, tbc_results: Optional[str | os.PathLike[str]] = None, t_and_c_df: Optional[pd.DataFrame] = None, cluster_col: str = "Cluster", cell_id_col: str = "cell_id", x_col: str = "x", y_col: str = "y", celltype_col: str = "celltype", ligand_col: str = "ligand", receptor_col: str = "receptor", prior_col: str = "evidence_weight", structure_map: Optional[pd.DataFrame] = None, structure_map_df: Optional[pd.DataFrame] = None, anchor_mode: str = "precomputed", expression_support_mode: str = "pseudobulk_detection", contact_mode: str = "strength_coverage", entity_min_weight: float = 0.0, detection_threshold: float = 0.0, k_neighbors: int = 8, radius: Optional[float] = None, topology_method: str = "average", top_n_pairs: int = 12, hotspot_quantile: float = 0.9, min_cross_edges: int = 50, contact_expr_threshold: str | float = "q75_nonzero", use_raw: bool = False, ) -> dict[str, Any]: if expression_support_mode != "pseudobulk_detection": raise ValueError("expression_support_mode currently supports only 'pseudobulk_detection'.") if contact_mode != "strength_coverage": raise ValueError("contact_mode currently supports only 'strength_coverage'.") reference = _coerce_reference_df( reference_df, adata=adata, cluster_col=cluster_col, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col, celltype_col=celltype_col, ) reference.index = reference[cell_id_col].astype(str) if ligand_col not in lr_pairs.columns or receptor_col not in lr_pairs.columns: raise ValueError(f"lr_pairs must contain {ligand_col!r} and {receptor_col!r}.") lr_pairs = lr_pairs.copy() lr_pairs[ligand_col] = lr_pairs[ligand_col].astype(str) lr_pairs[receptor_col] = lr_pairs[receptor_col].astype(str) lr_pairs["prior_confidence"] = _normalize_lr_prior(lr_pairs, prior_col) genes = list(dict.fromkeys(lr_pairs[ligand_col].tolist() + lr_pairs[receptor_col].tolist())) expression = _coerce_expression_df( reference, expression_df, adata=adata, genes=genes, cell_id_col=cell_id_col, use_raw=use_raw, ) expression.index = reference.index topology, anchor_sources, resolved_structure_map, structure_map_source = _resolve_gene_topology_anchors( reference, expression, genes, tbc_results=tbc_results, t_and_c_df=t_and_c_df, structure_map=structure_map, structure_map_df=structure_map_df, anchor_mode=anchor_mode, entity_points_df=entity_points_df, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col, entity_min_weight=entity_min_weight, topology_method=topology_method, ) ligand_to_cell = topology.reindex(lr_pairs[ligand_col].drop_duplicates()) receptor_to_cell = topology.reindex(lr_pairs[receptor_col].drop_duplicates()) structure_map = resolved_structure_map _, expression_summary = summarize_expression_by_celltype( expression, reference["celltype"], detection_threshold=detection_threshold, ) neighbor_index = _build_neighbor_index(reference, x_col=x_col, y_col=y_col, k_neighbors=k_neighbors, radius=radius) celltypes = list(dict.fromkeys(reference["celltype"].astype(str).tolist())) score_rows: list[dict[str, Any]] = [] for row in lr_pairs.itertuples(index=False): ligand = getattr(row, ligand_col) receptor = getattr(row, receptor_col) prior = float(getattr(row, "prior_confidence")) if ligand not in topology.index or receptor not in topology.index: continue sender_anchor = (1.0 - topology.loc[ligand].reindex(celltypes).fillna(1.0)).clip(lower=0.0, upper=1.0) receiver_anchor = (1.0 - topology.loc[receptor].reindex(celltypes).fillna(1.0)).clip(lower=0.0, upper=1.0) sender_expr = ( expression_summary.loc[ligand].reindex(celltypes).fillna(0.0) if ligand in expression_summary.index else pd.Series(0.0, index=celltypes) ) receiver_expr = ( expression_summary.loc[receptor].reindex(celltypes).fillna(0.0) if receptor in expression_summary.index else pd.Series(0.0, index=celltypes) ) local_contact_parts = _compute_local_contact_matrix( reference, expression[ligand] if ligand in expression.columns else pd.Series(0.0, index=reference.index), expression[receptor] if receptor in expression.columns else pd.Series(0.0, index=reference.index), neighbor_index, celltype_col="celltype", min_cross_edges=min_cross_edges, contact_expr_threshold=contact_expr_threshold, ) local_contact = local_contact_parts["local_contact"].reindex(index=celltypes, columns=celltypes, fill_value=0.0) contact_strength_raw = local_contact_parts["contact_strength_raw"].reindex( index=celltypes, columns=celltypes, fill_value=0.0, ) contact_coverage = local_contact_parts["contact_coverage"].reindex( index=celltypes, columns=celltypes, fill_value=0.0, ) contact_strength_normalized = local_contact_parts["contact_strength_normalized"].reindex( index=celltypes, columns=celltypes, fill_value=0.0, ) cross_edge_count = local_contact_parts["cross_edge_count"].reindex( index=celltypes, columns=celltypes, fill_value=0, ) for sender in celltypes: for receiver in celltypes: bridge = 1.0 - float(structure_map.loc[sender, receiver]) score = _geometric_mean( [ float(sender_anchor.loc[sender]), float(receiver_anchor.loc[receiver]), bridge, float(sender_expr.loc[sender]), float(receiver_expr.loc[receiver]), float(local_contact.loc[sender, receiver]), ] ) * prior score_rows.append( { "ligand": ligand, "receptor": receptor, "sender_celltype": sender, "receiver_celltype": receiver, "anchor_source_ligand": anchor_sources.get(ligand, "recompute"), "anchor_source_receptor": anchor_sources.get(receptor, "recompute"), "structure_map_source": structure_map_source, "sender_anchor": float(sender_anchor.loc[sender]), "receiver_anchor": float(receiver_anchor.loc[receiver]), "structure_bridge": bridge, "sender_expr": float(sender_expr.loc[sender]), "receiver_expr": float(receiver_expr.loc[receiver]), "local_contact": float(local_contact.loc[sender, receiver]), "contact_strength_raw": float(contact_strength_raw.loc[sender, receiver]), "contact_strength_normalized": float(contact_strength_normalized.loc[sender, receiver]), "contact_coverage": float(contact_coverage.loc[sender, receiver]), "cross_edge_count": int(cross_edge_count.loc[sender, receiver]), "prior_confidence": prior, "LR_score": float(score), } ) scores = pd.DataFrame(score_rows) if not scores.empty: scores = scores.sort_values("LR_score", ascending=False).reset_index(drop=True) component_diagnostics = scores.copy() out_dir = _ensure_output_dir(output_dir) output_files: dict[str, Any] = {} if out_dir is not None: ligand_path = out_dir / "ligand_to_cell.csv" receptor_path = out_dir / "receptor_to_cell.csv" scores_path = out_dir / "lr_sender_receiver_scores.csv" diagnostics_path = out_dir / "lr_component_diagnostics.csv" ligand_to_cell.to_csv(ligand_path) receptor_to_cell.to_csv(receptor_path) scores.to_csv(scores_path, index=False) component_diagnostics.to_csv(diagnostics_path, index=False) output_files["ligand_to_cell"] = str(ligand_path) output_files["receptor_to_cell"] = str(receptor_path) output_files["lr_sender_receiver_scores"] = str(scores_path) output_files["lr_component_diagnostics"] = str(diagnostics_path) summary = scores.copy() summary["lr_pair"] = summary["ligand"] + "→" + summary["receptor"] summary["sender_receiver"] = summary["sender_celltype"] + "→" + summary["receiver_celltype"] top_pairs = summary.groupby("lr_pair")["LR_score"].max().sort_values(ascending=False).head(int(top_n_pairs)).index.tolist() summary_matrix = summary.loc[summary["lr_pair"].isin(top_pairs)].pivot_table( index="lr_pair", columns="sender_receiver", values="LR_score", aggfunc="max", fill_value=0.0, ) if not summary_matrix.empty: output_files["lr_summary_heatmap"] = _save_heatmap( summary_matrix, title="Ligand-receptor topology summary", output_prefix=out_dir / "lr_summary_heatmap", cmap="rocket_r", ) if not scores.empty: best = scores.iloc[0] ligand = str(best["ligand"]) receptor = str(best["receptor"]) sender = str(best["sender_celltype"]) receiver = str(best["receiver_celltype"]) ligand_cell = ( _winsorized_normalize_series(expression[ligand]) if ligand in expression.columns else pd.Series(0.0, index=reference.index) ) receptor_cell = ( _winsorized_normalize_series(expression[receptor]) if receptor in expression.columns else pd.Series(0.0, index=reference.index) ) sender_mask = (reference["celltype"] == sender) & (ligand_cell >= ligand_cell.quantile(float(hotspot_quantile))) receiver_mask = (reference["celltype"] == receiver) & (receptor_cell >= receptor_cell.quantile(float(hotspot_quantile))) hotspot_df = _prepare_hotspot_table( reference, sender_mask=sender_mask, receiver_mask=receiver_mask, sender_score=ligand_cell, receiver_score=receptor_cell, ligand=ligand, receptor=receptor, sender_celltype=sender, receiver_celltype=receiver, x_col=x_col, y_col=y_col, ) hotspot_csv = out_dir / "lr_hotspot_cells.csv" hotspot_df.to_csv(hotspot_csv, index=False) output_files["lr_hotspot_cells_csv"] = str(hotspot_csv) parquet_path = out_dir / "lr_hotspot_cells.parquet" if _safe_to_parquet(hotspot_df, parquet_path): output_files["lr_hotspot_cells_parquet"] = str(parquet_path) output_files["lr_hotspot_overlay"] = _save_hotspot_overlay( reference, x_col=x_col, y_col=y_col, sender_mask=sender_mask, receiver_mask=receiver_mask, sender_score=ligand_cell, receiver_score=receptor_cell, title=f"{ligand}→{receptor} hotspot ({sender}→{receiver})", output_prefix=out_dir / "lr_hotspot_overlay", ) return { "ligand_to_cell": ligand_to_cell, "receptor_to_cell": receptor_to_cell, "structure_map": structure_map, "scores": scores, "component_diagnostics": component_diagnostics, "anchor_sources": anchor_sources, "files": output_files, }
def _standardize_pathway_definitions(pathway_definitions: Mapping[str, Any] | pd.DataFrame) -> pd.DataFrame: if isinstance(pathway_definitions, pd.DataFrame): required = {"pathway", "gene"} missing = required.difference(pathway_definitions.columns) if missing: raise ValueError(f"pathway_definitions DataFrame is missing required columns: {sorted(missing)}") out = pathway_definitions.copy() if "weight" not in out.columns: out["weight"] = 1.0 out["pathway"] = out["pathway"].astype(str) out["gene"] = out["gene"].astype(str) out["weight"] = pd.to_numeric(out["weight"], errors="coerce").fillna(1.0).astype(float) return out[["pathway", "gene", "weight"]] rows: list[dict[str, Any]] = [] for pathway, genes in pathway_definitions.items(): if isinstance(genes, Mapping): for gene, weight in genes.items(): rows.append({"pathway": str(pathway), "gene": str(gene), "weight": float(weight)}) else: for gene in genes: rows.append({"pathway": str(pathway), "gene": str(gene), "weight": 1.0}) return pd.DataFrame(rows, columns=["pathway", "gene", "weight"])
[docs] def compute_pathway_activity_matrix( expression_df: pd.DataFrame, pathway_definitions: Mapping[str, Any] | pd.DataFrame, *, method: str = "rank_mean", normalize: bool = True, ) -> pd.DataFrame: pathway_table = _standardize_pathway_definitions(pathway_definitions) expr = expression_df.apply(pd.to_numeric, errors="coerce").fillna(0.0).astype(float) activity = pd.DataFrame(index=expr.index) if method not in {"rank_mean", "ucell", "aucell", "weighted_sum", "progeny"}: raise ValueError("method must be one of: rank_mean, ucell, aucell, weighted_sum, progeny") if method in {"rank_mean", "ucell", "aucell"}: ranked = expr.rank(axis=1, method="average", ascending=True, pct=True) for pathway, group in pathway_table.groupby("pathway", sort=False): present = [gene for gene in group["gene"] if gene in ranked.columns] if not present: activity[pathway] = 0.0 continue weights = group.set_index("gene").loc[present, "weight"].astype(float) values = ranked[present].to_numpy(dtype=float) activity[pathway] = np.average(values, axis=1, weights=np.abs(weights.to_numpy(dtype=float))) else: for pathway, group in pathway_table.groupby("pathway", sort=False): present = [gene for gene in group["gene"] if gene in expr.columns] if not present: activity[pathway] = 0.0 continue weights = group.set_index("gene").loc[present, "weight"].astype(float) denom = float(np.sum(np.abs(weights.to_numpy(dtype=float)))) or 1.0 activity[pathway] = expr[present].to_numpy(dtype=float) @ weights.to_numpy(dtype=float) / denom if normalize: activity = _normalize_matrix_columns(activity) return activity
def _aggregate_pathway_gene_topology( gene_topology: pd.DataFrame, pathway_table: pd.DataFrame, *, aggregate: str = "weighted_median", ) -> pd.DataFrame: celltypes = gene_topology.columns.astype(str).tolist() rows: list[pd.Series] = [] for pathway, group in pathway_table.groupby("pathway", sort=False): pathway_genes = [gene for gene in group["gene"].astype(str) if gene in gene_topology.index] weights = group.set_index("gene").reindex(pathway_genes)["weight"].fillna(1.0).astype(float) if not pathway_genes: rows.append(pd.Series(np.nan, index=celltypes, name=str(pathway))) continue aggregated = { celltype: _aggregate_weighted_values( gene_topology.loc[pathway_genes, celltype].to_numpy(dtype=float), weights.to_numpy(dtype=float), method=aggregate, ) for celltype in celltypes } rows.append(pd.Series(aggregated, name=str(pathway))) out = pd.DataFrame(rows, columns=celltypes) out.index.name = "pathway" out.columns.name = "celltype" return out def _build_pathway_activity_points( reference_df: pd.DataFrame, pathway_activity: pd.DataFrame, *, cell_id_col: str, x_col: str, y_col: str, activity_mode: str, activity_threshold_schedule: Sequence[float], min_activity_cells: int, ) -> tuple[pd.DataFrame, pd.DataFrame]: records: list[pd.DataFrame] = [] diagnostics: list[dict[str, Any]] = [] for pathway in pathway_activity.columns: values = pd.to_numeric(pathway_activity[pathway], errors="coerce").fillna(0.0).astype(float) retained_quantile: Optional[float] = None retained_mask = pd.Series(False, index=values.index) positive_values = values.loc[values > 0] quantile_pool = positive_values if not positive_values.empty else values for q in activity_threshold_schedule: threshold = float(quantile_pool.quantile(float(q))) if positive_values.empty: mask = values >= threshold else: mask = (values >= threshold) & (values > 0) if int(mask.sum()) >= int(min_activity_cells): retained_mask = mask retained_quantile = float(q) break if retained_quantile is None: top_n = min(int(min_activity_cells), len(values)) if top_n > 0: top_idx = values.nlargest(top_n).index retained_mask.loc[top_idx] = True retained_quantile = float(activity_threshold_schedule[-1]) if activity_threshold_schedule else 0.5 retained_values = values.loc[retained_mask] if not retained_values.empty: points = reference_df.loc[retained_mask, [cell_id_col, x_col, y_col, "celltype"]].copy() points["entity"] = str(pathway) points["weight"] = retained_values.to_numpy(dtype=float) records.append(points) diagnostics.append( { "pathway": str(pathway), "retained_cell_count": int(retained_mask.sum()), "retained_quantile": float(retained_quantile), "activity_mode": str(activity_mode), } ) if records: entity_points = pd.concat(records, ignore_index=True) else: entity_points = pd.DataFrame(columns=[cell_id_col, x_col, y_col, "celltype", "entity", "weight"]) diagnostics_df = pd.DataFrame(diagnostics) return entity_points, diagnostics_df def _pathway_mode_summary( pathway_to_cell: pd.DataFrame, *, mode_name: str, ) -> pd.DataFrame: if pathway_to_cell.empty: return pd.DataFrame(columns=["pathway", f"{mode_name}_best_celltype", f"{mode_name}_best_distance"]) best = pd.DataFrame( { "pathway": pathway_to_cell.index.astype(str), f"{mode_name}_best_celltype": pathway_to_cell.idxmin(axis=1).astype(str).to_numpy(), f"{mode_name}_best_distance": pathway_to_cell.min(axis=1).to_numpy(dtype=float), } ) return best
[docs] def pathway_topology_analysis( *, pathway_definitions: Mapping[str, Any] | pd.DataFrame, reference_df: Optional[pd.DataFrame] = None, expression_df: Optional[pd.DataFrame] = None, output_dir: Optional[str | os.PathLike[str]] = None, adata: Any = None, tbc_results: Optional[str | os.PathLike[str]] = None, t_and_c_df: Optional[pd.DataFrame] = None, cluster_col: str = "Cluster", cell_id_col: str = "cell_id", x_col: str = "x", y_col: str = "y", celltype_col: str = "celltype", scoring_method: str = "weighted_sum", view: str = "intrinsic", structure_map: Optional[pd.DataFrame] = None, structure_map_df: Optional[pd.DataFrame] = None, anchor_mode: str = "precomputed", pathway_modes: Sequence[str] = ("gene_topology_aggregate", "activity_point_cloud"), primary_pathway_mode: str = "gene_topology_aggregate", pathway_aggregate: str = "weighted_median", activity_threshold_schedule: Sequence[float] = (0.95, 0.90, 0.80, 0.70, 0.60, 0.50), min_activity_cells: int = 50, entity_min_weight: float = 0.0, k_neighbors: int = 8, radius: Optional[float] = None, topology_method: str = "average", hotspot_quantile: float = 0.9, use_raw: bool = False, ) -> dict[str, Any]: if view not in {"intrinsic", "niche_smoothed"}: raise ValueError("view must be either 'intrinsic' or 'niche_smoothed'.") valid_modes = {"gene_topology_aggregate", "activity_point_cloud"} if any(mode not in valid_modes for mode in pathway_modes): raise ValueError("pathway_modes must contain only 'gene_topology_aggregate' and/or 'activity_point_cloud'.") if primary_pathway_mode not in valid_modes: raise ValueError("primary_pathway_mode must be 'gene_topology_aggregate' or 'activity_point_cloud'.") if primary_pathway_mode not in set(pathway_modes): raise ValueError("primary_pathway_mode must also be present in pathway_modes.") pathway_table = _standardize_pathway_definitions(pathway_definitions) genes = pathway_table["gene"].drop_duplicates().tolist() reference = _coerce_reference_df( reference_df, adata=adata, cluster_col=cluster_col, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col, celltype_col=celltype_col, ) reference.index = reference[cell_id_col].astype(str) expression = _coerce_expression_df( reference, expression_df, adata=adata, genes=genes, cell_id_col=cell_id_col, use_raw=use_raw, ) expression.index = reference.index pathway_activity = compute_pathway_activity_matrix(expression, pathway_table, method=scoring_method, normalize=False) pathway_activity = _robust_scale_columns(pathway_activity) if view == "niche_smoothed": neighbor_index = _build_neighbor_index(reference, x_col=x_col, y_col=y_col, k_neighbors=k_neighbors, radius=radius) pathway_activity = _robust_scale_columns(_smooth_matrix_by_neighbors(pathway_activity, neighbor_index, include_self=True)) gene_topology, anchor_sources, resolved_structure_map, structure_map_source = _resolve_gene_topology_anchors( reference, expression, genes, tbc_results=tbc_results, t_and_c_df=t_and_c_df, structure_map=structure_map, structure_map_df=structure_map_df, anchor_mode=anchor_mode, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col, entity_min_weight=entity_min_weight, topology_method=topology_method, ) gene_topology_aggregate = _aggregate_pathway_gene_topology( gene_topology, pathway_table, aggregate=pathway_aggregate, ) gene_topology_structuremap = _safe_row_cophenetic(gene_topology_aggregate, method=topology_method) activity_entity_points, activity_diagnostics = _build_pathway_activity_points( reference, pathway_activity, cell_id_col=cell_id_col, x_col=x_col, y_col=y_col, activity_mode=view, activity_threshold_schedule=activity_threshold_schedule, min_activity_cells=min_activity_cells, ) pathway_activity_to_cell = compute_entity_to_cell_topology( reference, activity_entity_points, x_col=x_col, y_col=y_col, celltype_col="celltype", entity_col="entity", weight_col="weight", min_weight=entity_min_weight, method=topology_method, ) pathway_activity_structuremap = compute_entity_structuremap( activity_entity_points, x_col=x_col, y_col=y_col, entity_col="entity", weight_col="weight", min_weight=entity_min_weight, method=topology_method, ) primary_lookup = { "gene_topology_aggregate": (gene_topology_aggregate, gene_topology_structuremap), "activity_point_cloud": (pathway_activity_to_cell, pathway_activity_structuremap), } pathway_to_cell, pathway_structuremap = primary_lookup[primary_pathway_mode] mode_comparison = ( _pathway_mode_summary(gene_topology_aggregate, mode_name="aggregate") .merge( _pathway_mode_summary(pathway_activity_to_cell, mode_name="activity"), on="pathway", how="outer", ) .merge(activity_diagnostics, on="pathway", how="left") ) mode_comparison["primary_pathway_mode"] = primary_pathway_mode mode_comparison["structure_map_source"] = structure_map_source out_dir = _ensure_output_dir(output_dir) output_files: dict[str, Any] = {} if out_dir is not None: p_to_c_path = out_dir / "pathway_to_cell.csv" p_to_p_path = out_dir / "pathway_structuremap.csv" p_activity_to_c_path = out_dir / "pathway_activity_to_cell.csv" p_activity_to_p_path = out_dir / "pathway_activity_structuremap.csv" comparison_path = out_dir / "pathway_mode_comparison.csv" pathway_to_cell.to_csv(p_to_c_path) pathway_structuremap.to_csv(p_to_p_path) pathway_activity_to_cell.to_csv(p_activity_to_c_path) pathway_activity_structuremap.to_csv(p_activity_to_p_path) mode_comparison.to_csv(comparison_path, index=False) output_files["pathway_to_cell"] = str(p_to_c_path) output_files["pathway_structuremap"] = str(p_to_p_path) output_files["pathway_activity_to_cell"] = str(p_activity_to_c_path) output_files["pathway_activity_structuremap"] = str(p_activity_to_p_path) output_files["pathway_mode_comparison"] = str(comparison_path) if not pathway_to_cell.empty: output_files["pathway_to_cell_heatmap"] = _save_heatmap( pathway_to_cell, title=f"Pathway-to-cell topology ({primary_pathway_mode})", output_prefix=out_dir / "pathway_to_cell_heatmap", cmap="viridis_r", ) if (not pathway_activity.empty) and (not pathway_activity_to_cell.empty): best_pathway = pathway_activity_to_cell.min(axis=1).sort_values().index[0] best_score = pathway_activity[best_pathway] threshold = float(best_score.quantile(float(hotspot_quantile))) hotspot_mask = best_score >= threshold hotspot_df = reference.loc[hotspot_mask, ["cell_id", x_col, y_col, "celltype"]].copy() hotspot_df["pathway"] = best_pathway hotspot_df["score"] = best_score.loc[hotspot_mask].to_numpy() activity_meta = activity_diagnostics.set_index("pathway").reindex([best_pathway]) if not activity_meta.empty: hotspot_df["retained_quantile"] = float(activity_meta["retained_quantile"].iloc[0]) hotspot_df["retained_cell_count"] = int(activity_meta["retained_cell_count"].iloc[0]) hotspot_df["activity_mode"] = str(activity_meta["activity_mode"].iloc[0]) hotspot_csv = out_dir / "pathway_hotspot_cells.csv" hotspot_df.to_csv(hotspot_csv, index=False) output_files["pathway_hotspot_cells_csv"] = str(hotspot_csv) parquet_path = out_dir / "pathway_hotspot_cells.parquet" if _safe_to_parquet(hotspot_df, parquet_path): output_files["pathway_hotspot_cells_parquet"] = str(parquet_path) output_files["pathway_roi_overlay"] = _save_hotspot_overlay( reference, x_col=x_col, y_col=y_col, sender_mask=hotspot_mask, receiver_mask=pd.Series(False, index=reference.index), sender_score=_winsorized_normalize_series(best_score), receiver_score=pd.Series(0.0, index=reference.index), title=f"{best_pathway} hotspot ({view}, activity-point-cloud)", output_prefix=out_dir / "pathway_roi_overlay", ) return { "pathway_activity": pathway_activity, "gene_topology": gene_topology, "gene_topology_aggregate": gene_topology_aggregate, "gene_topology_structuremap": gene_topology_structuremap, "pathway_activity_to_cell": pathway_activity_to_cell, "pathway_activity_structuremap": pathway_activity_structuremap, "pathway_to_cell": pathway_to_cell, "pathway_structuremap": pathway_structuremap, "pathway_mode_comparison": mode_comparison, "activity_diagnostics": activity_diagnostics, "anchor_sources": anchor_sources, "structure_map": resolved_structure_map, "files": output_files, }
[docs] def ligand_receptor_target_consistency( lr_scores: pd.DataFrame, receiver_signatures: Mapping[str, Any] | pd.DataFrame, ligand_target_prior: pd.DataFrame, *, ligand_col: str = "ligand", receiver_col: str = "receiver_celltype", target_col: str = "target", prior_weight_col: str = "weight", signature_gene_col: str = "gene", signature_weight_col: str = "score", ) -> pd.DataFrame: """ Compute a NicheNet-like downstream target consistency layer. The default scoring is intentionally lightweight: for each ligand and receiver cell type we compute the weighted overlap between the ligand prior targets and the receiver signature genes. The output can be merged back onto the ``ligand_receptor_topology_analysis`` result table. """ if not {ligand_col, receiver_col, "LR_score"}.issubset(lr_scores.columns): raise ValueError("lr_scores must contain ligand, receiver_celltype, and LR_score columns.") required_prior = {ligand_col, target_col} missing_prior = required_prior.difference(ligand_target_prior.columns) if missing_prior: raise ValueError(f"ligand_target_prior is missing required columns: {sorted(missing_prior)}") prior = ligand_target_prior.copy() if prior_weight_col not in prior.columns: prior[prior_weight_col] = 1.0 prior[ligand_col] = prior[ligand_col].astype(str) prior[target_col] = prior[target_col].astype(str) prior[prior_weight_col] = pd.to_numeric(prior[prior_weight_col], errors="coerce").fillna(1.0).astype(float) if isinstance(receiver_signatures, pd.DataFrame): required_sig = {receiver_col, signature_gene_col} missing_sig = required_sig.difference(receiver_signatures.columns) if missing_sig: raise ValueError(f"receiver_signatures is missing required columns: {sorted(missing_sig)}") sig_df = receiver_signatures.copy() if signature_weight_col not in sig_df.columns: sig_df[signature_weight_col] = 1.0 sig_df[receiver_col] = sig_df[receiver_col].astype(str) sig_df[signature_gene_col] = sig_df[signature_gene_col].astype(str) sig_df[signature_weight_col] = pd.to_numeric(sig_df[signature_weight_col], errors="coerce").fillna(1.0).astype(float) else: rows: list[dict[str, Any]] = [] for receiver, signature in receiver_signatures.items(): if isinstance(signature, Mapping): for gene, score in signature.items(): rows.append({receiver_col: str(receiver), signature_gene_col: str(gene), signature_weight_col: float(score)}) else: for gene in signature: rows.append({receiver_col: str(receiver), signature_gene_col: str(gene), signature_weight_col: 1.0}) sig_df = pd.DataFrame(rows, columns=[receiver_col, signature_gene_col, signature_weight_col]) signature_lookup = { receiver: group.set_index(signature_gene_col)[signature_weight_col].astype(float) for receiver, group in sig_df.groupby(receiver_col, sort=False) } prior_lookup = { ligand: group.groupby(target_col)[prior_weight_col].sum().astype(float) for ligand, group in prior.groupby(ligand_col, sort=False) } out = lr_scores.copy() target_support: list[float] = [] for row in out.itertuples(index=False): ligand = getattr(row, ligand_col) receiver = getattr(row, receiver_col) if ligand not in prior_lookup or receiver not in signature_lookup: target_support.append(0.0) continue target_weights = prior_lookup[ligand] signature_weights = signature_lookup[receiver] overlap = target_weights.index.intersection(signature_weights.index) if len(overlap) == 0: target_support.append(0.0) continue numerator = float((target_weights.loc[overlap] * signature_weights.loc[overlap]).sum()) denominator = float(target_weights.sum()) or 1.0 target_support.append(numerator / denominator) out["target_support"] = _normalize_series(pd.Series(target_support, index=out.index)) out["topology_supported"] = out["LR_score"].astype(float) out["target_supported"] = out["target_support"].astype(float) out["topology_and_target_supported"] = np.sqrt(out["LR_score"].astype(float) * out["target_support"].astype(float)) return out