Source code for stream2.tools._elpigraph

"""Functions to calculate principal graph."""

import numpy as np
import pandas as pd
import scipy
import elpigraph
import networkx as nx
from copy import deepcopy
from sklearn.cluster import SpectralClustering, AffinityPropagation, KMeans
from sklearn.metrics.pairwise import pairwise_distances, euclidean_distances

from .._settings import settings


[docs] def learn_graph( adata, method="principal_tree", obsm="X_dr", layer=None, n_nodes=50, epg_lambda=0.01, epg_mu=0.1, epg_alpha=0.02, epg_trimming_radius=float("inf"), use_seed=True, use_partition=False, use_weights=False, ordinal_label=None, ordinal_supervision_strength=1, ordinal_root_point=None, n_jobs=None, GPU=False, max_candidates={"AddNode2Node": 20, "BisectEdge": 20, "ShrinkEdge": 50}, store_evolution=False, **kwargs, ): """Learn principal graph. Parameters ---------- adata: `AnnData` Anndata object. method: `str`, (default: 'principal_curve'); Method used to calculate the graph. obsm: `str`, optional (default: 'X_dr') The multi-dimensional annotation of observations used to learn the graph layer: `str`, optional (default: None) The layer used to learn the graph use_seed: bool Whether to use the seed graph in adata.uns['seed_epg'] generated by st.seed_graph. If True, ignores obsm and layer parameters use_partition: bool Whether to learn a disconnected graph for each category in adata.uns['partition'] use_weights: bool Whether to weight points with adata.obs['pointweights'] GPU: Whether to perform computations using GPU (requires cupy library) max_candidates: Max number of candidates to generate with each graph grammar when exploring graph topology at each iteration. Setting numbers lower can increase speed, especially for higher number of nodes. store_evolution: Store the evolution of the graph for each number of nodes **kwargs: Additional arguments to each method Returns ------- updates `adata.uns['epg']` with the following fields. conn: `sparse matrix` (`.uns['epg']['conn']`) A connectivity sparse matrix. node_pos: `array` (`.uns['epg']['node_pos']`) Node positions. edge: `array` (`.uns['epg']['edge']`) Node edges. """ if use_partition: print("Learning elastic principal graph for each partition...") if type(use_partition) is bool: partitions = adata.obs["partition"].unique() elif type(use_partition) is list: partitions = use_partition else: raise ValueError( "use_partition should be a bool or a list of partitions" ) if ordinal_label is not None: raise ValueError( "use_partition can't be used together with ordinal_label" ) if store_evolution: raise ValueError( "can't use store_evolution=True when use_partition=True" ) merged_nodep = [] merged_edges = [] num_edges = 0 for part in adata.obs["partition"].unique(): if part not in partitions: p_adata = _subset_adata(adata, part) else: if use_seed: p_adata = _subset_adata(adata, part) if len(p_adata.uns["seed_epg"]["node_pos"]) < n_nodes: nnodes = n_nodes else: nnodes = len(p_adata.uns["seed_epg"]["node_pos"]) + 1 else: p_adata = adata[adata.obs["partition"] == part].copy() _learn_graph( p_adata, method=method, obsm=obsm, layer=layer, n_nodes=nnodes, epg_lambda=epg_lambda, epg_mu=epg_mu, epg_alpha=epg_alpha, epg_trimming_radius=epg_trimming_radius, use_seed=use_seed, use_weights=use_weights, n_jobs=n_jobs, ordinal_label=ordinal_label, ordinal_supervision_strength=ordinal_supervision_strength, ordinal_root_point=[ordinal_root_point], GPU=GPU, max_candidates=max_candidates, store_evolution=store_evolution, **kwargs, ) merged_nodep.append(p_adata.uns["epg"]["node_pos"]) merged_edges.append(p_adata.uns["epg"]["edge"] + num_edges) num_edges += len(p_adata.uns["epg"]["node_pos"]) adata.uns["epg"] = {} adata.uns["epg"]["node_pos"] = np.concatenate(merged_nodep) adata.uns["epg"]["edge"] = np.concatenate((merged_edges)) adata.uns["epg"]["node_partition"] = np.repeat( adata.obs["partition"].unique(), [len(nodep) for nodep in merged_nodep], ).astype(str) adata.uns["epg"]["edge_partition"] = np.repeat( adata.obs["partition"].unique(), [len(edges) for edges in merged_edges], ).astype(str) adata.uns["epg"]["params"] = p_adata.uns["epg"]["params"] X = _get_graph_data(adata, key="epg") _store_graph_attributes(adata, X, key="epg") else: _learn_graph( adata, method=method, obsm=obsm, layer=layer, n_nodes=n_nodes, epg_lambda=epg_lambda, epg_mu=epg_mu, epg_alpha=epg_alpha, epg_trimming_radius=epg_trimming_radius, use_seed=use_seed, use_weights=use_weights, ordinal_label=ordinal_label, ordinal_supervision_strength=ordinal_supervision_strength, ordinal_root_point=[ordinal_root_point], n_jobs=n_jobs, GPU=GPU, max_candidates=max_candidates, store_evolution=store_evolution, **kwargs, )
def _learn_graph( adata, method="principal_tree", obsm="X_dr", layer=None, n_nodes=50, epg_lambda=0.01, epg_mu=0.1, epg_alpha=0.02, epg_trimming_radius=float("inf"), use_seed=True, use_weights=False, ordinal_label=None, ordinal_supervision_strength=1, ordinal_root_point=None, n_jobs=None, GPU=False, max_candidates={"AddNode2Node": 20, "BisectEdge": 20, "ShrinkEdge": 50}, store_evolution=False, **kwargs, ): """Learn principal graph. Parameters ---------- adata: `AnnData` Anndata object. method: `str`, (default: 'principal_curve'); Method used to calculate the graph. obsm: `str`, optional (default: 'X_dr') The multi-dimensional annotation of observations used to learn the graph layer: `str`, optional (default: None) The layer used to learn the graph use_seed: bool Whether to use the seed graph in adata.uns['seed_epg'] generated by st.seed_graph. If True, ignores obsm and layer parameters **kwargs: Additional arguments to each method Returns ------- updates `adata.uns['epg']` with the following fields. conn: `sparse matrix` (`.uns['epg']['conn']`) A connectivity sparse matrix. node_pos: `array` (`.uns['epg']['node_pos']`) Node positions. """ assert method in [ "principal_curve", "principal_tree", "principal_circle", ], ( "`method` must be one of " "['principal_curve','principal_tree','principal_circle']" ) if use_seed and (method == "principal_tree"): if "seed_epg" not in adata.uns: raise ValueError( "could not find 'seed_epg' in `adata.uns. Please run" " st.tl.seed_graph" ) if n_nodes <= len(adata.uns["seed_epg"]["node_pos"]): raise ValueError( f"The seed graph already has at least {n_nodes} nodes. Please" " run st.tl.learn_graph with higher n_nodes" ) kwargs["InitNodePositions"] = adata.uns["seed_epg"]["node_pos"] kwargs["InitEdges"] = adata.uns["seed_epg"]["edge"] if adata.uns["seed_epg"]["params"]["obsm"] is not None: mat = adata.obsm[adata.uns["seed_epg"]["params"]["obsm"]] elif adata.uns["seed_epg"]["params"]["layer"] is not None: mat = adata.obsm[adata.uns["seed_epg"]["params"]["layer"]] else: print("Learning the graph on adata.X") mat = adata.X else: if ( use_seed and (method != "principal_tree") and ("seed_epg" in adata.uns) ): print(f"WARNING: seed graph is ignored when using method {method}") kwargs["InitNodePositions"] = None kwargs["InitEdges"] = None if sum(list(map(lambda x: x is not None, [layer, obsm]))) == 2: raise ValueError("Only one of `layer` and `obsm` can be used") elif obsm is not None: if obsm in adata.obsm: mat = adata.obsm[obsm] else: raise ValueError(f"could not find {obsm} in `adata.obsm`") elif layer is not None: if layer in adata.layers: mat = adata.layers[layer] else: raise ValueError(f"could not find {layer} in `adata.layers`") else: mat = adata.X if n_jobs is None: n_jobs = settings.n_jobs if use_weights: if "pointweights" not in adata.obs: raise ValueError( "adata.obs['pointweights'] not found. Please run" " st2.tl.get_weights" ) weights = np.array(adata.obs["pointweights"]).reshape((-1, 1)) else: weights = None if ordinal_label is not None: if type(ordinal_label) == str: kwargs["pseudotime"] = adata.obs[ordinal_label].to_numpy() else: raise ValueError(f"ordinal_label key not found in adata.obs.") kwargs["pseudotimeLambda"] = ordinal_supervision_strength kwargs["FixNodesAtPoints"] = [ordinal_root_point] if method == "principal_curve": dict_epg = elpigraph.computeElasticPrincipalCurve( X=mat, NumNodes=n_nodes, TrimmingRadius=epg_trimming_radius, n_cores=n_jobs, Do_PCA=False, CenterData=False, Lambda=epg_lambda, Mu=epg_mu, alpha=epg_alpha, PointWeights=weights, GPU=GPU, MaxNumberOfGraphCandidatesDict=max_candidates, StoreGraphEvolution=store_evolution, **kwargs, )[0] if method == "principal_tree": dict_epg = elpigraph.computeElasticPrincipalTree( X=mat, NumNodes=n_nodes, TrimmingRadius=epg_trimming_radius, n_cores=n_jobs, Do_PCA=False, CenterData=False, Lambda=epg_lambda, Mu=epg_mu, alpha=epg_alpha, PointWeights=weights, GPU=GPU, MaxNumberOfGraphCandidatesDict=max_candidates, StoreGraphEvolution=store_evolution, **kwargs, )[0] if method == "principal_circle": dict_epg = elpigraph.computeElasticPrincipalCircle( X=mat, NumNodes=n_nodes, TrimmingRadius=epg_trimming_radius, n_cores=n_jobs, Do_PCA=False, CenterData=False, Lambda=epg_lambda, Mu=epg_mu, alpha=epg_alpha, PointWeights=weights, InitNodes=3, GPU=GPU, MaxNumberOfGraphCandidatesDict=max_candidates, StoreGraphEvolution=store_evolution, **kwargs, )[0] adata.uns["epg"] = dict() adata.uns["epg"]["node"] = np.arange(n_nodes) adata.uns["epg"]["node_pos"] = dict_epg["NodePositions"] adata.uns["epg"]["edge"] = dict_epg["Edges"][0] adata.uns["epg"]["params"] = { "method": method, "obsm": obsm, "layer": layer, "n_nodes": n_nodes, "epg_lambda": epg_lambda, "epg_mu": epg_mu, "epg_alpha": epg_alpha, "use_seed": use_seed, } if store_evolution: adata.uns["epg"]["graph_evolution"] = { "all_node_pos": dict_epg["AllNodePositions"], "all_edge": dict_epg["AllElasticMatrices"], } _store_graph_attributes(adata, mat, key="epg")
[docs] def seed_graph( adata, obsm="X_dr", layer=None, clustering="kmeans", damping=0.75, pref_perc=50, n_clusters=10, max_n_clusters=200, n_neighbors=50, nb_pct=None, paths_favored=[], paths_disfavored=[], label=None, label_strength=2, force=False, use_weights=False, use_partition=False, ): """Seeding the initial elastic principal graph. Parameters ---------- adata: AnnData Annotated data matrix. obsm: `str`, optional (default: 'X_dr') The multi-dimensional annotation of observations used to learn the graph layer: `str`, optional (default: None) The layer used to learn the graph init_nodes_pos: `array`, shape = [n_nodes,n_dimension], optional (default: `None`) initial node positions init_edges: `array`, shape = [n_edges,2], optional (default: `None`) initial edges, all the initial nodes should be included in the tree structure clustering: `str`, optional (default: 'kmeans') Choose from {{'ap','kmeans','sc'}} clustering method used to infer the initial nodes. 'ap' affinity propagation 'kmeans' K-Means clustering 'sc' spectral clustering damping: `float`, optional (default: 0.75) Damping factor (between 0.5 and 1) for affinity propagation. pref_perc: `int`, optional (default: 50) Preference percentile (between 0 and 100). The percentile of the input similarities for affinity propagation. n_clusters: `int`, optional (default: 10) Number of clusters (only valid once 'clustering' is specified as 'sc' or 'kmeans'). max_n_clusters: `int`, optional (default: 200) The allowed maximum number of clusters for 'ap'. n_neighbors: `int`, optional (default: 50) The number of neighbor cells used for spectral clustering. nb_pct: `float`, optional (default: None) The percentage of neighbor cells (when specified, it will overwrite n_neighbors). paths_favored: list of lists, optional (default: []) Favored paths between categorical labels used for supervised MST initialization paths_disfavored: list of lists, optional (default: []) Disfavored paths between categorical labels used for supervised MST initialization label: `str`, optional (default: None) Categorical labels for supervised MST initialization label_strength: float in [1,oo) Strength of supervised MST initialization force: bool (experimental feature) Force supervised MST initialization to follow specified paths rather than using soft constraint use_weights: bool Whether to weight points with adata.obs['pointweights'] use_partition: bool Whether to learn a disconnected graph for each category in adata.uns['partition'] Returns ------- adata.obs['clustering']: `pandas.core.series.Series` (`adata.obs['clustering']`,dtype `str`) Array of dim (number of samples) that stores the clustering labels ('0', '1', …) for each cell. adata.uns['seed_epg'] : dict Elastic principal graph structure. """ if use_partition: print("Seeding initial graph for each partition...") if type(use_partition) is bool: partitions = adata.obs["partition"].unique() elif type(use_partition) is list: partitions = use_partition else: raise ValueError( "use_partition should be a bool or a list of partitions" ) merged_nodep = [] merged_edges = [] num_edges = 0 for part in adata.obs["partition"].unique(): if type(use_partition) is list: p_adata = _subset_adata(adata, part) else: p_adata = adata[adata.obs["partition"] == part].copy() if part in partitions: _seed_graph( p_adata, obsm=obsm, layer=layer, clustering=clustering, damping=damping, pref_perc=pref_perc, n_clusters=n_clusters, max_n_clusters=max_n_clusters, n_neighbors=n_neighbors, nb_pct=nb_pct, paths_favored=paths_favored, paths_disfavored=paths_disfavored, label=label, label_strength=label_strength, force=force, use_weights=use_weights, verbose=False, ) merged_nodep.append(p_adata.uns["seed_epg"]["node_pos"]) merged_edges.append(p_adata.uns["seed_epg"]["edge"] + num_edges) num_edges += len(p_adata.uns["seed_epg"]["node_pos"]) adata.uns["seed_epg"] = {} adata.uns["seed_epg"]["node_pos"] = np.concatenate(merged_nodep) adata.uns["seed_epg"]["edge"] = np.concatenate((merged_edges)) adata.uns["seed_epg"]["node_partition"] = np.repeat( adata.obs["partition"].unique(), [len(nodep) for nodep in merged_nodep], ).astype(str) adata.uns["seed_epg"]["edge_partition"] = np.repeat( adata.obs["partition"].unique(), [len(edges) for edges in merged_edges], ).astype(str) adata.uns["seed_epg"]["params"] = p_adata.uns["seed_epg"]["params"] X = _get_graph_data(adata, key="seed_epg") _store_graph_attributes(adata, X, key="seed_epg") else: _seed_graph( adata, obsm=obsm, layer=layer, clustering=clustering, damping=damping, pref_perc=pref_perc, n_clusters=n_clusters, max_n_clusters=max_n_clusters, n_neighbors=n_neighbors, nb_pct=nb_pct, paths_favored=paths_favored, paths_disfavored=paths_disfavored, label=label, label_strength=label_strength, force=force, use_weights=use_weights, )
def _seed_graph( adata, obsm="X_dr", layer=None, clustering="kmeans", damping=0.75, pref_perc=50, n_clusters=10, max_n_clusters=200, n_neighbors=50, nb_pct=None, paths_favored=[], paths_disfavored=[], label=None, label_strength=2, force=False, use_weights=False, verbose=True, ): """Internal method to seed_graph""" if verbose: print("Seeding initial graph...") if sum(list(map(lambda x: x is not None, [layer, obsm]))) == 2: raise ValueError("Only one of `layer` and `obsm` can be used") elif obsm is not None: if obsm in adata.obsm: mat = adata.obsm[obsm] adata.uns["seed"] = obsm else: raise ValueError(f"could not find {obsm} in `adata.obsm`") elif layer is not None: if layer in adata.layers: mat = adata.layers[layer] adata.uns["seed"] = obsm else: raise ValueError(f"could not find {layer} in `adata.layers`") else: mat = adata.X if nb_pct is not None: n_neighbors = int(np.around(mat.shape[0] * nb_pct)) if label_strength<1: raise ValueError("label_strength should be >=1") if verbose: print("Clustering...") if clustering == "ap": if verbose: print("Affinity propagation ...") ap = AffinityPropagation( damping=damping, random_state=42, preference=np.percentile( -euclidean_distances(mat, squared=True), pref_perc ), ).fit(mat) # ap = AffinityPropagation(damping=damping).fit(mat) if ap.cluster_centers_.shape[0] > max_n_clusters: if verbose: print( "The number of clusters is " + str(ap.cluster_centers_.shape[0]) ) if verbose: print( "Too many clusters are generated, please lower pref_perc" " or increase damping and retry it" ) return cluster_labels = ap.labels_ init_nodes_pos = ap.cluster_centers_ elif clustering == "sc": if verbose: print("Spectral clustering ...") sc = SpectralClustering( n_clusters=n_clusters, affinity="nearest_neighbors", n_neighbors=n_neighbors, eigen_solver="arpack", random_state=42, ).fit(mat) cluster_labels = sc.labels_ init_nodes_pos = np.empty((0, mat.shape[1])) # cluster centers for x in np.unique(cluster_labels): id_cells = np.array(range(mat.shape[0]))[cluster_labels == x] init_nodes_pos = np.vstack( (init_nodes_pos, np.median(mat[id_cells, :], axis=0)) ) elif clustering == "kmeans": if verbose: print("K-Means clustering ...") if use_weights: if "pointweights" not in adata.obs: raise ValueError( "adata.obs['pointweights'] not found. Please run" " st2.tl.get_weights" ) weights = np.array(adata.obs["pointweights"]).flatten() else: weights = None kmeans = KMeans( n_clusters=n_clusters, init="k-means++", n_init=10, max_iter=300, tol=0.0001, algorithm='lloyd',random_state=42 ).fit(mat, sample_weight=weights) cluster_labels = kmeans.labels_ init_nodes_pos = kmeans.cluster_centers_ else: if verbose: print("'" + clustering + "'" + " is not supported") adata.obs[clustering] = ["cluster " + str(x) for x in cluster_labels] # Minimum Spanning Tree ### if verbose: print("Calculating minimum spanning tree...") # ---if supervised adjacency matrix option if ( ((len(paths_favored) > 0) or (len(paths_disfavored) > 0)) and label is None ) or ( ((len(paths_favored) == 0) and (len(paths_disfavored) == 0)) and label is not None ): raise ValueError( "Both a label key (label: str) and cluster paths (paths: list of" " list) need to be provided for path-supervised initialization" ) elif ( (len(paths_favored) > 0) or (len(paths_disfavored) > 0) ) and label is not None: ( init_nodes_pos, clus_adjmat, adjmat_strength, num_modes, num_labels, labels_ignored, ) = _categorical_adjmat( mat, init_nodes_pos, paths_favored, paths_disfavored, adata.obs[label], label_strength, ) D = pairwise_distances(init_nodes_pos) G = nx.from_numpy_array(D * clus_adjmat) # ---else unsupervised else: D = pairwise_distances(init_nodes_pos) G = nx.from_numpy_array(D) # ---get edges from mst mst = nx.minimum_spanning_tree(G, ignore_nan=True) init_edges = np.array(mst.edges()) if force and label is not None: init_edges = _force_missing_connections( D, num_labels, num_modes, init_edges, paths_favored, clus_adjmat ) # Store results ### adata.uns["seed_epg"] = dict() adata.uns["seed_epg"]["node_pos"] = init_nodes_pos adata.uns["seed_epg"]["edge"] = init_edges adata.uns["seed_epg"]["params"] = dict( obsm=obsm, layer=layer, clustering=clustering, damping=damping, pref_perc=pref_perc, n_clusters=n_clusters, max_n_clusters=max_n_clusters, n_neighbors=n_neighbors, nb_pct=nb_pct, ) _store_graph_attributes(adata, mat, key="seed_epg") def _store_graph_attributes(adata, mat, key): """Compute graph attributes and store them in adata.uns[key]""" G = nx.Graph() G.add_edges_from(adata.uns[key]["edge"].tolist(), weight=1) mat_conn = nx.to_scipy_sparse_array( G, nodelist=np.arange(len(adata.uns[key]["node_pos"])), weight="weight", ) # partition points node_id, node_dist = elpigraph.src.core.PartitionData( X=mat, NodePositions=adata.uns[key]["node_pos"], MaxBlockSize=len(adata.uns[key]["node_pos"]) ** 4, SquaredX=np.sum(mat ** 2, axis=1, keepdims=1), ) # project points onto edges dict_proj = elpigraph.src.reporting.project_point_onto_graph( X=mat, NodePositions=adata.uns[key]["node_pos"], Edges=adata.uns[key]["edge"], Partition=node_id, ) edge_dist = np.linalg.norm( mat - dict_proj['X_projected'], axis=1) adata.obs[f"{key}_node_id"] = node_id.flatten() adata.obs[f"{key}_node_dist"] = node_dist adata.obs[f"{key}_edge_id"] = dict_proj["EdgeID"].astype(int) adata.obs[f"{key}_edge_loc"] = dict_proj["ProjectionValues"] adata.obs[f"{key}_edge_dist"] = edge_dist # adata.obsm[f"X_{key}_proj"] = dict_proj["X_projected"] adata.uns[key]["conn"] = mat_conn adata.uns[key]["edge_len"] = dict_proj["EdgeLen"] def _get_branch_id(adata, key="epg"): """add adata.obs['branch_id']""" # get branches net = elpigraph.src.graphs.ConstructGraph( {"Edges": [adata.uns[key]["edge"]]} ) branches = elpigraph.src.graphs.GetSubGraph(net, "branches") _dict_branches = { (b[0], b[-1]): b for i, b in enumerate(branches) } # temporary branch node lists (not in order) ordered_edges, ordered_nodes = elpigraph.src.supervised.bf_search( _dict_branches, root_node=np.where(np.array(net.degree()) == 1)[0][0] ) # create ordered dict dict_branches = {} for i, e in enumerate(ordered_edges): # for each branch # store branch in order (both the key and the list) if e not in _dict_branches: dict_branches[e] = _dict_branches[e[::-1]][::-1] else: dict_branches[e] = _dict_branches[e] # disable warning pd.options.mode.chained_assignment = None point_edges = adata.uns[key]["edge"][adata.obs[f"{key}_edge_id"]] adata.obs[f"{key}_branch_id"] = "" for i, e in enumerate(point_edges): for k, v in dict_branches.items(): if all(np.isin(e, v)): adata.obs[f"{key}_branch_id"][i] = k # reactivate warning pd.options.mode.chained_assignment = "warn" # Categorical MST initialization utils ## def _force_missing_connections( D, num_labels, num_modes, init_edges, paths_favored, clus_adjmat ): found_missing = True while found_missing: found_missing = False edges_labels = np.array(list(num_labels.keys()))[num_modes][ init_edges ].tolist() for path in paths_favored: for i in range(len(path) - 1): if [path[i], path[i + 1]] not in edges_labels and [ path[i + 1], path[i], ] not in edges_labels: print(path[i], path[i + 1]) print(num_labels[path[i]], num_labels[path[i + 1]]) missing_is = np.where(num_modes == num_labels[path[i]])[0] missing_js = np.where( num_modes == num_labels[path[i + 1]] )[0] x = D[missing_is[:, None], missing_js] _i, _j = np.where(x == x.min()) mi, mj = missing_is[_i], missing_js[_j] D[mi, mj] = D[mj, mi] = -1.0 found_missing = True # ---get edges from mst G = nx.from_numpy_array(D * clus_adjmat) mst = nx.minimum_spanning_tree(G, ignore_nan=True) init_edges = np.array(mst.edges()) return init_edges def _get_partition_modes(mat, init_nodes_pos, labels): """Return most frequent label assigned to each node.""" labels = np.array(labels) part = elpigraph.src.core.PartitionData( mat, init_nodes_pos, 10 ** 6, np.sum(mat ** 2, axis=1, keepdims=1) )[0].flatten() modes = np.empty(len(init_nodes_pos), dtype=labels.dtype) for i in range(len(init_nodes_pos)): modes[i] = scipy.stats.mode(labels[part == i]).mode[0] return modes def _get_labels_adjmat(labels_u, labels_ignored, paths_favored, paths_disfavored, label_strength): """Create adjmat given labels and paths. labels_ignored are connected to all other labels """ num_labels = { s: i for i, s in enumerate(np.append(labels_u, labels_ignored)) } len_labels = len(labels_u) + len(labels_ignored) adjmat = np.ones((len_labels, len_labels)) # allow connections given from paths for p in paths_favored: for i in range(len(p) - 1): adjmat[num_labels[p[i]], num_labels[p[i + 1]]] = adjmat[ num_labels[p[i + 1]], num_labels[p[i]] ] = 1/label_strength # remove forbidden connections given from paths_disfavored for p in paths_disfavored: for i in range(len(p) - 1): adjmat[num_labels[p[i]], num_labels[p[i + 1]]] = adjmat[ num_labels[p[i + 1]], num_labels[p[i]] ] = label_strength return adjmat, num_labels def _get_clus_adjmat(adjmat_strength, num_modes, n_clusters): """Create clus_adjmat given labels adjmat and kmeans label assignment.""" adjmat_clus = np.ones((n_clusters, n_clusters)) for ei in range(len(adjmat_strength)): for ej in range(len(adjmat_strength)): clus_ei = np.where(num_modes == ei)[0] clus_ej = np.where(num_modes == ej)[0] adjmat_clus[ clus_ei[:, None], np.repeat(clus_ej[None], len(clus_ei), axis=0), ] = adjmat_strength[ei, ej] return adjmat_clus def _categorical_adjmat( mat, init_nodes_pos, paths_favored, paths_disfavored, labels, label_strength ): """Main function, create categorical adjmat given node positions, cluster paths, point labels.""" labels_u = np.unique([c for p in paths_favored for c in p]) labels_ignored = np.setdiff1d(labels, labels_u) # label adjacency matrix adjmat_strength, num_labels = _get_labels_adjmat( labels_u, labels_ignored, paths_favored, paths_disfavored, label_strength ) # assign label to nodes modes = _get_partition_modes(mat, init_nodes_pos, labels) num_modes = np.array([num_labels[m] for m in modes]) # add centroids if necessary to prevent bug # (if some label has no kmean assigned to it) labels_miss = np.setdiff1d(labels_u, modes) if len(labels_miss) > 0: print( f"Found label(s) {labels_miss} with no representative node. Adding" " label centroid(s) as node(s)" ) centroids = np.vstack( [mat[labels == s].mean(axis=0) for s in labels_miss] ) init_nodes_pos = np.vstack((init_nodes_pos, centroids)) modes = np.hstack((modes, labels_miss)) num_modes = np.array([num_labels[m] for m in modes]) # nodes adjacency matrix clus_adjmat = _get_clus_adjmat( adjmat_strength, num_modes, n_clusters=len(init_nodes_pos), ) return ( init_nodes_pos, clus_adjmat, adjmat_strength, num_modes, num_labels, labels_ignored, ) def _get_graph_data(adata, key): """get data matrix used to learn the graph.""" obsm = adata.uns[key]["params"]["obsm"] layer = adata.uns[key]["params"]["layer"] if obsm is not None: if obsm in adata.obsm: mat = adata.obsm[obsm] else: raise ValueError(f"could not find {obsm} in `adata.obsm`") elif layer is not None: if layer in adata.layers: mat = adata.layers[layer] else: raise ValueError(f"could not find {layer} in `adata.layers`") else: mat = adata.X return mat def _subset_adata(adata, part): p_adata = adata[adata.obs["partition"] == part].copy() for key in ["seed_epg", "epg"]: if key in p_adata.uns: p_adata.uns[key] = deepcopy(adata.uns[key]) p_adata.uns[key]["node_pos"] = p_adata.uns[key]["node_pos"][ p_adata.uns[key]["node_partition"] == part ] p_adata.uns[key]["edge"] = p_adata.uns[key]["edge"][ p_adata.uns[key]["edge_partition"] == part ] p_adata.uns[key]["edge"] -= p_adata.uns[key]["edge"].min() return p_adata