1
|
|
|
""" |
2
|
|
|
Scoring (regression and enrichment) calculations. |
3
|
|
|
""" |
4
|
|
|
import abc |
5
|
|
|
import enum |
6
|
|
|
import math |
7
|
|
|
from typing import Generic, Mapping, Sequence, Type, TypeVar, Union |
8
|
|
|
|
9
|
|
|
import pandas as pd |
|
|
|
|
10
|
|
|
from typeddfs import TypedDfs |
|
|
|
|
11
|
|
|
|
12
|
|
|
from mandos.analysis import AnalysisUtils as Au |
13
|
|
|
from mandos.model import CleverEnum |
14
|
|
|
from mandos.model.hits import AbstractHit, HitFrame, Pair |
15
|
|
|
|
16
|
|
|
# Type of a single score value handled by the calculators below:
# a number (int or float) or a boolean flag.
S = TypeVar("S", bound=Union[int, float, bool])
|
|
|
|
17
|
|
|
|
18
|
|
|
|
19
|
|
|
# Typed data frame of enrichment results with one "predicate"/"object" pair
# and a "samples" count per row.
# Any extra columns hold the calculated statistics and are named like
# alpha(score_1) or alpha(is_hit).
EnrichmentDf = (
    TypedDfs.typed("EnrichmentDf")
    .require("predicate", "object", dtype=str)
    .require("samples", dtype=int)
).build()
25
|
|
|
|
26
|
|
|
|
27
|
|
|
# Typed data frame of scores, indexed by "inchikey".
# The "score" and (boolean) "is_hit" columns are reserved; any extra columns
# are additional scores or flags, e.g. score_1, score_hello, is_lethal, is_good.
ScoreDf = (
    TypedDfs.typed("ScoreDf")
    .require("inchikey", dtype=str, index=True)
    .reserve("score")
    .reserve("is_hit", dtype=bool)
).build()
34
|
|
|
|
35
|
|
|
|
36
|
|
|
def _score_cols(self: "ScoreDf") -> Sequence[str]:
    """
    Lists the columns of this frame that hold scores or flags.

    A column qualifies if it is named exactly ``score`` or if its name starts
    with ``score_`` or ``is_`` (for example ``score_1`` or ``is_lethal``).
    Column labels that are not strings are skipped instead of raising.

    Returns:
        The matching column names, in the order they appear in ``self.columns``
    """
    prefixes = ("score_", "is_")
    return [
        col
        for col in self.columns
        # Non-string labels (e.g. integers) have no ``startswith``; they can never match.
        if isinstance(col, str) and (col == "score" or col.startswith(prefixes))
    ]
42
|
|
|
|
43
|
|
|
|
44
|
|
|
def _all_scores(self: "ScoreDf") -> "Mapping[str, Mapping[str, S]]":
    """
    Collects every score column of this frame as plain dictionaries.

    Returns:
        A mapping from each score column name (as listed by ``score_cols()``)
        to the result of that column's ``to_dict()``
    """
    # ``score_cols`` is attached to ScoreDf as an ordinary method, so it has to be
    # called; iterating over the bound method itself raises a TypeError.
    return {col: self[col].to_dict() for col in self.score_cols()}
49
|
|
|
|
50
|
|
|
|
51
|
|
|
# Attach the two module-level helpers to ScoreDf as instance methods,
# so that they are available as ``df.score_cols()`` and ``df.all_scores()``.
ScoreDf.score_cols = _score_cols
ScoreDf.all_scores = _all_scores
53
|
|
|
|
54
|
|
|
|
55
|
|
|
class EnrichmentCalculator(Generic[S], metaclass=abc.ABCMeta):
    """
    Base class for algorithms that compute one enrichment value per pair.

    Subclasses implement :meth:`for_pair`; :meth:`calc` groups the hits by pair
    and applies it to each group.
    """

    def __init__(self, scores: Mapping[str, S]):
        """
        Args:
            scores: The score for each compound; the subclasses in this module
                    look values up by ``hit.origin_inchikey``
        """
        self.scores = scores

    def calc(self, hits: Sequence[AbstractHit]) -> Mapping[Pair, float]:
        """
        Computes the enrichment value for every pair found among ``hits``.

        Args:
            hits: All hits; they are grouped with ``Au.hit_multidict(hits, "to_pair")``

        Returns:
            A mapping from each pair to the result of :meth:`for_pair` on that pair's hits
        """
        pair_to_hits = Au.hit_multidict(hits, "to_pair")
        # Each pair must be evaluated on its own group of hits, not on the full list;
        # otherwise every pair would receive the same value.
        return {pair: self.for_pair(pair_hits) for pair, pair_hits in pair_to_hits.items()}

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Computes the enrichment value for the hits of a single pair.

        Args:
            hits: The hits that belong to one pair

        Raises:
            NotImplementedError: Always; subclasses must override this method
        """
        raise NotImplementedError()
68
|
|
|
|
69
|
|
|
|
70
|
|
|
class AlphaCalculator(EnrichmentCalculator[S]):
    """
    Enrichment as the mean, over data sources, of a per-source mean term.
    """

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Averages the per-source terms of the given hits.

        The hits are grouped with ``Au.hit_multidict(hits, "data_source")`` and
        :meth:`_calc_term` is evaluated once per group.
        An empty ``hits`` leaves no terms and results in a ``ZeroDivisionError``.
        """
        by_source = Au.hit_multidict(hits, "data_source")
        per_source = [self._calc_term(group) for group in by_source.values()]
        return math.fsum(per_source) / len(per_source)

    def _calc_term(self, hits: Sequence[AbstractHit]) -> float:
        """
        Returns the mean, over ``hits``, of ``Au.elle(hit.value)`` times a squared score factor.
        """
        # NOTE(review): as written, the factor is (2 * (score - 1)) ** 2.
        # If a rescaling of the form 2 * score - 1 was intended, the parentheses
        # are misplaced -- confirm against the docs before changing.
        products = [
            Au.elle(hit.value) * (2 * float(self.scores[hit.origin_inchikey] - 1)) ** 2
            for hit in hits
        ]
        return math.fsum(products) / len(products)
84
|
|
|
|
85
|
|
|
|
86
|
|
|
class FoldUnweightedCalc(EnrichmentCalculator[bool]):
    """
    Ratio of the number of hits with a truthy score to the number with a falsy score.
    """

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Counts hits whose compound score is truthy and divides by the count of the rest.

        Returns:
            The ratio, or positive infinity if no hit has a falsy score
        """
        n_flagged = sum(1 for hit in hits if self.scores[hit.origin_inchikey])
        n_unflagged = sum(1 for hit in hits if not self.scores[hit.origin_inchikey])
        # Avoid dividing by zero when every compound is flagged (or there are no hits).
        return n_flagged / n_unflagged if n_unflagged != 0 else float("inf")
93
|
|
|
|
94
|
|
|
|
95
|
|
|
class FoldWeightedCalc(EnrichmentCalculator[bool]):
    """
    Ratio of summed hit values for truthy-scored compounds to that for falsy-scored ones.
    """

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Sums ``hit.value`` separately for hits with a truthy and with a falsy score.

        Returns:
            The ratio of the two sums, or positive infinity if the second sum is zero
        """
        flagged_total = math.fsum(
            hit.value for hit in hits if self.scores[hit.origin_inchikey]
        )
        unflagged_total = math.fsum(
            hit.value for hit in hits if not self.scores[hit.origin_inchikey]
        )
        if unflagged_total == 0:
            # Nothing (or zero total value) to compare against.
            return float("inf")
        return flagged_total / unflagged_total
104
|
|
|
|
105
|
|
|
|
106
|
|
|
class SumWeightedCalc(EnrichmentCalculator[S]):
    """
    Sum of score times hit value over the hits, divided by the number of hits.
    """

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Returns the sum of ``score * hit.value`` over ``hits``, divided by ``len(hits)``.

        An empty ``hits`` results in a ``ZeroDivisionError``.
        """
        weighted = [self.scores[hit.origin_inchikey] * hit.value for hit in hits]
        return math.fsum(weighted) / len(hits)
109
|
|
|
|
110
|
|
|
|
111
|
|
|
class SumUnweightedCalc(EnrichmentCalculator[S]):
    """
    Sum of the compound scores over the hits, divided by the number of hits.
    """

    def for_pair(self, hits: Sequence[AbstractHit]) -> float:
        """
        Returns the sum of the scores looked up for ``hits``, divided by ``len(hits)``.

        An empty ``hits`` results in a ``ZeroDivisionError``.
        """
        looked_up = [self.scores[hit.origin_inchikey] for hit in hits]
        return math.fsum(looked_up) / len(hits)
114
|
|
|
|
115
|
|
|
|
116
|
|
|
class EnrichmentAlg(CleverEnum):
    """
    The available enrichment algorithms.

    Each member has a human-readable :attr:`description`, a short :attr:`symbol`,
    and the calculator class (:attr:`clazz`) that implements it.
    """

    alpha = enum.auto()
    fold = enum.auto()
    fold_w = enum.auto()
    sum = enum.auto()
    sum_w = enum.auto()

    @property
    def description(self) -> str:
        """
        A one-line description of the statistic, written in terms of :attr:`symbol`.
        """
        s = self.symbol
        return {
            EnrichmentAlg.alpha: rf"[float] {s}(p) = Mean product of rescaled weights and scores; see the docs",
            EnrichmentAlg.fold: rf"[bool] {s}(p) = #(c has p and hit) / #(c has p and not hit)",
            EnrichmentAlg.fold_w: rf"[bool] {s}(p) = ∑(w(c, pair) s.t. c is hit) / ∑(w(c, pair) s.t. c not hit)",
            EnrichmentAlg.sum: rf"{s}(p) = ∑(score(c) s.t. c has p)",
            EnrichmentAlg.sum_w: rf"{s}(p) = ∑(score(c) × w(c, p) for all c)",
        }[self]

    @property
    def symbol(self) -> str:
        """
        The short (Greek-letter) symbol used for this statistic.
        """
        return {
            EnrichmentAlg.alpha: "α",
            EnrichmentAlg.fold: r"β",
            EnrichmentAlg.fold_w: "β*",
            EnrichmentAlg.sum: r"γ",
            EnrichmentAlg.sum_w: r"γ*",
        }[self]

    @property
    def clazz(self) -> Type[EnrichmentCalculator]:
        """
        The calculator class that implements this algorithm.
        """
        # The table is keyed by the enum members themselves, so it must be indexed
        # with ``self``; indexing with ``self.name`` (a string) raises a KeyError.
        return {
            EnrichmentAlg.alpha: AlphaCalculator,
            EnrichmentAlg.fold: FoldUnweightedCalc,
            EnrichmentAlg.fold_w: FoldWeightedCalc,
            EnrichmentAlg.sum: SumUnweightedCalc,
            EnrichmentAlg.sum_w: SumWeightedCalc,
        }[self]
155
|
|
|
|
156
|
|
|
|
157
|
|
|
class EnrichmentCalculation:
    """
    Applies an enrichment algorithm to every score column and tabulates the results.
    """

    def calc(
        self, data: HitFrame, score_df: ScoreDf, algorithm: Union[str, EnrichmentAlg]
    ) -> EnrichmentDf:
        """
        Calculates enrichment per pair for each score column of ``score_df``.

        Args:
            data: The hits, as a data frame
            score_df: The scores; every column returned by ``score_df.all_scores()`` is used
            algorithm: An ``EnrichmentAlg`` member, or a string that ``EnrichmentAlg.of`` resolves

        Returns:
            A frame with one row per pair containing ``predicate``, ``object``, ``samples``
            (the number of hits for the pair), and one ``<algorithm>(<score column>)``
            column per score column
        """
        # NOTE(review): this was left as a placeholder (``hits = ...``) and could not run.
        # ``HitFrame.to_hits()`` is assumed to return the hits as a sequence of
        # AbstractHit -- confirm against mandos.model.hits.
        hits = data.to_hits()
        # Resolve through the enum, which is what carries ``clazz``
        # (EnrichmentDf is the result frame type and has no such attribute).
        # NOTE(review): assumes ``CleverEnum.of`` accepts both names and members -- confirm.
        alg = EnrichmentAlg.of(algorithm)
        alg_clazz = alg.clazz
        # Use the member name so that columns read like ``alpha(score_1)``
        # whether a string or an enum member was passed.
        alg_name = alg.name
        # Group by the same key that EnrichmentCalculator.calc uses ("to_pair"),
        # so that the pairs here are valid keys into each calculated mapping.
        samples_per_pair = {
            pair: len(pair_hits)
            for pair, pair_hits in Au.hit_multidict(hits, "to_pair").items()
        }
        # all_scores() returns a mapping, so iterate over its items (not its keys).
        results = {
            col: alg_clazz(scores).calc(hits)
            for col, scores in score_df.all_scores().items()
        }
        rows = [
            pd.Series(
                {
                    "predicate": pair.pred,
                    "object": pair.obj,
                    "samples": n_samples,
                    **{f"{alg_name}({c})": vs[pair] for c, vs in results.items()},
                }
            )
            for pair, n_samples in samples_per_pair.items()
        ]
        return EnrichmentDf(rows)
184
|
|
|
|
185
|
|
|
|
186
|
|
|
# Public API of this module.
__all__ = [
    "AlphaCalculator",
    "EnrichmentCalculator",
    "FoldUnweightedCalc",
    "FoldWeightedCalc",
    "SumUnweightedCalc",
    "SumWeightedCalc",
    "EnrichmentCalculation",
    "EnrichmentAlg",
    "EnrichmentDf",
    "ScoreDf",
]
198
|
|
|
|