|
1
|
|
|
""" |
|
2
|
|
|
Calculations of concordance between annotations. |
|
3
|
|
|
""" |
|
4
|
|
|
import abc |
|
5
|
|
|
import enum |
|
6
|
|
|
import math |
|
7
|
|
|
from collections import defaultdict |
|
8
|
|
|
from typing import Collection, Sequence, Type, Union |
|
9
|
|
|
|
|
10
|
|
|
import numpy as np |
|
|
|
|
|
|
11
|
|
|
|
|
12
|
|
|
from mandos.analysis import AnalysisUtils as Au |
|
13
|
|
|
from mandos.analysis import SimilarityDf |
|
14
|
|
|
from mandos.model import CleverEnum |
|
15
|
|
|
from mandos.model.hits import AbstractHit |
|
16
|
|
|
|
|
17
|
|
|
# Note that most functions in the standard-library ``math`` module are much faster than
# their NumPy counterparts.
# If we're not broadcasting, it's almost always better to use them.
# Some are more accurate, too; e.g., we use ``math.fsum`` rather than ``sum``.
|
21
|
|
|
|
|
22
|
|
|
|
|
23
|
|
|
class MatrixCalculator(metaclass=abc.ABCMeta):
    """
    Base class for algorithms that compute a similarity matrix from hits.

    Subclasses are expected to override :meth:`calc`.
    """

    def calc(self, hits: Sequence[AbstractHit]) -> SimilarityDf:
        """
        Computes a similarity matrix from ``hits``.

        Args:
            hits: The hits to compare.

        Returns:
            The similarity matrix; the base class never returns.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        # ``NotImplemented`` is the sentinel for binary operators and is not callable;
        # calling it would raise ``TypeError``. The exception class is ``NotImplementedError``.
        raise NotImplementedError()
|
|
|
|
|
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
class JPrimeMatrixCalculator(MatrixCalculator):
    """
    Computes a pairwise similarity matrix between compounds, keyed by ``origin_inchikey``.

    The score for a pair of compounds is the mean, over the data sources that both
    compounds have hits in, of a per-source score (see :meth:`_jx`).
    """

    def calc(self, hits: Sequence[AbstractHit]) -> SimilarityDf:
        """
        Builds the full matrix of scores for every ordered pair of compounds.

        Args:
            hits: The hits to compare; they are grouped by ``origin_inchikey``.

        Returns:
            A ``SimilarityDf`` built from a nested dict ``{c1: {c2: score}}``.
        """
        inchikey_to_hits = Au.hit_multidict(hits, "origin_inchikey")
        data = defaultdict(dict)
        # Visit every ordered pair (c1, c2). Zipping ``items()`` with itself would only
        # pair each compound with itself and yield just the diagonal.
        for c1, hits1 in inchikey_to_hits.items():
            for c2, hits2 in inchikey_to_hits.items():
                data[c1][c2] = self._j_prime(hits1, hits2)
        return SimilarityDf.from_dict(data)

    def _j_prime(self, hits1: Collection[AbstractHit], hits2: Collection[AbstractHit]) -> float:
        """
        Averages the per-source score over the data sources shared by both hit collections.

        Args:
            hits1: Hits for the first compound.
            hits2: Hits for the second compound.

        Returns:
            The mean of :meth:`_jx` over the shared sources, or NaN if no source is shared.
        """
        sources = {h.data_source for h in hits1}.intersection({h.data_source for h in hits2})
        if not sources:
            return np.nan
        values = [
            self._jx(
                [h for h in hits1 if h.data_source == source],
                # Compare against the second compound's hits, not the first compound's again.
                [h for h in hits2 if h.data_source == source],
            )
            for source in sources
        ]
        return float(math.fsum(values) / len(values))

    def _jx(self, hits1: Collection[AbstractHit], hits2: Collection[AbstractHit]) -> float:
        """
        Computes the mean of ``_wedge / _vee`` over the weight pairs of two hit collections.

        Args:
            hits1: Hits for the first compound (in :meth:`_j_prime`, from a single source).
            hits2: Hits for the second compound (in :meth:`_j_prime`, from the same source).

        Returns:
            The mean ratio as a float.

        Raises:
            ZeroDivisionError: If ``Au.weights_of_pairs`` yields no pairs,
                or if :meth:`_vee` is zero for some pair.
        """
        pair_to_weights = Au.weights_of_pairs(hits1, hits2)
        # Each value is unpacked as the two weights (ca, cb) for one pair.
        values = [self._wedge(ca, cb) / self._vee(ca, cb) for ca, cb in pair_to_weights.values()]
        return float(math.fsum(values) / len(values))

    def _wedge(self, ca: float, cb: float) -> float:
        """
        Returns the geometric mean of ``Au.elle(ca)`` and ``Au.elle(cb)``.
        """
        return math.sqrt(Au.elle(ca) * Au.elle(cb))

    def _vee(self, ca: float, cb: float) -> float:
        """
        Returns ``Au.elle(ca) + Au.elle(cb)`` minus their geometric mean.
        """
        # Evaluate each transformed weight once and reuse it.
        ell_a = Au.elle(ca)
        ell_b = Au.elle(cb)
        return ell_a + ell_b - math.sqrt(ell_a * ell_b)
|
59
|
|
|
|
|
60
|
|
|
|
|
61
|
|
|
class MatrixAlg(CleverEnum):
    """
    The available matrix-calculation algorithms.
    """

    j = enum.auto()

    @property
    def clazz(self) -> Type[MatrixCalculator]:
        """
        The ``MatrixCalculator`` subclass that implements this algorithm.

        Raises:
            KeyError: If this member has no entry in the lookup table.
        """
        # Lookup table from enum member to its implementing class.
        implementations = {MatrixAlg.j: JPrimeMatrixCalculator}
        return implementations[self]
|
67
|
|
|
|
|
68
|
|
|
|
|
69
|
|
|
class MatrixCalculation:
    """
    Factory for ``MatrixCalculator`` instances.
    """

    @classmethod
    def create(cls, algorithm: Union[str, MatrixAlg]) -> MatrixCalculator:
        """
        Instantiates the calculator for an algorithm.

        Args:
            algorithm: A ``MatrixAlg`` member, or a string that ``MatrixAlg.of`` can resolve.

        Returns:
            A new instance of the class given by the resolved member's ``clazz``.
        """
        alg = MatrixAlg.of(algorithm)
        # The calculator classes define no ``__init__``, so they must be constructed
        # without arguments; passing the algorithm name would raise ``TypeError``.
        return alg.clazz()
|
75
|
|
|
|
|
76
|
|
|
|
|
77
|
|
|
# Public API of this module: the calculators, the algorithm enum, and the factory.
__all__ = ["MatrixCalculator", "JPrimeMatrixCalculator", "MatrixAlg", "MatrixCalculation"]
|
78
|
|
|
|