1
|
|
|
""" |
2
|
|
|
TODO This module will probably be deleted. |
3
|
|
|
""" |
4
|
|
|
|
5
|
|
|
from __future__ import annotations |
6
|
|
|
from collections import defaultdict |
7
|
|
|
import logging |
8
|
|
|
from typing import Sequence, Mapping, Set, TypeVar, Callable, SupportsFloat, Optional, Type, List |
|
|
|
|
9
|
|
|
from typing import Tuple as Tup |
10
|
|
|
|
11
|
|
|
import numpy as np |
|
|
|
|
12
|
|
|
import pandas as pd |
|
|
|
|
13
|
|
|
from typeddfs import TypedDfs, TypedDf |
|
|
|
|
14
|
|
|
|
15
|
|
|
# Module-wide logger, registered under the name "mandos".
logger = logging.getLogger("mandos")

# Key type for pairwise results: a tuple of two compound labels.
CompoundCompoundPair = Tup[str, str]

# Generic type variables used in the signatures of the affinity helpers below.
T = TypeVar("T")
Z = TypeVar("Z")
|
|
|
|
21
|
|
|
|
22
|
|
|
|
23
|
|
|
class ConfusionMatrix(TypedDf):
    """
    A typed data frame used as a confusion matrix.

    Adds two convenience accessors that expose the row (index) labels and the
    column labels as plain strings.
    """

    @property
    def rows(self):
        """
        The index labels, in order, each converted with ``str``.
        """
        return list(map(str, self.index.tolist()))

    @property
    def cols(self):
        """
        The column labels, in order, each converted with ``str``.
        """
        return list(map(str, self.columns.tolist()))
33
|
|
|
|
34
|
|
|
|
35
|
|
|
class AffinityFunctions:
    """
    These are functions that can be passed into ``AffinityMatrix.from_function``.

    Every classmethod here is a factory: it returns a two-argument callable that
    scores a pair of items, and sets that callable's ``__name__`` to a short label
    identifying the measure.
    """

    @classmethod
    def jaccard(cls) -> Callable[[Set[Z], Set[Z]], float]:
        """
        Jaccard index of two sets: ``|A & B| / |A | B|``.

        Returns:
            A function of two sets; it returns NaN when both sets are empty.
        """

        def x(a_set: Set[Z], b_set: Set[Z]):
            if len(a_set) == len(b_set) == 0:
                return float("NaN")
            return len(a_set.intersection(b_set)) / len(a_set.union(b_set))

        x.__name__ = "jaccard"
        return x

    @classmethod
    def bit_string_jaccard(cls) -> Callable[[str, str], float]:
        """
        Jaccard index over the positions of the "on" bits of two bit strings.

        Each character is parsed with ``int``; a position is "on" if it parses to 1.

        Returns:
            A function of two strings; it returns NaN when neither string has any
            on bit (this includes two empty strings and all-zero strings).
        """

        def x(a: str, b: str):
            on_bits_a = {i for i, v in enumerate(a) if int(v) == 1}
            on_bits_b = {i for i, v in enumerate(b) if int(v) == 1}
            union = on_bits_a | on_bits_b
            # An empty union (e.g. "000" vs "000") would otherwise divide by zero.
            if not union:
                return float("NaN")
            return len(on_bits_a & on_bits_b) / len(union)

        x.__name__ = "bit_string_jaccard"
        return x

    @classmethod
    def identity(cls) -> Callable[[str, str], float]:
        """
        Exact-match indicator.

        Returns:
            A function that returns 1.0 if ``a == b`` and 0.0 otherwise.
        """

        def x(a: str, b: str):
            return float(a == b)

        x.__name__ = "identity"
        return x

    @classmethod
    def random_uniform(cls, state: np.random.RandomState) -> Callable[[Z, Z], float]:
        """
        Random baseline.

        Args:
            state: The random state to draw from

        Returns:
            A function that ignores both arguments and returns ``state.uniform(0, 1)``.
        """

        def x(a: Z, b: Z):
            return state.uniform(0, 1)

        x.__name__ = "random_uniform"
        return x

    @classmethod
    def rho_cm(cls, df: ConfusionMatrix) -> Callable[[str, str], float]:
        """
        Score derived from a confusion matrix and its transpose.

        For a matrix ``d``, the one-sided score of ``(a, b)`` is ``d[a, b]`` divided by
        the sum of row ``a`` excluding column ``a``, and then by the sum of column ``b``
        excluding row ``b``. The returned function averages the one-sided scores
        computed on ``df`` and on ``df`` transposed.

        Args:
            df: The confusion matrix; it is read (and transposed) on every call
                of the returned function

        Returns:
            A function of two labels, named ``rho_cm``.
        """

        def x(d: ConfusionMatrix, a: str, b: str):
            ij = d.at[a, b]
            # Built-in sum: np.sum over a generator is deprecated in NumPy.
            a_off_diag = sum(d.at[a, j] for j in d.cols if j != a)
            b_off_diag = sum(d.at[i, b] for i in d.rows if i != b)
            return ij / a_off_diag / b_off_diag

        def qq(a: str, b: str):
            return 0.5 * x(df, a, b) + 0.5 * x(ConfusionMatrix(df.transpose()), a, b)

        # Label the function that is actually returned (not the inner helper).
        qq.__name__ = "rho_cm"
        return qq

    @classmethod
    def pearson(
        cls, weights: Optional[Sequence[SupportsFloat]] = None
    ) -> Callable[[Sequence[SupportsFloat], Sequence[SupportsFloat]], float]:
        """
        Pearson correlation coefficient, possibly weighted.

        Both inputs are standardized with their own (unweighted) mean and
        population standard deviation; the result is the average of the
        element-wise products, weighted by ``weights`` if given.
        A constant input has zero standard deviation and yields NaN.

        Args:
            weights: Optional per-element weights, same length as the inputs

        Returns:
            A function of two numeric sequences, named ``pearson`` or
            ``weighted-pearson``.
        """
        is_weighted = weights is not None
        # np.average needs None (not a scalar 1.0) for the unweighted case.
        weights = np.array(weights, dtype=np.float64) if is_weighted else None

        def x(a: Sequence[SupportsFloat], b: Sequence[SupportsFloat]):
            a, b = np.array(a, dtype=np.float64), np.array(b, dtype=np.float64)
            a = (a - a.mean()) / a.std()
            b = (b - b.mean()) / b.std()
            return np.average(a * b, weights=weights)

        # Decide on the flag; comparing an array to 1.0 is ambiguous in a boolean context.
        x.__name__ = "weighted-pearson" if is_weighted else "pearson"
        return x

    @classmethod
    def negative_minkowski(
        cls, order: float, weights: Optional[Sequence[SupportsFloat]] = None
    ) -> Callable[[Sequence[SupportsFloat], Sequence[SupportsFloat]], float]:
        """
        Negated Minkowski distance, so that larger values mean more similar.

        With ``d = |a - b| * weights``:

        - If ``order`` is positive infinity, returns ``-max(d)``.
        - If ``order==0``, this is defined to return the negated number of elements
          that differ (number of nonzero); weights are ignored in this case.
        - Otherwise, returns ``-(sum(d ** order)) ** (1 / order)``.

        Args:
            order: The order (p) of the Minkowski distance
            weights: Optional per-element weights, same length as the inputs

        Returns:
            A function of two numeric sequences, named ``minkowski<order>`` or
            ``weighted-minkowski<order>``.
        """
        is_weighted = weights is not None
        weights = np.array(weights, dtype=np.float64) if is_weighted else 1.0

        def x(a: Sequence[SupportsFloat], b: Sequence[SupportsFloat]):
            a, b = np.array(a, dtype=np.float64), np.array(b, dtype=np.float64)
            if np.isposinf(order):
                return -np.max(np.abs(a - b) * weights)
            elif order == 0:
                return -float(np.count_nonzero(a - b))
            else:
                return -np.float_power(
                    np.sum(np.float_power(np.abs(a - b) * weights, order)), 1 / order
                )

        # Decide on the flag; comparing an array to 1.0 is ambiguous in a boolean context.
        x.__name__ = (
            "weighted-minkowski" + str(order) if is_weighted else "minkowski" + str(order)
        )
        return x
148
|
|
|
|
149
|
|
|
|
150
|
|
|
class AffinityMatrix(TypedDf):
    """
    An affinity matrix of compound by compound.
    It has a single index, which is compound A,
    and the column labels are the compound B.
    """

    @classmethod
    def index_names(cls) -> List[str]:
        """
        Returns the name of the single index level, ``"name"``.
        """
        return ["name"]

    # @classmethod
    # def must_be_symmetric(cls) -> bool:
    #     return True

    @property
    def rows(self):
        """
        The index labels, in order, each converted with ``str``.
        """
        return [str(s) for s in self.index.tolist()]

    @property
    def cols(self):
        """
        The column labels, in order, each converted with ``str``.
        """
        return [str(s) for s in self.columns.tolist()]

    def restrict(self, to: Set[str]) -> AffinityMatrix:
        """
        Keeps only the rows and the columns whose labels are in ``to``.

        Args:
            to: The labels to keep

        Returns:
            A new AffinityMatrix containing only the selected rows and columns

        Raises:
            KeyError: If "name" is neither a column nor an index level
        """
        # ``from_function`` moves "name" into the index, where ``self["name"]``
        # would raise KeyError; only use the column if it is actually present.
        if "name" in self.columns:
            keep = self["name"].isin(to)
        else:
            keep = self.index.get_level_values("name").isin(to)
        df = self[keep]
        df = df[[v for v in df.columns if v in to]]
        return AffinityMatrix.convert(df)

    def non_self_pairs(self) -> Mapping[CompoundCompoundPair, float]:
        """
        Gets the STRICT lower-triangular matrix, which excludes comparisons where the two labels are the same.
        The result is (n choose 2) matrix elements.

        Returns:
            A dict from comparison labels to their values in the matrix,
            i.e. the entries of ``all_pairs`` whose two labels differ.
        """
        return {(a, b): value for (a, b), value in self.all_pairs().items() if a != b}

    def all_pairs(self) -> Mapping[CompoundCompoundPair, float]:
        """
        Gets the (non-strict) lower-triangular matrix.
        The result is (n(n+1)/2) matrix elements.

        Entries are read by position (row i, columns 0 through i), so this relies on
        the columns being in the same order as the rows.

        Returns:
            A dict from ``(column label, row label)`` to the value at that row and
            column; both labels are converted with ``str`` and stripped of whitespace.
        """
        values = {}
        columns = self.columns.values
        for i, a in enumerate(self.index.values):
            for j in range(i + 1):
                b = columns[j]
                # use (b, a) to get lower triangle
                values[(str(b).strip(), str(a).strip())] = self.iat[i, j]
        return values

    @classmethod
    def from_triples(cls, df: pd.DataFrame) -> AffinityMatrix:
        """
        Builds a Jaccard affinity matrix from a table of triples.

        Each row must expose ``compound_lookup``, ``object_id``, and ``predicate``.
        Every compound is represented by the set of its ``"object_id,predicate"``
        strings, and compounds are compared with ``AffinityFunctions.jaccard``.

        Args:
            df: The data frame of triples

        Returns:
            The compound-by-compound AffinityMatrix
        """
        # TODO this function doesn't belong here
        dct = defaultdict(set)
        for row in df.itertuples():
            dct[row.compound_lookup].add(row.object_id + "," + row.predicate)
        return AffinityMatrix.from_function(dct, AffinityFunctions.jaccard())

    @classmethod
    def from_function(cls, items: Mapping[str, T], fn: Callable[[T, T], float]) -> AffinityMatrix:
        """
        Builds a matrix by applying ``fn`` to every ordered pair of items.

        Args:
            items: Maps each label to the value that is passed to ``fn``
            fn: Scores a pair of values; called as ``fn(row_value, column_value)``

        Returns:
            An AffinityMatrix indexed by "name", with one float64 column per label
        """

        def strip_float_suffix(label) -> str:
            # Drop only a trailing ".0" (a label that went through a float);
            # a blanket replace would also turn "1.05" into "15".
            # NOTE(review): column labels are not normalized this way -- confirm
            # whether float-like labels can occur and should match the row labels.
            label = str(label)
            return label[:-2] if label.endswith(".0") else label

        rows = []
        for a_label, a_value in items.items():
            rows.append(
                pd.Series(
                    {
                        "name": str(a_label),
                        **{
                            str(b_label): fn(a_value, b_value) for b_label, b_value in items.items()
                        },
                    }
                )
            )
        df = pd.DataFrame(rows)  # .astype(np.float64)
        df["name"] = df["name"].map(strip_float_suffix).astype(str)
        df = df.set_index("name")
        afm = AffinityMatrix.convert(df)
        # Rows were built as mixed str/float Series, so force numeric columns.
        for c in afm.columns:
            afm[c] = afm[c].astype(np.float64)
        return afm
236
|
|
|
|
237
|
|
|
|
238
|
|
|
# Names exported by ``from <module> import *``.
__all__ = ["AffinityMatrix", "AffinityFunctions"]
242
|
|
|
|