1
|
|
|
""" |
2
|
|
|
Calculations of concordance between annotations. |
3
|
|
|
""" |
4
|
|
|
import abc |
5
|
|
|
import math |
6
|
|
|
from typing import Sequence, Set, Collection, Tuple, Dict, Generator |
|
|
|
|
7
|
|
|
|
8
|
|
|
import numpy as np |
|
|
|
|
9
|
|
|
import pandas as pd |
|
|
|
|
10
|
|
|
from typeddfs import TypedDfs |
|
|
|
|
11
|
|
|
|
12
|
|
|
from mandos.analysis import AnalysisUtils, SimilarityDf |
|
|
|
|
13
|
|
|
|
14
|
|
|
# Typed DataFrame with one row per sample: an int "sample" column and a float "score" column.
ConcordanceDf = (
    TypedDfs.typed("ConcordanceDf")
    .require("sample", dtype=int)
    .require("score", dtype=float)
    .build()
)
17
|
|
|
|
18
|
|
|
|
19
|
|
|
class ConcordanceCalculator(metaclass=abc.ABCMeta):
    """
    Base class that scores concordance between two ``SimilarityDf`` inputs over repeated resamples.

    Subclasses provide the statistic by overriding ``_calc``.
    This class validates the inputs, draws the resamples, and collects
    one score per resample into a ``ConcordanceDf``.
    """

    def __init__(self, n_samples: int, seed: int):
        """
        Store the settings and create the random number generator.

        Args:
            n_samples: Number of resampling rounds; ``generate`` yields this many scores.
            seed: Seed passed to ``numpy.random.RandomState``.
        """
        self.n_samples = n_samples
        self.seed = seed
        self.rand = np.random.RandomState(seed)

    def calc(self, phi: SimilarityDf, psi: SimilarityDf) -> ConcordanceDf:
        """
        Compute one score per resample and return them as a ``ConcordanceDf``.

        Args:
            phi: First input; its columns must equal those of ``psi``.
            psi: Second input.

        Returns:
            A ``ConcordanceDf`` with columns ``sample`` (0-based resample number) and ``score``.

        Raises:
            ValueError: If the column lists of ``phi`` and ``psi`` differ.
        """
        phi_cols = phi.columns.tolist()
        psi_cols = psi.columns.tolist()
        if phi_cols != psi_cols:
            raise ValueError(f"Mismatched compounds: {phi_cols} != {psi_cols}")
        df = pd.DataFrame(data=self.generate(phi, psi), columns=["score"])
        # The default 0..n-1 index becomes the "sample" column.
        df = df.reset_index()
        df.columns = ["sample", "score"]
        return ConcordanceDf(df)

    def generate(self, phi: SimilarityDf, psi: SimilarityDf) -> Generator[float, None, None]:
        """
        Yield ``n_samples`` scores, each computed by ``_calc`` on a fresh resample of the inputs.

        Args:
            phi: First input to resample.
            psi: Second input to resample.

        Yields:
            The value of ``_calc`` for each resample.
        """
        for _ in range(self.n_samples):
            # NOTE(review): ``RandomState.choice`` is documented for 1-D input and, without
            # ``size``, returns a single element. ``phi`` and ``psi`` are also drawn
            # independently. Confirm this is the intended resampling for ``SimilarityDf``.
            phi_b = self.rand.choice(phi, replace=True)
            psi_b = self.rand.choice(psi, replace=True)
            yield self._calc(phi_b, psi_b)

    def _calc(self, phi: SimilarityDf, psi: SimilarityDf) -> float:
        """
        Compute the concordance score for one resample; subclasses must override this.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        # ``NotImplemented`` is a non-callable constant for binary-operator fallbacks;
        # the exception meant for "override me" is ``NotImplementedError``.
        raise NotImplementedError()
|
|
|
|
43
|
|
|
|
44
|
|
|
|
45
|
|
|
class TauConcordanceCalculator(ConcordanceCalculator):
    """
    Concordance computed as ``(concordant pairs - discordant pairs) / (n choose 2)``.

    This has the form of Kendall's tau-a: pairs tied in either input count as
    neither concordant nor discordant, and the denominator is not adjusted for ties.
    """

    def _calc(self, phi: SimilarityDf, psi: SimilarityDf) -> float:
        """
        Compute the score for one pair of inputs.

        Args:
            phi: First input; ``len(phi)`` is taken as the number of items ``n``.
            psi: Second input, indexed in parallel with ``phi``.

        Returns:
            ``(n_concordant - n_discordant) / (n * (n - 1) / 2)``.

        Raises:
            ValueError: If fewer than 2 items are given, since no pair can be formed.
        """
        n = len(phi)
        if n < 2:
            raise ValueError(f"Need at least 2 items to form a pair; got {n}")
        concordant = self._n_z(phi, psi, 1)
        discordant = self._n_z(phi, psi, -1)
        # n choose 2, written directly instead of via factorials.
        n_pairs = n * (n - 1) / 2
        return (concordant - discordant) / n_pairs

    def _n_z(self, a: Sequence[float], b: Sequence[float], z: int) -> int:
        """
        Count pairs ``(i, j)`` with ``j < i`` where ``sign(a[i] - a[j]) == z * sign(b[i] - b[j])``
        and that common value is nonzero.

        Args:
            a: First sequence of values; its length sets the range of ``i``.
            b: Second sequence of values, indexed in parallel with ``a``.
            z: ``1`` to count concordant pairs, ``-1`` to count discordant pairs.

        Returns:
            The number of matching pairs.
        """
        a_arr = np.asarray(a, dtype=float)
        b_arr = np.asarray(b, dtype=float)
        total = 0
        # One vectorized pass per row i over all j < i keeps memory at O(n).
        for i in range(1, len(a_arr)):
            sign_a = np.sign(a_arr[i] - a_arr[:i])
            sign_b = z * np.sign(b_arr[i] - b_arr[:i])
            # Requiring a nonzero sign excludes pairs tied in either sequence.
            total += int(np.count_nonzero((sign_a == sign_b) & (sign_b != 0)))
        return total
68
|
|
|
|
69
|
|
|
|
70
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = [
    "ConcordanceCalculator",
    "TauConcordanceCalculator",
]
71
|
|
|
|