Source code for sfplot.analysis.searcher_findee_score

# Cell-GPS compute_cophenetic_distances_from_adata.py

import os
import numpy as np
import pandas as pd
from typing import Optional, Tuple
from scipy.cluster.hierarchy import linkage, cophenet
from scipy.spatial.distance import squareform, pdist
from sklearn.neighbors import NearestNeighbors


[docs] def compute_cophenetic_distances_from_adata( adata: 'anndata.AnnData', cluster_col: str = "Cluster", output_dir: Optional[str] = None, method: str = "average" ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Compute and return cophenetic distance matrices in both row and column dimensions (using cophenet), then apply linear normalization to [0,1] for each separately. Unlike the previous version, min and max values are computed independently for rows and columns. """ # 0. Optional: handle output directory if output_dir is None: output_dir = os.getcwd() os.makedirs(output_dir, exist_ok=True) # 1. Extract cluster information if cluster_col not in adata.obs.columns: raise ValueError(f"'{cluster_col}' does not exist in adata.obs. Please check the column name.") clusters = adata.obs[cluster_col].astype("category") unique_clusters = clusters.cat.categories # 2. Extract spatial coordinates if "spatial" not in adata.obsm: raise ValueError("'spatial' coordinate info does not exist in adata.obsm. Please ensure the data contains spatial coordinates.") coords = adata.obsm["spatial"] # typically (n_cells, 2) or (n_cells, 3) # 3. Build a DataFrame with rows as cell_id and columns as cluster if "cell_id" not in adata.obs.columns: raise ValueError("'cell_id' does not exist in adata.obs. Please ensure the data contains this column.") df_nearest_cluster_dist = pd.DataFrame( index=adata.obs["cell_id"], columns=unique_clusters, dtype=float ) # 4. For each cluster, use a nearest-neighbor model to compute distances from all cells to that cluster for c in unique_clusters: mask_c = (clusters == c) coords_c = coords[mask_c] if coords_c.shape[0] == 0: df_nearest_cluster_dist.loc[:, c] = np.nan continue nbrs_c = NearestNeighbors(n_neighbors=1, algorithm="auto") nbrs_c.fit(coords_c) dist_c, _ = nbrs_c.kneighbors(coords) df_nearest_cluster_dist[c] = dist_c[:, 0] # (optional) save results to adata.uns adata.uns["nearest_cluster_dist"] = df_nearest_cluster_dist # 5. Compute mean distance per cluster => cluster x cluster matrix clusters_by_id = pd.Series( data=clusters.values, index=adata.obs["cell_id"], name=cluster_col ) df_group_mean = df_nearest_cluster_dist.groupby(clusters_by_id).mean() # 6. Drop clusters whose entire column is NaN df_group_mean_clean = df_group_mean.dropna(axis=1, how="all") if df_group_mean_clean.empty: print("Warning: df_group_mean_clean is empty. Please check the data.") return pd.DataFrame(), pd.DataFrame() # 7. Perform hierarchical clustering separately on rows and columns row_linkage = linkage(df_group_mean_clean, method=method) col_linkage = linkage(df_group_mean_clean.T, method=method) # 8. cophenet: correctly obtain cophenetic distances (condensed form) row_coph_corr, row_coph_condensed = cophenet(row_linkage, pdist(df_group_mean_clean.values)) col_coph_corr, col_coph_condensed = cophenet(col_linkage, pdist(df_group_mean_clean.T.values)) # 9. Convert condensed distances to square form (squareform) row_cophenetic_square = squareform(row_coph_condensed) col_cophenetic_square = squareform(col_coph_condensed) # 10. Build DataFrame row_labels = df_group_mean_clean.index col_labels = df_group_mean_clean.columns row_cophenetic_df = pd.DataFrame( row_cophenetic_square, index=row_labels, columns=row_labels ) col_cophenetic_df = pd.DataFrame( col_cophenetic_square, index=col_labels, columns=col_labels ) # -------- Compute min/max for rows and columns separately and normalize to [0,1] -------- row_min = row_cophenetic_df.values.min() row_max = row_cophenetic_df.values.max() col_min = col_cophenetic_df.values.min() col_max = col_cophenetic_df.values.max() def normalize_df(df: pd.DataFrame, dmin: float, dmax: float) -> pd.DataFrame: if dmin == dmax: # If there is no range, return original DF unchanged (or all 0) return df return (df - dmin) / (dmax - dmin) row_cophenetic_df_norm = normalize_df(row_cophenetic_df, row_min, row_max) col_cophenetic_df_norm = normalize_df(col_cophenetic_df, col_min, col_max) # Print to inspect range print("Cophenetic distance & normalization done.") print( f"Row dist range -> original: [{row_min:.4f}, {row_max:.4f}], normalized: [{row_cophenetic_df_norm.values.min():.4f}, {row_cophenetic_df_norm.values.max():.4f}]") print( f"Col dist range -> original: [{col_min:.4f}, {col_max:.4f}], normalized: [{col_cophenetic_df_norm.values.min():.4f}, {col_cophenetic_df_norm.values.max():.4f}]") return row_cophenetic_df_norm, col_cophenetic_df_norm
# Cell-GPS compute_cophenetic_distances_from_df.py import os import numpy as np import pandas as pd from typing import Optional, Tuple from scipy.cluster.hierarchy import linkage, cophenet from scipy.spatial.distance import pdist, squareform from sklearn.neighbors import NearestNeighbors
[docs] def compute_searcher_findee_distance_matrix_from_df( df: pd.DataFrame, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, celltype_col: str = "celltype" ) -> pd.DataFrame: """ Compute and return a directed inter-cluster average nearest-neighbor distance matrix. Row and column indices are the clusters (cell types) present in df; rows represent "Searcher" clusters, columns represent "Findee" clusters. Each element is the average nearest-neighbor distance from all cells in the row cluster to all cells in the column cluster. Clusters with no cells in the data will not appear in the result matrix. Parameters: ---------- df : pd.DataFrame DataFrame containing cell coordinates and type data. x_col, y_col : str, optional Column names for cell x/y coordinates. Defaults to "x" and "y". z_col : Optional[str], optional Column name for the z coordinate; if provided it is used, otherwise None means 2D only. celltype_col : str, optional Column name for cell type / cluster labels. Defaults to "celltype". Returns: ------- pd.DataFrame Distance matrix DataFrame with cluster names as index and columns. Shape is (n_clusters, n_clusters); values are the average nearest-neighbor distance between the corresponding cluster pairs. NaN if unavailable. """ # 1. Check required columns exist required_cols = {x_col, y_col, celltype_col} if z_col is not None: required_cols.add(z_col) if not required_cols.issubset(df.columns): raise ValueError(f"DataFrame must contain the following columns: {required_cols}") # 2. Extract cluster info and remove unused categories clusters = df[celltype_col].astype("category") clusters = clusters.cat.remove_unused_categories() unique_clusters = clusters.cat.categories # all actually existing cluster categories # 3. Extract cell coordinates (numpy array) coord_cols = [x_col, y_col] + ([z_col] if z_col is not None else []) coords = df[coord_cols].values # shape: (n_cells, dims) # 4. Initialize cell × cluster nearest-neighbor distance matrix df_nearest_cluster_dist = pd.DataFrame(index=df.index, columns=unique_clusters, dtype=float) # 5. Compute distances from each cell to the nearest cell in each target cluster for c in unique_clusters: mask_c = (clusters == c) coords_c = coords[mask_c] if coords_c.shape[0] == 0: # If this cluster has no cells, keep the entire column as NaN df_nearest_cluster_dist.loc[:, c] = np.nan continue # Build nearest-neighbor model for current cluster and compute distances from all cells nbrs = NearestNeighbors(n_neighbors=1, algorithm="auto").fit(coords_c) dist_c, _ = nbrs.kneighbors(coords) df_nearest_cluster_dist[c] = dist_c[:, 0] # 6. Group by source cluster and compute mean to get cluster × cluster average distance matrix distance_matrix = df_nearest_cluster_dist.groupby(clusters).mean() # 7. Drop columns that are entirely NaN (clusters with no cells) distance_matrix = distance_matrix.dropna(axis=1, how="all") return distance_matrix
[docs] def compute_cophenetic_from_distance_matrix( distance_matrix: pd.DataFrame, method: str = "average", show_corr: bool = False ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Perform hierarchical clustering in both row and column directions on the given inter-cluster distance matrix, and compute cophenetic distance matrices. Results are independently normalized to [0,1] for rows and columns. Parameters: ---------- distance_matrix : pd.DataFrame Input distance matrix with source clusters as rows and target clusters as columns (e.g. output of compute_searcher_findee_distance_matrix_from_df). method : str, optional Linkage method for hierarchical clustering. Defaults to "average". show_corr : bool, optional Whether to print the cophenetic correlation coefficient (printed separately for rows and columns). Defaults to False. Returns: ------- Tuple[pd.DataFrame, pd.DataFrame] (row_coph, col_coph). Cophenetic distance matrices (DataFrames) for row and column clusters, each independently normalized to [0,1]. """ # 1. Perform hierarchical clustering on rows row_linkage = linkage(distance_matrix, method=method) # 2. Perform hierarchical clustering on columns col_linkage = linkage(distance_matrix.T, method=method) # 3. Compute cophenetic distances and correlation coefficients row_coph_corr, row_coph_condensed = cophenet(row_linkage, pdist(distance_matrix.values)) col_coph_corr, col_coph_condensed = cophenet(col_linkage, pdist(distance_matrix.T.values)) # 4. Convert condensed distances to square form row_cophenetic_square = squareform(row_coph_condensed) col_cophenetic_square = squareform(col_coph_condensed) # 5. Build DataFrames (preserve cluster labels) row_labels = distance_matrix.index col_labels = distance_matrix.columns row_cophenetic_df = pd.DataFrame(row_cophenetic_square, index=row_labels, columns=row_labels) col_cophenetic_df = pd.DataFrame(col_cophenetic_square, index=col_labels, columns=col_labels) # 6. Normalize row and column distance matrices separately to [0,1] def normalize_df(df: pd.DataFrame) -> pd.DataFrame: dmin, dmax = df.values.min(), df.values.max() return df if dmin == dmax else (df - dmin) / (dmax - dmin) row_cophenetic_df_norm = normalize_df(row_cophenetic_df) col_cophenetic_df_norm = normalize_df(col_cophenetic_df) # 7. Optionally print cophenetic correlation coefficients if show_corr: print(f"Row cophenetic correlation coefficient: {row_coph_corr:.4f}") print(f"Column cophenetic correlation coefficient: {col_coph_corr:.4f}") # 8. Return result matrices return row_cophenetic_df_norm, col_cophenetic_df_norm
[docs] def compute_cophenetic_distances_from_df( df: pd.DataFrame, x_col: str = "x", y_col: str = "y", z_col: Optional[str] = None, celltype_col: str = "celltype", output_dir: Optional[str] = None, method: str = "average", show_corr: bool = False ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Compute and return cophenetic distance matrices in both row and column dimensions, then apply linear normalization to [0, 1] for each separately. If z_col is provided, uses (x, y, z) for distance computation; otherwise uses only (x, y). Parameters: ---------- df : pd.DataFrame DataFrame containing cell data. x_col, y_col, z_col : str, optional Column names for spatial coordinates. z_col defaults to None. celltype_col : str, optional Column name for cell type. output_dir : Optional[str] Output file directory; if None, uses the current working directory. method : str, optional Linkage method for hierarchical clustering. Defaults to "average". show_corr : bool, optional Whether to print the cophenetic correlation coefficient for rows and columns. Defaults to False. Returns: ------- Tuple[pd.DataFrame, pd.DataFrame] Row and column cophenetic distance matrices, both normalized to [0, 1]. """ # 0. Ensure output directory exists if output_dir is None: output_dir = os.getcwd() os.makedirs(output_dir, exist_ok=True) # 1. Compute inter-cluster average nearest-neighbor distance matrix distance_matrix = compute_searcher_findee_distance_matrix_from_df(df, x_col, y_col, z_col, celltype_col) # 2. Check if the matrix is empty if distance_matrix.empty: raise ValueError("df_group_mean_clean is empty, please check the data.") # 3. Compute cophenetic distance matrix and normalize row_coph, col_coph = compute_cophenetic_from_distance_matrix(distance_matrix, method=method, show_corr=show_corr) # 4. Return results return row_coph, col_coph
# ---------------- plot_cophenetic_heatmap.py ---------------- import os import contextlib import logging from typing import Optional, Tuple import matplotlib as mpl import matplotlib.font_manager as fm mpl.use("Agg") import matplotlib.pyplot as plt import seaborn as sns import pandas as pd # ---------- 1. Make text in PDF editable ---------- mpl.rcParams["pdf.fonttype"] = 42 mpl.rcParams["ps.fonttype"] = 42 # ---------- 2. Utility: silence any logger ---------- @contextlib.contextmanager def silence(logger_name: str, level: int = logging.ERROR): """Temporarily raise the logging level of *logger_name*.""" logger = logging.getLogger(logger_name) old_level = logger.level logger.setLevel(level) try: yield finally: logger.setLevel(old_level) # ---------- 3. Ensure a valid sans-serif font is available ---------- def _ensure_font(): """Use Arial if present; otherwise switch to Liberation Sans / DejaVu Sans.""" want = "Arial" if any(want in f.name for f in fm.fontManager.ttflist): mpl.rcParams["font.family"] = want return fallback = "Liberation Sans" if not any(fallback in f.name for f in fm.fontManager.ttflist): fallback = "DejaVu Sans" # Override font.family and font.sans-serif list, removing Arial mpl.rcParams["font.family"] = [fallback] mpl.rcParams["font.sans-serif"] = [fallback] # ---------- 4. Core function ----------
[docs] def plot_cophenetic_heatmap( matrix: pd.DataFrame, matrix_name: Optional[str] = None, output_dir: Optional[str] = None, output_filename: Optional[str] = None, figsize: Optional[Tuple[float, float]] = None, cmap: str = "RdBu", linewidths: float = 0.5, annot: bool = False, sample: str = "Sample", xlabel: Optional[str] = None, ylabel: Optional[str] = None, show_dendrogram: bool = True, quiet: bool = True, return_figure: bool = False, return_image: bool = False, dpi: int = 300, # image DPI, affects image quality ): """ Draw a cophenetic heatmap (seaborn.clustermap), guaranteeing: • Text in PDF is editable • Legend position is auto-adjusted • figsize is dynamically adjusted • fontTools.subset & findfont logs are silenced Parameters: ...existing parameters... return_figure: whether to return the figure object instead of saving to file return_image: whether to return a high-resolution PIL image instead of the figure object dpi: image DPI resolution, only effective when return_image=True Returns: If return_figure=True, returns a seaborn.ClusterGrid object If return_image=True, returns a PIL.Image object Otherwise returns None """ # When both return modes are specified, return image takes priority if return_image: return_figure = False # ---- Ensure a usable font is available, suppress findfont warnings ---- _ensure_font() # ---- Dynamic figsize ---- if figsize is None: rows, cols = matrix.shape figsize = (max(8.0, 0.25 * cols + 0.5), max(8.0, 0.25 * rows + 0.5)) # ---- Output path & title ---- if not (return_figure or return_image): # only handle path when saving a file if output_dir is None: output_dir = os.getcwd() os.makedirs(output_dir, exist_ok=True) title_map = { "row_coph": ( f"StructureMap of {sample}", f"StructureMap_of_{sample}.pdf", "Searcher", "Searcher", ), "col_coph": ( f"Findee's D score of {sample}", f"Findee_D_score_of_{sample}.pdf", "Findee", "Findee", ), } title, default_pdf, xlab, ylab = title_map.get( matrix_name, ( f"D score of {sample}", f"D_score_of_{sample}.pdf", xlabel or "Findee", ylabel or "Searcher", ), ) xlabel, ylabel = xlabel or xlab, ylabel or ylab # Only set path when saving a file if not (return_figure or return_image): pdf_path = os.path.join(output_dir, output_filename or default_pdf) # ---- Internal drawing function ---- def _draw(): g = sns.clustermap( data=matrix, figsize=figsize, cmap=cmap, row_cluster=show_dendrogram, col_cluster=show_dendrogram, linewidths=linewidths, annot=annot, ) # 1) Ensure heatmap cells are square g.ax_heatmap.set_aspect("equal") # 2) Adjust dendrogram & colorbar positions if show_dendrogram: heat = g.ax_heatmap.get_position() row_d = g.ax_row_dendrogram.get_position() col_d = g.ax_col_dendrogram.get_position() # 2-1 Align row dendrogram vertically g.ax_row_dendrogram.set_position( [row_d.x0, heat.y0, row_d.width, heat.height] ) # 2-2 Align column dendrogram horizontally g.ax_col_dendrogram.set_position( [heat.x0, col_d.y0, heat.width, col_d.height] ) # 2-3 Place colorbar in the top-left empty area empty_w = heat.x0 - row_d.x0 empty_h = (heat.y0 + heat.height) - (col_d.y0 + col_d.height) g.cax.set_position( [ row_d.x0 + empty_w * 0.35, col_d.y0 + col_d.height + empty_h * 0.15, empty_w * 0.30, empty_h * 0.70, ] ) # 3) Axis labels & title g.ax_heatmap.set_xlabel(xlabel, fontsize=12) g.ax_heatmap.set_ylabel(ylabel, fontsize=12) g.ax_heatmap.set_yticklabels(g.ax_heatmap.get_yticklabels(), rotation=0) g.fig.suptitle(title, fontsize=12, y=1.02) # Handle figure according to return type if return_image: # Convert figure to high-resolution image from io import BytesIO from PIL import Image # Create an in-memory buffer for the image buf = BytesIO() g.fig.savefig(buf, format='png', dpi=dpi, bbox_inches='tight') buf.seek(0) # Load as PIL image image = Image.open(buf) image_copy = image.copy() # create a copy so the original can be closed buf.close() plt.close(g.fig) # close figure to avoid memory leaks return image_copy elif not return_figure: plt.savefig(pdf_path, format="pdf", bbox_inches="tight") plt.close(g.fig) return None # Return ClusterGrid object return g # ---- Execute drawing (with optional log silencing) ---- if quiet: with silence("fontTools.subset", logging.ERROR), silence( "matplotlib.font_manager", logging.ERROR ): result = _draw() else: result = _draw() # If saving a file, print message if not (return_figure or return_image): print(f"Heat‑map saved to: {pdf_path}") return result