1
|
|
|
""" |
2
|
|
|
Calculations of concordance between annotations. |
3
|
|
|
""" |
4
|
|
|
import abc |
5
|
|
|
import enum |
6
|
|
|
import math |
7
|
|
|
import time |
8
|
|
|
from collections import defaultdict |
9
|
|
|
from typing import Collection, Sequence, Type, Union |
10
|
|
|
|
11
|
|
|
import decorateme |
|
|
|
|
12
|
|
|
import numpy as np |
|
|
|
|
13
|
|
|
import pandas as pd |
|
|
|
|
14
|
|
|
from pocketutils.core.chars import Chars |
|
|
|
|
15
|
|
|
from pocketutils.core.enums import CleverEnum |
|
|
|
|
16
|
|
|
from pocketutils.tools.unit_tools import UnitTools |
|
|
|
|
17
|
|
|
|
18
|
|
|
from mandos.analysis import AnalysisUtils as Au |
19
|
|
|
from mandos.analysis.io_defns import SimilarityDfLongForm, SimilarityDfShortForm |
20
|
|
|
from mandos.model.hits import AbstractHit |
21
|
|
|
|
22
|
|
|
# note that most of these math functions are much faster than their numpy counterparts |
23
|
|
|
# if we're not broadcasting, it's almost always better to use them |
24
|
|
|
# some are more accurate, too |
25
|
|
|
# e.g. we're using fsum rather than sum |
26
|
|
|
from mandos.model.utils.setup import logger |
27
|
|
|
|
28
|
|
|
|
29
|
|
|
@decorateme.auto_repr_str()
class MatrixCalculator(metaclass=abc.ABCMeta):
    """
    Base class for calculators that turn a sequence of hits into a long-form similarity frame.

    Subclasses are expected to override :meth:`calc_all`.
    """

    def calc_all(self, hits: Sequence[AbstractHit]) -> SimilarityDfLongForm:
        """
        Calculates similarity values from ``hits``.

        Args:
            hits: The hits to calculate from

        Returns:
            A long-form similarity data frame

        Raises:
            NotImplementedError: Always, in this base implementation
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception, and is not callable;
        # ``NotImplementedError`` is the exception meant for unimplemented methods.
        raise NotImplementedError()
|
|
|
|
33
|
|
|
|
34
|
|
|
|
35
|
|
|
class _Inf: |
36
|
|
|
def __init__(self, n: int): |
37
|
|
|
self.n = n |
|
|
|
|
38
|
|
|
self.used, self.t0, self.nonzeros = set(), time.monotonic(), 0 |
|
|
|
|
39
|
|
|
|
40
|
|
|
def is_used(self, c1: str, c2: str) -> bool: |
|
|
|
|
41
|
|
|
return (c1, c2) in self.used or (c2, c1) in self.used |
42
|
|
|
|
43
|
|
|
def got(self, c1: str, c2: str, z: float) -> None: |
|
|
|
|
44
|
|
|
self.used.add((c1, c2)) |
45
|
|
|
self.nonzeros += int(c1 != c2 and not np.isnan(z) and 0 < z < 1) |
46
|
|
|
i = self.i |
47
|
|
|
if i % 5000 == 0: |
48
|
|
|
lg_ = next( |
49
|
|
|
t_ |
50
|
|
|
for s_, t_ in zip([50000, 10000, 1000], ["success", "info", "debug"]) |
51
|
|
|
if not i % s_ |
52
|
|
|
) |
53
|
|
|
self.log(lg_) |
54
|
|
|
|
55
|
|
|
@property |
56
|
|
|
def i(self) -> int: |
|
|
|
|
57
|
|
|
return len(self.used) |
58
|
|
|
|
59
|
|
|
def log(self, level: str) -> None: |
|
|
|
|
60
|
|
|
delta = UnitTools.delta_time_to_str(time.monotonic() - self.t0, space=Chars.narrownbsp) |
61
|
|
|
logger.log( |
62
|
|
|
level.upper(), |
63
|
|
|
f"Processed {self.i:,}/{self.n:,} pairs in {delta};" |
64
|
|
|
+ f" {self.nonzeros:,} ({self.nonzeros / self.i * 100:.1f}%) are nonzero", |
65
|
|
|
) |
66
|
|
|
|
67
|
|
|
|
68
|
|
|
class JPrimeMatrixCalculator(MatrixCalculator):
    """
    Calculates pairwise similarity values between compounds from their hits.

    Hits are grouped by ``search_key``; within each key, compounds (``origin_inchikey``)
    are compared pairwise. For a pair, a ratio ``_wedge / _vee`` is averaged over the
    weight pairs of each shared data source, and those averages are then averaged
    over the shared data sources.
    """

    def calc_all(self, hits: Sequence[AbstractHit]) -> SimilarityDfLongForm:
        """
        Calculates a matrix per search key and concatenates them in long form.

        Args:
            hits: All hits, for any number of search keys

        Returns:
            The concatenation of the per-key long-form frames (each built with ``kind="psi"``)

        Raises:
            ValueError: From ``pd.concat`` if there are no search keys (nothing to concatenate)
        """
        key_to_hit = Au.hit_multidict(hits, "search_key")
        logger.notice(f"Calculating J on {len(key_to_hit)} keys from {len(hits)} hits")
        dfs = []
        for key, key_hits in key_to_hit.items():
            df: SimilarityDfShortForm = self.calc_one(key, key_hits)
            dfs.append(df.to_long_form(kind="psi", key=key))
        return SimilarityDfLongForm(pd.concat(dfs))

    def calc_one(self, key: str, hits: Sequence[AbstractHit]) -> SimilarityDfShortForm:
        """
        Calculates the values for every unordered pair of compounds under one search key.

        Each unordered pair is calculated once, so the resulting dict is triangular:
        ``data[c1][c2]`` is set only where ``c2`` does not precede ``c1`` in iteration order.
        A compound paired with itself is assigned ``1`` without any calculation.

        Args:
            key: The search key; only used for logging here
            hits: The hits for that search key

        Returns:
            A short-form frame built from the nested dict of values
        """
        ik2hits = Au.hit_multidict(hits, "origin_inchikey")
        logger.info(f"Calculating J on {key} for {len(ik2hits)} compounds and {len(hits)} hits")
        data = defaultdict(dict)
        n_compounds = len(ik2hits)
        # Self-pairs are recorded too, so the total is n(n+1)/2 rather than n(n-1)/2.
        inf = _Inf(n=n_compounds * (n_compounds + 1) // 2)
        entries = list(ik2hits.items())
        for index, (c1, hits1) in enumerate(entries):
            # Starting at ``index`` visits each unordered pair exactly once (including c1 with
            # itself), in the same order as scanning all pairs and skipping the seen ones.
            for c2, hits2 in entries[index:]:
                z = 1 if c1 == c2 else self._j_prime(hits1, hits2)
                data[c1][c2] = z
                inf.got(c1, c2, z)
        inf.log("notice")
        return SimilarityDfShortForm.from_dict(data)

    def _j_prime(self, hits1: Collection[AbstractHit], hits2: Collection[AbstractHit]) -> float:
        """
        Averages :meth:`_jx` over the data sources that both collections of hits share.

        Args:
            hits1: Hits of the first compound
            hits2: Hits of the second compound

        Returns:
            0 if either collection is empty; NaN if no data source is shared;
            otherwise the mean of the per-source values
        """
        if len(hits1) == 0 or len(hits2) == 0:
            return 0.0
        # Group once per side rather than re-filtering both collections for every source.
        by_source1 = defaultdict(list)
        for hit in hits1:
            by_source1[hit.data_source].append(hit)
        by_source2 = defaultdict(list)
        for hit in hits2:
            by_source2[hit.data_source].append(hit)
        sources = by_source1.keys() & by_source2.keys()
        if len(sources) == 0:
            return np.nan
        values = [self._jx(by_source1[source], by_source2[source]) for source in sources]
        return float(math.fsum(values) / len(values))

    def _jx(self, hits1: Collection[AbstractHit], hits2: Collection[AbstractHit]) -> float:
        """
        Averages ``_wedge / _vee`` over the weight pairs of two collections of hits.

        Args:
            hits1: Hits of the first compound (for one data source, as called from ``_j_prime``)
            hits2: Hits of the second compound (likewise)

        Returns:
            The mean ratio over all weight pairs

        Raises:
            ZeroDivisionError: If there are no weight pairs, or if ``_vee`` is 0 for a pair
        """
        pair_to_weights = Au.weights_of_pairs(hits1, hits2)
        values = [self._wedge(ca, cb) / self._vee(ca, cb) for ca, cb in pair_to_weights.values()]
        return float(math.fsum(values) / len(values))

    def _wedge(self, ca: float, cb: float) -> float:
        """
        Returns the geometric mean of ``Au.elle(ca)`` and ``Au.elle(cb)``.
        """
        return math.sqrt(Au.elle(ca) * Au.elle(cb))

    def _vee(self, ca: float, cb: float) -> float:
        """
        Returns ``Au.elle(ca) + Au.elle(cb)`` minus their geometric mean (see ``_wedge``).
        """
        return Au.elle(ca) + Au.elle(cb) - self._wedge(ca, cb)
119
|
|
|
|
120
|
|
|
|
121
|
|
|
class MatrixAlg(CleverEnum):
    """
    The available matrix calculation algorithms.
    """

    j = enum.auto()

    @property
    def clazz(self) -> Type[MatrixCalculator]:
        """
        The calculator class that implements this algorithm.
        """
        implementations = {
            MatrixAlg.j: JPrimeMatrixCalculator,
        }
        return implementations[self]
127
|
|
|
|
128
|
|
|
|
129
|
|
|
@decorateme.auto_utils()
class MatrixCalculation:
    """
    Factory for matrix calculators.
    """

    @classmethod
    def create(cls, algorithm: Union[str, MatrixAlg]) -> MatrixCalculator:
        """
        Builds a new calculator for an algorithm.

        Args:
            algorithm: A ``MatrixAlg`` member or a string, resolved via ``MatrixAlg.of``

        Returns:
            A new instance of the class given by the algorithm's ``clazz``
        """
        resolved = MatrixAlg.of(algorithm)
        calculator_type = resolved.clazz
        return calculator_type()
134
|
|
|
|
135
|
|
|
|
136
|
|
|
# The public names of this module; these are what ``from <module> import *`` exports.
__all__ = ["MatrixCalculator", "JPrimeMatrixCalculator", "MatrixCalculation", "MatrixAlg"]
137
|
|
|
|