Source code for cafaeval.graph

import numpy as np
import copy
import logging
import os

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
_propagate_logger = logging.getLogger("cafaeval.propagate")
_propagate_logger.addHandler(logging.NullHandler())


[docs] class Graph: """ Ontology class. One ontology == one namespace DAG is the adjacence matrix (sparse) which represent a Directed Acyclic Graph where DAG(i,j) == 1 means that the go term i is_a (or is part_of) j Parents that are in a different namespace are discarded """ def __init__(self, namespace, terms_dict, ia_dict=None, orphans=False): """ terms_dict = {term: {name: , namespace: , def: , alt_id: , rel:}} """ self.namespace = namespace self.dag = [] # [[], ...] terms (rows, axis 0) x parents (columns, axis 1) self.terms_dict = {} # {term: {index: , name: , namespace: , def: } used to assign term indexes in the gt self.terms_dict_alt = {} # {alt_id: set(term, ...) } alternative ids to canonical ids self.terms_list = [] # [{id: term, name:, namespace: , def:, adj: set(), children: set()}, ...] self.idxs = None # Number of terms self.order = None self.toi = None self.toi_ia = None self.ia = None rel_list = [] for self.idxs, (term_id, term) in enumerate(terms_dict.items()): rel_list.extend([[term_id, rel, term['namespace']] for rel in term['rel']]) self.terms_list.append({'id': term_id, 'name': term['name'], 'namespace': namespace, 'def': term['def'], 'adj': set(), 'children': set()}) self.terms_dict[term_id] = {'index': self.idxs, 'name': term['name'], 'namespace': namespace, 'def': term['def']} for a_id in term['alt_id']: self.terms_dict_alt.setdefault(a_id, set()).add(term_id) self.idxs += 1 self.dag = np.zeros((self.idxs, self.idxs), dtype='bool') # id1 term (row, axis 0), id2 parent (column, axis 1) for id1, id2, ns in rel_list: if self.terms_dict.get(id2): i = self.terms_dict[id1]['index'] j = self.terms_dict[id2]['index'] self.dag[i, j] = 1 # Remove duplicates in adj and children lists # This ensures that a parent-child term does not have multiple edges, which could lead to wrong topological sorting self.terms_list[i]['adj'].add(j) self.terms_list[j]['children'].add(i) logging.debug("i,j {},{} {},{}".format(i, j, id1, id2)) else: logging.debug('Skipping branch to external namespace: {}'.format(id2)) logging.debug("dag {}".format(self.dag)) # Topological sorting self.top_sort() logging.debug("order sorted {}".format(self.order)) if orphans: self.toi = np.arange(self.dag.shape[0]) # All terms, also those without parents else: self.toi = np.nonzero(self.dag.sum(axis=1) > 0)[0] # Only terms with parents logging.debug("toi {}".format(self.toi)) if ia_dict is not None: self.set_ia(ia_dict) logging.info("Ontology: {}, total {}, roots {}, leaves {}, alternative_ids {}".format(self.namespace, len(np.where(self.dag.sum(axis=1) != 0)[0]), len(np.where(self.dag.sum(axis=1) == 0)[0]), len(np.where(self.dag.sum(axis=0) == 0)[0]), len(self.terms_dict_alt))) return
[docs] def top_sort(self): """ Takes a sparse matrix representing a DAG and returns an array with nodes indexes in topological order https://en.wikipedia.org/wiki/Topological_sorting """ indexes = [] visited = 0 (rows, cols) = self.dag.shape # create a vector containing the in-degree of each node in_degree = self.dag.sum(axis=0) # logging.debug("degree {}".format(in_degree)) # find the nodes with in-degree 0 (leaves) and add them to the queue queue = np.nonzero(in_degree == 0)[0].tolist() # logging.debug("queue {}".format(queue)) # for each element of the queue increment visits, add them to the list of ordered nodes # and decrease the in-degree of the neighbor nodes # and add them to the queue if they reach in-degree == 0 while queue: visited += 1 idx = queue.pop(0) indexes.append(idx) in_degree[idx] -= 1 l = self.terms_list[idx]['adj'] if len(l) > 0: for j in l: in_degree[j] -= 1 if in_degree[j] == 0: queue.append(j) # if visited is equal to the number of nodes in the graph then the sorting is complete # otherwise the graph can't be sorted with topological order if visited == rows: self.order = indexes else: raise Exception("The sparse matrix doesn't represent an acyclic graph")
[docs] def set_ia(self, ia_dict): self.ia = np.zeros(self.idxs, dtype='float') for term_id in self.terms_dict: if ia_dict.get(term_id): self.ia[self.terms_dict[term_id]['index']] = ia_dict.get(term_id) else: logging.debug('Missing IA for term: {}'.format(term_id)) # Convert inf to zero np.nan_to_num(self.ia, copy=False, nan=0, posinf=0, neginf=0) self.toi_ia = np.nonzero(self.ia > 0)[0]
[docs] class Prediction: """ The score matrix contains the scores given by the predictor for every node of the ontology """ def __init__(self, ids, matrix, namespace=None): self.ids = ids self.matrix = matrix # scores self.namespace = namespace def __str__(self): return "\n".join(["{}\t{}\t{}".format(index, self.matrix[index], self.namespace) for index, _id in enumerate(self.ids)])
[docs] class GroundTruth: def __init__(self, ids, matrix, namespace=None): self.ids = ids self.matrix = matrix self.namespace = namespace
_PROPAGATE_WORK_THRESHOLD = 800_000_000 def _children_cache(ont): children_by_term = getattr(ont, "_children_by_term", None) if children_by_term is None: children_by_term = [ np.flatnonzero(ont.dag[:, term_id]) for term_id in range(ont.dag.shape[1]) ] ont._children_by_term = children_by_term return children_by_term def _propagate_serial(matrix, order_, children_by_term, mode): for term_id in order_: children = children_by_term[term_id] if children.size == 0: continue if mode == "max": child_max = matrix[:, children].max(axis=1) matrix[:, term_id] = np.maximum(matrix[:, term_id], child_max) elif mode == "fill": rows = np.flatnonzero(matrix[:, term_id] == 0) if rows.size: idx = np.ix_(rows, children) matrix[rows, term_id] = matrix[idx].max(axis=1) def _ancestors_csr(ont): """Lazy per-ontology cache of transitive ancestors (self inclusive). Returns ``(indptr, indices)`` flat arrays with ``ancestors[t] == indices[indptr[t]:indptr[t+1]]``. Each list contains term ``t`` itself plus every term reachable by walking DAG parent edges up to the roots. Computed once per ``Graph`` instance. """ cached = getattr(ont, "_ancestors_csr", None) if cached is not None: return cached dag = ont.dag n = int(dag.shape[0]) # Vectorised build of parents_by_term: one full np.nonzero scan instead # of n_terms per-row flatnonzero calls. ``dag[i, j] == 1`` means ``i`` # is_a ``j``, so the non-zero rows of ``dag[t, :]`` are the parents of # ``t``. Grouping by child row gives us the per-term parent list. edge_rows, edge_cols = np.nonzero(dag) sort_idx = np.argsort(edge_rows, kind='stable') sorted_rows = edge_rows[sort_idx] sorted_cols = edge_cols[sort_idx] p_counts = np.bincount(sorted_rows, minlength=n) p_indptr = np.zeros(n + 1, dtype=np.int64) np.cumsum(p_counts, out=p_indptr[1:]) # ont.order is leaves → roots; reverse so parents are always finished # before we look them up. top_down = np.asarray(ont.order)[::-1] # Use Python sets during construction, flatten to arrays afterwards. ancestors = [None] * n for t in top_down: t_int = int(t) s = {t_int} parents_slice = sorted_cols[p_indptr[t_int]:p_indptr[t_int + 1]] for p in parents_slice: s.update(ancestors[int(p)]) ancestors[t_int] = s lens = np.fromiter((len(ancestors[t]) for t in range(n)), dtype=np.int64, count=n) indptr = np.zeros(n + 1, dtype=np.int64) np.cumsum(lens, out=indptr[1:]) total = int(indptr[-1]) indices = np.empty(total, dtype=np.int64) for t in range(n): a = ancestors[t] if a: indices[indptr[t]:indptr[t + 1]] = np.fromiter(a, dtype=np.int64, count=len(a)) ont._ancestors_csr = (indptr, indices) return indptr, indices def _propagate_sparse_pushup(matrix, ont, mode, triples=None): """Sparse alternative to ``_propagate_serial``. Scatters each input non-zero ``(row, col, score)`` into every ancestor of ``col`` (self inclusive), reduces by ``(row, ancestor)`` via a stable sort + ``np.maximum.reduceat`` group-max, and writes the reduced values back in-place. Cost is ``O(nnz * avg_ancestors + R log R)`` where ``R`` is the expanded triple count — on typical CAFA-shaped inputs this beats the per-term dense sweep by 1-2 orders of magnitude because only predicted terms contribute work. If ``triples`` is ``(nz_rows, nz_cols, nz_scores)`` the caller already knows the input non-zero positions (e.g. the parser just scattered them into ``matrix``) and we skip the dense ``np.nonzero`` scan. This is a large win for ground-truth matrices where the non-zero density is ~1e-4 and ``np.nonzero`` would otherwise touch every cell. For ``mode='fill'`` the update semantics are "only overwrite cells that were originally zero"; this is restored by snapshotting the input non-zero positions and writing their original values back at the end. """ n_prot, n_terms = matrix.shape if n_prot == 0 or n_terms == 0: return if triples is not None: nz_rows, nz_cols, nz_scores = triples else: nz_rows, nz_cols = np.nonzero(matrix) if nz_rows.size == 0: return nz_scores = matrix[nz_rows, nz_cols] if nz_rows.size == 0: return indptr, anc_indices = _ancestors_csr(ont) n_anc = (indptr[nz_cols + 1] - indptr[nz_cols]).astype(np.int64) total = int(n_anc.sum()) if total == 0: return # Vectorised gather of the per-non-zero ancestor slices into one flat # array. Equivalent to concatenating ``anc_indices[indptr[c]:indptr[c+1]]`` # for every non-zero column ``c``, but without a Python loop. # block_starts[i] = indptr[nz_cols[i]] # global_offsets[i] = sum(n_anc[:i]) # local_offset[j] = j - global_offsets[ block(j) ] # anc_ptr[j] = block_starts[ block(j) ] + local_offset[j] block_starts = indptr[nz_cols] global_offsets = np.zeros(nz_rows.size + 1, dtype=np.int64) np.cumsum(n_anc, out=global_offsets[1:]) base_per_j = np.repeat(block_starts, n_anc) local_offset = np.arange(total, dtype=np.int64) - np.repeat(global_offsets[:-1], n_anc) expanded_cols = anc_indices[base_per_j + local_offset] expanded_rows = np.repeat(nz_rows, n_anc) expanded_scores = np.repeat(nz_scores, n_anc) # Group-max over (row, ancestor). Use a flat key so a single argsort # gives the grouping we need. flat = expanded_rows.astype(np.int64) * n_terms + expanded_cols order_idx = np.argsort(flat, kind='stable') flat_s = flat[order_idx] scores_s = expanded_scores[order_idx] group_starts = np.empty(flat_s.size, dtype=bool) group_starts[0] = True np.not_equal(flat_s[1:], flat_s[:-1], out=group_starts[1:]) start_idx = np.flatnonzero(group_starts) group_max = np.maximum.reduceat(scores_s, start_idx) unique_flat = flat_s[start_idx] out_rows = unique_flat // n_terms out_cols = unique_flat % n_terms # Max against what is currently in the matrix. For mode='max' this is the # final answer; for mode='fill' we restore original non-zeros below. current = matrix[out_rows, out_cols] np.maximum(current, group_max, out=current) matrix[out_rows, out_cols] = current if mode == "fill": # Only zero cells may be filled by propagation; originally non-zero # cells keep their input value regardless of what descendants say. matrix[nz_rows, nz_cols] = nz_scores
[docs] def propagate(matrix, ont, order, mode="max", parallel=0, chunk_rows=65536, _shm_name=None, _shape=None, _dtype_str=None, _row_start=None, _row_end=None, _deepest=None, _triples=None): """ Update inplace the score matrix (proteins x terms) propagating scores up to the root. ``mode='max'`` takes the max of each term and its children; ``mode='fill'`` only updates rows where the current term is zero. When ``parallel > 1`` and the estimated work is above the threshold the matrix is shared across processes via ``shared_memory`` (spawn context) and rows are partitioned among workers. Recursive calls re-enter this function through the ``_shm_name`` path. """ import multiprocessing as mp from multiprocessing import shared_memory if _shm_name is None: if matrix is None: raise TypeError("matrix must not be None") if matrix.shape[0] == 0: raise Exception("Empty matrix") use_sparse = os.environ.get("CAFAEVAL_SPARSE", "1") not in ("0", "false", "False") if use_sparse: _propagate_logger.debug( "propagate sparse pushup", extra={"mode": mode, "rows": int(matrix.shape[0])}, ) _propagate_sparse_pushup(matrix, ont, mode, triples=_triples) return # Dense fallback: compute ``deepest`` so the serial sweep can skip # the all-zero columns at the front of ``order``. The sparse path # does not need this because ``_ancestors_csr`` is order-independent. has_any = np.any(matrix[:, order] != 0, axis=0) nonzero_idx = np.flatnonzero(has_any) if nonzero_idx.size == 0: raise Exception("The matrix is empty") deepest = int(nonzero_idx[0]) order_ = order[deepest:] # Dense fallback path still needs the per-term children list. children_by_term = _children_cache(ont) n_proc = int(parallel) if parallel else 0 if n_proc > 1: sum_children = int(sum(children_by_term[t].size for t in order_)) work = int(matrix.shape[0]) * sum_children if work < _PROPAGATE_WORK_THRESHOLD: _propagate_logger.info( "propagate serial (below threshold)", extra={"work": work, "threshold": _PROPAGATE_WORK_THRESHOLD, "n_proc_requested": n_proc, "mode": mode}, ) n_proc = 0 else: _propagate_logger.info( "propagate parallel", extra={"work": work, "n_proc": n_proc, "mode": mode, "rows": int(matrix.shape[0])}, ) if n_proc <= 1: _propagate_logger.debug( "propagate serial", extra={"mode": mode, "rows": int(matrix.shape[0]), "terms": int(len(order_))}, ) _propagate_serial(matrix, order_, children_by_term, mode) return shm = shared_memory.SharedMemory(create=True, size=matrix.nbytes) shm_matrix = np.ndarray(matrix.shape, dtype=matrix.dtype, buffer=shm.buf) shm_matrix[:] = matrix try: n_rows = int(matrix.shape[0]) chunk_rows = int(np.ceil(n_rows / n_proc)) _propagate_logger.debug( "propagate chunking", extra={"n_rows": n_rows, "chunk_rows": chunk_rows, "n_proc": n_proc}, ) ctx = mp.get_context("spawn") procs = [] for row_start in range(0, n_rows, chunk_rows): row_end = min(n_rows, row_start + chunk_rows) proc = ctx.Process( target=propagate, args=(None, ont, order), kwargs={ "mode": mode, "parallel": 0, "chunk_rows": chunk_rows, "_shm_name": shm.name, "_shape": matrix.shape, "_dtype_str": matrix.dtype.str, "_row_start": row_start, "_row_end": row_end, "_deepest": deepest, }, ) proc.start() procs.append(proc) for proc in procs: proc.join() if proc.exitcode != 0: raise RuntimeError("Worker failed") matrix[:] = shm_matrix finally: shm.close() shm.unlink() return shm = shared_memory.SharedMemory(name=_shm_name) try: full = np.ndarray(tuple(_shape), dtype=np.dtype(_dtype_str), buffer=shm.buf) row_start = int(_row_start) row_end = int(_row_end) view = full[row_start:row_end] if view.shape[0] == 0: return deepest = int(_deepest) if _deepest is not None else 0 order_ = order[deepest:] children_by_term = _children_cache(ont) _propagate_serial(view, order_, children_by_term, mode) finally: shm.close() return