Source code for cafaeval.evaluation

import os

import time
import numpy as np
import pandas as pd
import multiprocessing as mp
from cafaeval.parser import obo_parser, gt_parser, pred_parser, gt_exclude_parser, update_toi
from cafaeval.tests import test_norm_metric, test_intersection
import logging

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
_metrics_logger = logging.getLogger("cafaeval.metrics")
_metrics_logger.addHandler(logging.NullHandler())
_eval_logger = logging.getLogger("cafaeval.eval")
_eval_logger.addHandler(logging.NullHandler())

# Return a mask for all the predictions (matrix) >= tau
[docs] def solidify_prediction(pred, tau): return pred >= tau
# computes the f metric for each precision and recall in the input arrays
[docs] def compute_f(pr, rc): n = 2 * pr * rc d = pr + rc return np.divide(n, d, out=np.zeros_like(n, dtype=float), where=d != 0)
[docs] def compute_s(ru, mi): return np.sqrt(ru**2 + mi**2)
# return np.where(np.isnan(ru), mi, np.sqrt(ru + np.nan_to_num(mi)))
[docs] def compute_confusion_matrix(tau_arr, g, pred_matrix, toi, n_gt, ic_arr=None): """ Perform the evaluation at the matrix level for all tau thresholds The calculation is """ # n, tp, fp, fn, pr, rc (fp = misinformation, fn = remaining uncertainty) metrics = np.zeros((len(tau_arr), 6), dtype='float') for i, tau in enumerate(tau_arr): # Filter predictions based on tau threshold p = solidify_prediction(pred_matrix, tau) # Terms subsets intersection = np.logical_and(p, g) # TP mis = np.logical_and(p, np.logical_not(g)) # FP, predicted but not in the ground truth remaining = np.logical_and(np.logical_not(p), g) # FN, not predicted but in the ground truth # Weighted evaluation if ic_arr is not None: p = p * ic_arr[toi] intersection = intersection * ic_arr[toi] # TP mis = mis * ic_arr[toi] # FP, predicted but not in the ground truth remaining = remaining * ic_arr[toi] # FN, not predicted but in the ground truth n_pred = p.sum(axis=1) # TP + FP (number of terms predicted in each protein) n_intersection = intersection.sum(axis=1) # TP (number of TP terms per protein) # Number of proteins with at least one term predicted with score >= tau metrics[i, 0] = (p.sum(axis=1) > 0).sum() # Sum of confusion matrices metrics[i, 1] = n_intersection.sum() # TP (total terms) metrics[i, 2] = mis.sum(axis=1).sum() # FP metrics[i, 3] = remaining.sum(axis=1).sum() # FN # Macro-averaging metrics[i, 4] = np.divide(n_intersection, n_pred, out=np.zeros_like(n_intersection, dtype='float'), where=n_pred > 0).sum() # Precision metrics[i, 5] = np.divide(n_intersection, n_gt, out=np.zeros_like(n_gt, dtype='float'), where=n_gt > 0).sum() # Recall return metrics
[docs] def compute_confusion_matrix_exclude(tau_arr, g_perprotein, pred_matrix, toi_perprotein, n_gt, ic_arr=None): """ Perform the evaluation at the matrix level for all tau thresholds The calculation is Here, g is the full ground truth matrix without filtering terms of interest (toi). Instead, """ # n, tp, fp, fn, pr, rc (fp = misinformation, fn = remaining uncertainty) metrics = np.zeros((len(tau_arr), 6), dtype='float') for i, tau in enumerate(tau_arr): # Filter predictions based on tau threshold p_perprotein = [solidify_prediction(pred_matrix[p_idx, tois], tau) for p_idx, tois in enumerate(toi_perprotein)] # Terms subsets intersection = [np.logical_and(p_i, g_i) for p_i, g_i in zip(p_perprotein, g_perprotein)] # TP mis = [np.logical_and(p_i, np.logical_not(g_i)) for p_i, g_i in zip(p_perprotein, g_perprotein)] # FP, predicted but not in the ground truth remaining = [np.logical_and(np.logical_not(p_i), g_i) for p_i, g_i in zip(p_perprotein, g_perprotein)] # FN, not predicted but in the ground truth # Weighted evaluation if ic_arr is not None: p_perprotein = [p_i * ic_arr[tois] for p_i, tois in zip(p_perprotein, toi_perprotein)] intersection = [inter * ic_arr[tois] for inter, tois in zip(intersection, toi_perprotein)] # TP mis = [misinf * ic_arr[tois] for misinf, tois in zip(mis, toi_perprotein)] # FP, predicted but not in the ground truth remaining = [rem * ic_arr[tois] for rem, tois in zip(remaining, toi_perprotein)] # FN, not predicted but in the ground truth n_pred = np.array([p_i.sum() for p_i in p_perprotein]) # TP + FP n_intersection = np.array([inter.sum() for inter in intersection]) # TP precision = np.divide(n_intersection, n_pred, out=np.zeros_like(n_intersection, dtype='float'), where=n_pred > 0) recall = np.divide(n_intersection, n_gt, out=np.zeros_like(n_gt, dtype='float'), where=n_gt > 0) # metrics tests test_norm_metric(precision, name='precision') test_norm_metric(recall, name='recall') test_intersection(n_intersection, n_pred, n_gt) # Number of proteins with at least one term predicted with score >= tau metrics[i, 0] = (n_pred > 0).sum() # Sum of confusion matrices metrics[i, 1] = n_intersection.sum() # TP metrics[i, 2] = np.sum([m.sum() for m in mis]) # FP metrics[i, 3] = np.sum([r.sum() for r in remaining]) # FN # Macro-averaging metrics[i, 4] = precision.sum() # Precision metrics[i, 5] = recall.sum() # Recall return metrics
[docs] def compute_confusion_matrix_sparse(tau_arr, g, pred_matrix, toi, n_gt, ic_arr=None): """Sparse alternative to :func:`compute_confusion_matrix`. Scatters each non-zero prediction into the highest tau bin at which it is still active, then recovers per-threshold per-protein sums via a single right-to-left cumulative sum. Total cost is O(nnz + n_prot * n_tau) instead of O(n_tau * n_prot * n_toi) for the dense scan, which is the dominant win on real-world corpora where the prediction matrix is wide and mostly zero above typical thresholds. Accepts the same inputs as ``compute_confusion_matrix`` and returns a ``(n_tau, 6)`` array with the same column order (n, tp, fp, fn, pr, rc). ``tau_arr`` must be sorted in ascending order, which is already the case for all call sites (``np.arange(th_step, 1, th_step)``). """ n_prot, n_toi = pred_matrix.shape n_tau = len(tau_arr) metrics = np.zeros((n_tau, 6), dtype='float') # Weight vector over the TOI columns. None → unweighted (all ones, cheap). if ic_arr is None: w_toi = None else: w_toi = ic_arr[toi].astype(np.float64, copy=False) # Total gt weight (per protein) is already in n_gt for both modes; # we only need its sum to fill the FN column. total_gt = float(n_gt.sum()) nz_rows, nz_cols = np.nonzero(pred_matrix) if nz_rows.size == 0: metrics[:, 3] = total_gt return metrics nz_scores = pred_matrix[nz_rows, nz_cols] nz_is_tp = g[nz_rows, nz_cols].astype(np.float64, copy=False) # Highest tau index at which a pred with this score is still active. # A pred is active at tau_arr[k] iff k <= last_idx. last_idx = np.searchsorted(tau_arr, nz_scores, side='right') - 1 if last_idx.min() < 0: keep = last_idx >= 0 last_idx = last_idx[keep] nz_rows = nz_rows[keep] nz_cols = nz_cols[keep] nz_is_tp = nz_is_tp[keep] if last_idx.size == 0: metrics[:, 3] = total_gt return metrics # Flat scatter via bincount (much faster than np.add.at for large nnz). flat = nz_rows.astype(np.int64, copy=False) * n_tau + last_idx.astype(np.int64, copy=False) length = n_prot * n_tau if w_toi is None: w_pred = np.ones_like(nz_is_tp) else: w_pred = w_toi[nz_cols] w_tp = w_pred * nz_is_tp delta_pred_w = np.bincount(flat, weights=w_pred, minlength=length).reshape(n_prot, n_tau) delta_tp_w = np.bincount(flat, weights=w_tp, minlength=length).reshape(n_prot, n_tau) # Right-to-left cumulative sum: active weight at tau index k is the sum of # contributions of all bins >= k (a pred dies out at the first k > last_idx). pred_at_tau = np.cumsum(delta_pred_w[:, ::-1], axis=1)[:, ::-1] tp_at_tau = np.cumsum(delta_tp_w[:, ::-1], axis=1)[:, ::-1] # Per-threshold aggregation. Matches the column order of the dense kernel. tp_totals = tp_at_tau.sum(axis=0) pred_totals = pred_at_tau.sum(axis=0) metrics[:, 0] = (pred_at_tau > 0).sum(axis=0) metrics[:, 1] = tp_totals metrics[:, 2] = pred_totals - tp_totals metrics[:, 3] = total_gt - tp_totals # Macro precision / recall. Guard divides by zero to match the dense kernel # (which uses np.divide with where=denom > 0, returning 0 elsewhere). safe_pred = np.where(pred_at_tau > 0, pred_at_tau, 1.0) precision = np.where(pred_at_tau > 0, tp_at_tau / safe_pred, 0.0) metrics[:, 4] = precision.sum(axis=0) if np.any(n_gt > 0): n_gt_col = n_gt.astype(np.float64, copy=False)[:, None] safe_gt = np.where(n_gt_col > 0, n_gt_col, 1.0) recall = np.where(n_gt_col > 0, tp_at_tau / safe_gt, 0.0) metrics[:, 5] = recall.sum(axis=0) return metrics
[docs] def compute_confusion_matrix_exclude_sparse( tau_arr, pred_sub, gt_sub, toi_mask, excluded_mask, n_gt, ic_arr=None ): """Sparse alternative to :func:`compute_confusion_matrix_exclude`. Same single-pass scatter + right-to-left cumsum strategy as :func:`compute_confusion_matrix_sparse`, extended to the PK setting where the valid column set is protein-specific. A prediction at ``(row, col)`` contributes to protein ``row`` only if: 1. ``col`` belongs to the global toi (``toi_mask``), and 2. ``col`` is not in this protein's exclude set (``~excluded_mask``). Because the filter is expressed as a boolean AND over dense matrices, the sparse scatter sees only the surviving non-zeros — we never materialise the Python list of per-protein toi arrays. Cost drops from ``O(n_tau * sum_p |toi_p|)`` to ``O(nnz_valid + n_prot * n_tau)``. Parameters ---------- pred_sub: ``(n_prot, n_terms)`` float — predictions restricted to proteins with at least one GT annotation in the toi. gt_sub: ``(n_prot, n_terms)`` bool/int — ground truth for the same proteins. toi_mask: ``(n_terms,) bool`` — global terms-of-interest flag. excluded_mask: ``(n_prot, n_terms) bool`` — per-protein exclude flag. n_gt: ``(n_prot,) float`` — pre-computed weight of valid GT annotations per protein (dense sum over ``gt_sub & toi_mask & ~excluded_mask``, optionally multiplied by ``ic_arr``). ic_arr: ``(n_terms,) float`` or ``None`` — optional per-term weight. """ n_prot, n_terms = pred_sub.shape n_tau = len(tau_arr) metrics = np.zeros((n_tau, 6), dtype='float') total_gt = float(n_gt.sum()) # Only predictions at valid columns contribute. ``pred_sub != 0`` is the # sparsity filter; the two mask ANDs apply the PK exclusion. valid = (pred_sub != 0) & toi_mask[None, :] & (~excluded_mask) nz_rows, nz_cols = np.nonzero(valid) if nz_rows.size == 0: metrics[:, 3] = total_gt return metrics nz_scores = pred_sub[nz_rows, nz_cols] nz_is_tp = gt_sub[nz_rows, nz_cols].astype(np.float64, copy=False) last_idx = np.searchsorted(tau_arr, nz_scores, side='right') - 1 if last_idx.min() < 0: keep = last_idx >= 0 last_idx = last_idx[keep] nz_rows = nz_rows[keep] nz_cols = nz_cols[keep] nz_is_tp = nz_is_tp[keep] if last_idx.size == 0: metrics[:, 3] = total_gt return metrics flat = nz_rows.astype(np.int64, copy=False) * n_tau + last_idx.astype(np.int64, copy=False) length = n_prot * n_tau if ic_arr is None: w_pred = np.ones_like(nz_is_tp) else: w_pred = ic_arr[nz_cols].astype(np.float64, copy=False) w_tp = w_pred * nz_is_tp delta_pred_w = np.bincount(flat, weights=w_pred, minlength=length).reshape(n_prot, n_tau) delta_tp_w = np.bincount(flat, weights=w_tp, minlength=length).reshape(n_prot, n_tau) pred_at_tau = np.cumsum(delta_pred_w[:, ::-1], axis=1)[:, ::-1] tp_at_tau = np.cumsum(delta_tp_w[:, ::-1], axis=1)[:, ::-1] tp_totals = tp_at_tau.sum(axis=0) pred_totals = pred_at_tau.sum(axis=0) # For coverage (`n` column) to match the denominator used by normalize() # — which is the post-exclusion protein count from _count_proteins_in_toi # — restrict the row count to proteins that still have ≥1 GT annotation # in TOI after the per-protein exclude mask. Without this, `n` counts # proteins whose TOI annotations were all already known in t0, while the # denominator drops them, producing coverage > 1 in PK. eligible_rows = ((gt_sub != 0) & toi_mask[None, :] & (~excluded_mask)).any(axis=1) metrics[:, 0] = ((pred_at_tau > 0) & eligible_rows[:, None]).sum(axis=0) metrics[:, 1] = tp_totals metrics[:, 2] = pred_totals - tp_totals metrics[:, 3] = total_gt - tp_totals safe_pred = np.where(pred_at_tau > 0, pred_at_tau, 1.0) precision = np.where(pred_at_tau > 0, tp_at_tau / safe_pred, 0.0) metrics[:, 4] = precision.sum(axis=0) if np.any(n_gt > 0): n_gt_col = n_gt.astype(np.float64, copy=False)[:, None] safe_gt = np.where(n_gt_col > 0, n_gt_col, 1.0) recall = np.where(n_gt_col > 0, tp_at_tau / safe_gt, 0.0) metrics[:, 5] = recall.sum(axis=0) return metrics
_CM_G = None _CM_P = None _CM_TOI = None _CM_N_GT = None _CM_IC = None def _cm_init(g, p, toi, n_gt, ic_arr): global _CM_G, _CM_P, _CM_TOI, _CM_N_GT, _CM_IC _CM_G = g _CM_P = p _CM_TOI = toi _CM_N_GT = n_gt _CM_IC = ic_arr def _cm_worker(tau_chunk): return compute_confusion_matrix( tau_chunk, _CM_G, _CM_P, _CM_TOI, _CM_N_GT, _CM_IC ) _CME_G = None _CME_P = None _CME_TOI = None _CME_N_GT = None _CME_IC = None def _cme_init(g_perprotein, p, toi_perprotein, n_gt, ic_arr): global _CME_G, _CME_P, _CME_TOI, _CME_N_GT, _CME_IC _CME_G = g_perprotein _CME_P = p _CME_TOI = toi_perprotein _CME_N_GT = n_gt _CME_IC = ic_arr def _cme_worker(tau_chunk): return compute_confusion_matrix_exclude( tau_chunk, _CME_G, _CME_P, _CME_TOI, _CME_N_GT, _CME_IC )
[docs] def compute_metrics(pred, gt_matrix, tau_arr, toi, gt_exclude=None, ic_arr=None, n_cpu=0): """ Takes the prediction and the ground truth and for each threshold in tau_arr calculates the confusion matrix and returns the coverage, precision, recall, remaining uncertainty and misinformation. Toi is the list of terms (indexes) to be considered """ t0 = time.time() # Parallelization if n_cpu == 0: n_cpu = mp.cpu_count() mode = "pk" if gt_exclude is not None else "nk_lk" _metrics_logger.debug( "compute_metrics start", extra={"n_cpu": n_cpu, "n_tau": int(len(tau_arr)), "mode": mode}, ) columns = ["n", "tp", "fp", "fn", "pr", "rc"] use_sparse = os.environ.get("CAFAEVAL_SPARSE", "1") not in ("0", "false", "False") n_terms = gt_matrix.shape[1] toi_is_full = ( isinstance(toi, np.ndarray) and toi.ndim == 1 and toi.size == n_terms ) # Detecting the common "toi == arange(n_terms)" case lets us skip a full # ``(n_prot, n_terms)`` column copy on every cafa_eval call. We only pay # the cheap stride check when toi already has the right shape; the # equality probe is bounded by a small constant via ``array_equal``. if toi_is_full: toi_is_full = bool(np.array_equal(toi, np.arange(n_terms))) # Filter out proteins with no annotations in Terms-Of-Interest. ``any`` # short-circuits per row on the bool result so it beats ``sum > 0`` on # large dense matrices, especially when toi spans the whole ontology. if toi_is_full: proteins_has_gt = (gt_matrix != 0).any(axis=1) else: proteins_has_gt = (gt_matrix[:, toi] != 0).any(axis=1) proteins_with_gt = np.where(proteins_has_gt)[0] # Heavy slices ``g``/``p``/``gt_with_annots``/``pred_sub`` are deferred to # the branch that actually consumes them. The PK sparse kernel only needs # ``pred_sub`` + ``gt_with_annots`` + masks; the NK sparse kernel only # needs ``g`` + ``p``. Materialising both up front cost ~3 s per call on # the BP namespace. g = None p = None gt_with_annots = None toi_mask = None excluded_mask = None toi_perprotein = None gt_perprotein = None if gt_exclude is not None: if proteins_has_gt.all(): gt_with_annots = gt_matrix else: gt_with_annots = gt_matrix[proteins_with_gt, :] if use_sparse: # Dense boolean masks replace the per-protein Python list of toi # arrays. Each protein's effective toi is the global toi minus # the terms flagged in ``gt_exclude``; per-protein GT weight is # the sum over that intersection. if toi_is_full: toi_mask = np.ones(n_terms, dtype=bool) else: toi_mask = np.zeros(n_terms, dtype=bool) toi_mask[toi] = True # gt_exclude.matrix is already bool (built by gt_parser / # gt_exclude_parser via ``np.zeros_like(gt.matrix)`` where # ``gt.matrix.dtype == bool``), so the legacy ``!= 0`` was a # redundant elementwise compare on top of the fancy-index copy. excluded_mask = gt_exclude.matrix[proteins_with_gt, :] # ``n_gt`` only needs the count of GT cells that survive the # ``toi_mask & ~excluded_mask`` filter. Materialising the full # ``valid_gt_mask`` (three dense bool ops on # ``(n_prot, n_terms)``) costs ~300 ms on the BP namespace # — almost as much as the kernel itself. Instead we walk the # GT non-zero coordinates once and accumulate via ``bincount``. gt_nz_rows, gt_nz_cols = np.nonzero(gt_with_annots) if gt_nz_rows.size: if not toi_is_full: keep = toi_mask[gt_nz_cols] gt_nz_rows = gt_nz_rows[keep] gt_nz_cols = gt_nz_cols[keep] keep = ~excluded_mask[gt_nz_rows, gt_nz_cols] gt_nz_rows = gt_nz_rows[keep] gt_nz_cols = gt_nz_cols[keep] n_prot_with_gt = gt_with_annots.shape[0] if ic_arr is None: n_gt = np.bincount( gt_nz_rows, minlength=n_prot_with_gt, ).astype(np.float64) else: n_gt = np.bincount( gt_nz_rows, weights=ic_arr[gt_nz_cols], minlength=n_prot_with_gt, ).astype(np.float64) else: # Dense fallback: preserve the exact list-based summation order # used by upstream so test_norm_metric inside the dense kernel # does not trip on ULP noise. toi_perprotein = [ np.setdiff1d(toi, gt_exclude.matrix[prot, :].nonzero()[0], assume_unique=True) for prot in proteins_with_gt ] gt_perprotein = [ gt_with_annots[p_idx, tois] for p_idx, tois in enumerate(toi_perprotein) ] n_gt = np.array([gpp.sum().item() for gpp in gt_perprotein]) if ic_arr is not None: n_gt = np.array([(gpp * ic_arr[tois]).sum().item() for gpp, tois in zip(gt_perprotein, toi_perprotein)]) n_empty = int(np.count_nonzero(n_gt == 0)) if n_empty: _metrics_logger.warning( f"proteins with no annotations in TOI: {n_empty}", extra={"count": n_empty}, ) else: # NK / LK path — needs g and p. if proteins_has_gt.all(): gt_with_annots = gt_matrix pred_filtered = pred else: gt_with_annots = gt_matrix[proteins_with_gt, :] pred_filtered = pred[proteins_has_gt, :] if toi_is_full: g = gt_with_annots p = pred_filtered else: g = gt_with_annots[:, toi] p = pred_filtered[:, toi] # Simple metrics: number of terms annotated in each protein if ic_arr is None: n_gt = g.sum(axis=1) # Weighted metrics else: n_gt = (g * ic_arr[toi]).sum(axis=1) t1 = time.time() if gt_exclude is None and use_sparse: # Sparse kernel: single-pass scatter + cumsum. No pool needed — the # kernel is fast enough that fork overhead would dominate. metrics = compute_confusion_matrix_sparse(tau_arr, g, p, toi, n_gt, ic_arr) elif gt_exclude is None: tau_chunks = np.array_split(tau_arr, n_cpu) ctx = mp.get_context("fork") with ctx.Pool(processes=n_cpu, initializer=_cm_init, initargs=(g, p, toi, n_gt, ic_arr)) as pool: metrics = np.concatenate(pool.map(_cm_worker, tau_chunks), axis=0) elif use_sparse: # PK sparse kernel: boolean-masked single-pass scatter. The toi and # exclude masks live as dense ``(n_prot, n_terms)`` bools, so the # scatter never needs a Python-level per-protein loop. if proteins_has_gt.all(): pred_sub = pred else: pred_sub = pred[proteins_has_gt, :] metrics = compute_confusion_matrix_exclude_sparse( tau_arr, pred_sub, gt_with_annots, toi_mask, excluded_mask, n_gt, ic_arr ) else: # Dense fallback: fan out tau chunks over a fork pool using the # per-protein toi / gt lists we prepared above. Gated by # ``CAFAEVAL_SPARSE=0`` for A/B comparison against the sparse kernel. tau_chunks = np.array_split(tau_arr, n_cpu) if proteins_has_gt.all(): pred_sub = pred else: pred_sub = pred[proteins_has_gt, :] ctx = mp.get_context("fork") with ctx.Pool(processes=n_cpu, initializer=_cme_init, initargs=(gt_perprotein, pred_sub, toi_perprotein, n_gt, ic_arr)) as pool: metrics = np.concatenate(pool.map(_cme_worker, tau_chunks), axis=0) t2 = time.time() kernel = "sparse" if use_sparse else f"pool({n_cpu})" _metrics_logger.info( f"compute_metrics {mode} {kernel} n_tau={len(tau_arr)} " f"prep={t1 - t0:.3f}s kernel={t2 - t1:.3f}s total={t2 - t0:.3f}s", extra={"mode": mode, "n_cpu": n_cpu, "n_tau": int(len(tau_arr)), "kernel": kernel, "prep_seconds": round(t1 - t0, 3), "pool_seconds": round(t2 - t1, 3), "total_seconds": round(t2 - t0, 3)}, ) return pd.DataFrame(metrics, columns=columns)
[docs] def normalize(metrics, ns, tau_arr, ne, normalization): # Normalize columns for column in metrics.columns: if column != "n": # By default normalize by gt denominator = ne # Otherwise normalize by pred if normalization == 'pred' or (normalization == 'cafa' and column == "pr"): denominator = metrics["n"] metrics[column] = np.divide(metrics[column], denominator, out=np.zeros_like(metrics[column], dtype='float'), where=denominator > 0) metrics['ns'] = [ns] * len(tau_arr) metrics['tau'] = tau_arr metrics['cov'] = metrics['n'] / ne metrics['mi'] = metrics['fp'] metrics['ru'] = metrics['fn'] metrics['f'] = compute_f(metrics['pr'], metrics['rc']) metrics['s'] = compute_s(metrics['ru'], metrics['mi']) # Micro-average, calculation is based on the average of the confusion matrices metrics['pr_micro'] = np.divide(metrics['tp'], metrics['tp'] + metrics['fp'], out=np.zeros_like(metrics['tp'], dtype='float'), where=(metrics['tp'] + metrics['fp']) > 0) metrics['rc_micro'] = np.divide(metrics['tp'], metrics['tp'] + metrics['fn'], out=np.zeros_like(metrics['tp'], dtype='float'), where=(metrics['tp'] + metrics['fn']) > 0) metrics['f_micro'] = compute_f(metrics['pr_micro'], metrics['rc_micro']) return metrics
def _count_proteins_in_toi(gt_matrix, toi, exclude_matrix): """Number of proteins with at least one GT annotation in the toi. Vectorised replacement for the legacy ``sum(...)`` over a Python list comprehension that called ``setdiff1d`` per protein. On the BP namespace of a real PK corpus this drops the ``evaluate_prediction`` per-namespace overhead from ~1 s to a few ms. When ``exclude_matrix`` is ``None`` (NK/LK) the count is just the number of rows that have any non-zero in the toi columns. When it is provided (PK) the count is the number of rows that still have a surviving annotation after the per-protein exclude set is removed. """ n_terms = gt_matrix.shape[1] toi_is_full = ( isinstance(toi, np.ndarray) and toi.ndim == 1 and toi.size == n_terms and bool(np.array_equal(toi, np.arange(n_terms))) ) if exclude_matrix is None: if toi_is_full: has_any = (gt_matrix != 0).any(axis=1) else: has_any = (gt_matrix[:, toi] != 0).any(axis=1) return int(has_any.sum()) if toi_is_full: toi_mask = np.ones(n_terms, dtype=bool) else: toi_mask = np.zeros(n_terms, dtype=bool) toi_mask[toi] = True valid = (gt_matrix != 0) & toi_mask[None, :] & (exclude_matrix == 0) return int(valid.any(axis=1).sum())
[docs] def evaluate_prediction(prediction, gt, ontologies, tau_arr, gt_exclude=None, normalization='cafa', n_cpu=0, weighted_only=False): t0 = time.time() _eval_logger.debug( "evaluate_prediction start", extra={"n_namespaces": len(prediction), "weighted_only": weighted_only, "normalization": normalization, "n_cpu": n_cpu}, ) dfs = [] dfs_w = [] # Unweighted metrics for ns in prediction: t_ns = time.time() # Number of proteins with positive annotations in the TOI. The PK # branch (gt_exclude not None) further restricts to proteins that # still have a surviving annotation after the per-protein exclude # set is removed. Both shapes are computed via a single vectorised # mask AND — the legacy code did this with a Python list # comprehension over ``proteins_with_gt``, which alone cost ~1 s on # the BP namespace. num_annot_prots = _count_proteins_in_toi( gt[ns].matrix, ontologies[ns].toi, gt_exclude[ns].matrix if gt_exclude is not None else None, ) exclude = gt_exclude[ns] if gt_exclude is not None else None ne = np.full(len(tau_arr), num_annot_prots) if not weighted_only: t_metrics = time.time() metrics_df = compute_metrics( prediction[ns].matrix, gt[ns].matrix, tau_arr, ontologies[ns].toi, exclude, None, n_cpu) t_norm = time.time() dfs.append(normalize(metrics_df, ns, tau_arr, ne, normalization)) _eval_logger.info( f"evaluate {ns:>18s} unweighted metrics={t_norm - t_metrics:.3f}s " f"norm={time.time() - t_norm:.3f}s", extra={"ns": ns, "variant": "unweighted", "metrics_seconds": round(t_norm - t_metrics, 3), "normalize_seconds": round(time.time() - t_norm, 3), "total_seconds": round(time.time() - t_ns, 3)}, ) # Weighted metrics if ontologies[ns].ia is not None: t_w = time.time() num_annot_prots = _count_proteins_in_toi( gt[ns].matrix, ontologies[ns].toi_ia, gt_exclude[ns].matrix if gt_exclude is not None else None, ) exclude = gt_exclude[ns] if gt_exclude is not None else None ne = np.full(len(tau_arr), num_annot_prots) t_metrics_w = time.time() metrics_df_w = compute_metrics( prediction[ns].matrix, gt[ns].matrix, tau_arr, ontologies[ns].toi_ia, exclude, ontologies[ns].ia, n_cpu) t_norm_w = time.time() dfs_w.append(normalize(metrics_df_w, ns, tau_arr, ne, normalization)) _eval_logger.info( f"evaluate {ns:>18s} weighted metrics={t_norm_w - t_metrics_w:.3f}s " f"norm={time.time() - t_norm_w:.3f}s", extra={"ns": ns, "variant": "weighted", "metrics_seconds": round(t_norm_w - t_metrics_w, 3), "normalize_seconds": round(time.time() - t_norm_w, 3), "total_seconds": round(time.time() - t_w, 3)}, ) t_merge = time.time() if weighted_only: dfs_w = pd.concat(dfs_w) base_cols = ("ns", "tau") metric_cols = [c for c in dfs_w.columns if c not in base_cols] for c in metric_cols: dfs_w[f"{c}_w"] = dfs_w[c] _eval_logger.info( f"evaluate_prediction done (weighted_only) in {time.time() - t0:.2f}s", extra={"total_seconds": round(time.time() - t0, 3)}, ) return dfs_w dfs = pd.concat(dfs) # Merge weighted and unweighted dataframes if dfs_w: dfs_w = pd.concat(dfs_w) dfs = pd.merge(dfs, dfs_w, on=['ns', 'tau'], suffixes=('', '_w')) _eval_logger.info( f"evaluate_prediction done in {time.time() - t0:.2f}s " f"(merge {time.time() - t_merge:.3f}s)", extra={"total_seconds": round(time.time() - t0, 3), "merge_seconds": round(time.time() - t_merge, 3)}, ) return dfs
[docs] def cafa_eval(obo_file, pred_dir, gt_file, ia=None, no_orphans=False, norm='cafa', prop='max', exclude=None, toi_file=None, max_terms=None, th_step=0.01, n_cpu=1, weighted_only=False): t_total = time.time() _eval_logger.info( f"cafa_eval start: norm={norm} prop={prop} th_step={th_step} " f"n_cpu={n_cpu} weighted_only={weighted_only}", extra={"norm": norm, "prop": prop, "th_step": th_step, "n_cpu": n_cpu, "weighted_only": weighted_only, "obo_file": obo_file, "pred_dir": pred_dir, "gt_file": gt_file, "exclude": exclude, "toi_file": toi_file, "max_terms": max_terms, "ia": ia}, ) # Tau array, used to compute metrics at different score thresholds tau_arr = np.arange(th_step, 1, th_step) # Parse the OBO file and creates a different graphs for each namespace t_obo = time.time() ontologies = obo_parser(obo_file, ("is_a", "part_of"), ia, not no_orphans) if toi_file is not None: ontologies = update_toi(ontologies, toi_file) _eval_logger.info( f"obo parsed: {len(ontologies)} namespaces, n_tau={len(tau_arr)} " f"({time.time() - t_obo:.2f}s)", extra={"seconds": round(time.time() - t_obo, 3), "namespaces": list(ontologies.keys()), "n_tau": int(len(tau_arr))}, ) # Parse ground truth file t_gt = time.time() gt = gt_parser(gt_file, ontologies) if exclude is not None: gt_exclude = gt_exclude_parser(exclude, gt, ontologies) else: gt_exclude = None gt_stats = ", ".join(f"{ns[:2]}={len(g.ids)}" for ns, g in gt.items()) _eval_logger.info( f"ground truth parsed: {gt_stats}, exclude={'yes' if gt_exclude else 'no'} " f"({time.time() - t_gt:.2f}s)", extra={"seconds": round(time.time() - t_gt, 3), "has_exclude": gt_exclude is not None}, ) # Set prediction files looking recursively in the prediction folder pred_folder = os.path.normpath(pred_dir) + "/" # add the tailing "/" pred_files = [] for root, dirs, files in os.walk(pred_folder): for file in files: pred_files.append(os.path.join(root, file)) logger.debug("Prediction paths {}".format(pred_files)) _eval_logger.info( f"prediction files discovered: {len(pred_files)} file(s) under {pred_dir}", extra={"count": len(pred_files), "pred_dir": pred_dir}, ) # Parse prediction files and perform evaluation dfs = [] for idx, file_name in enumerate(pred_files, 1): basename = os.path.basename(file_name) t_file = time.time() t_parse = time.time() prediction = pred_parser(file_name, ontologies, gt, prop, max_terms, n_cpu) if not prediction: logger.warning("Prediction: {}, not evaluated".format(file_name)) _eval_logger.warning( f"prediction skipped: {basename} [{idx}/{len(pred_files)}]", extra={"file": file_name, "index": idx, "total": len(pred_files)}, ) continue _eval_logger.info( f"prediction parsed: {basename} [{idx}/{len(pred_files)}] " f"ns={list(prediction.keys())} ({time.time() - t_parse:.2f}s)", extra={"file": file_name, "index": idx, "total": len(pred_files), "parse_seconds": round(time.time() - t_parse, 3), "namespaces": list(prediction.keys())}, ) t_eval = time.time() df_pred = evaluate_prediction(prediction, gt, ontologies, tau_arr, gt_exclude, normalization=norm, n_cpu=n_cpu, weighted_only=weighted_only) df_pred['filename'] = file_name.replace(pred_folder, '').replace('/', '_') dfs.append(df_pred) _eval_logger.info( f"prediction evaluated: {basename} eval={time.time() - t_eval:.2f}s " f"total={time.time() - t_file:.2f}s", extra={"file": file_name, "index": idx, "total": len(pred_files), "eval_seconds": round(time.time() - t_eval, 3), "file_seconds": round(time.time() - t_file, 3)}, ) logger.info("Prediction: {}, evaluated".format(file_name)) # Concatenate all dataframes and save them t_final = time.time() df = None dfs_best = {} if dfs: df = pd.concat(dfs) # Remove rows with no coverage df = df[df['cov'] > 0].reset_index(drop=True) df.set_index(['filename', 'ns', 'tau'], inplace=True) # Calculate the best index for each namespace and each evaluation metric for metric, cols in [('f', ['rc', 'pr']), ('f_w', ['rc_w', 'pr_w']), ('s', ['ru', 'mi']), ('f_micro', ['rc_micro', 'pr_micro']), ('f_micro_w', ['rc_micro_w', 'pr_micro_w'])]: if metric in df.columns: index_best = df.groupby(level=['filename', 'ns'])[metric].idxmax() if metric in ['f', 'f_w', 'f_micro', 'f_micro_w'] else df.groupby(['filename', 'ns'])[metric].idxmin() df_best = df.loc[index_best] if metric[-2:] != '_w': df_best['cov_max'] = df.reset_index('tau').loc[[ele[:-1] for ele in index_best]].groupby(level=['filename', 'ns'])['cov'].max() else: df_best['cov_max'] = df.reset_index('tau').loc[[ele[:-1] for ele in index_best]].groupby(level=['filename', 'ns'])['cov_w'].max() dfs_best[metric] = df_best _eval_logger.info( f"cafa_eval aggregation done: {len(dfs)} result(s), " f"best=[{', '.join(dfs_best.keys())}] ({time.time() - t_final:.2f}s)", extra={"seconds": round(time.time() - t_final, 3), "n_results": len(dfs), "best_metrics": list(dfs_best.keys())}, ) else: logger.info("No predictions evaluated") _eval_logger.warning("cafa_eval no predictions evaluated") _eval_logger.info( f"cafa_eval done in {time.time() - t_total:.2f}s", extra={"total_seconds": round(time.time() - t_total, 3)}, ) return df, dfs_best
[docs] def write_results(df, dfs_best, out_dir='results', th_step=0.01): # Create output folder here in order to store the log file out_folder = os.path.normpath(out_dir) + "/" if not os.path.isdir(out_folder): os.makedirs(out_folder) # Set the number of decimals to write in the output files based on the threshold step size decimals = int(np.ceil(-np.log10(th_step))) + 1 df.to_csv('{}/evaluation_all.tsv'.format(out_folder), float_format="%.{}f".format(decimals), sep="\t") for metric in dfs_best: dfs_best[metric].to_csv('{}/evaluation_best_{}.tsv'.format(out_folder, metric), float_format="%.{}f".format(decimals), sep="\t")