1
|
|
|
from __future__ import annotations |
|
|
|
|
2
|
|
|
|
3
|
|
|
from dataclasses import dataclass |
4
|
|
|
from typing import Any, Mapping, MutableMapping, Optional, Tuple |
5
|
|
|
|
6
|
|
|
from pocketutils.core.exceptions import XValueError |
|
|
|
|
7
|
|
|
from pocketutils.tools.common_tools import CommonTools |
|
|
|
|
8
|
|
|
from typeddfs import TypedDfs |
|
|
|
|
9
|
|
|
|
10
|
|
|
from mandos import logger |
11
|
|
|
from mandos.entry.api_singletons import Apis |
12
|
|
|
from mandos.model import CompoundStruct |
13
|
|
|
from mandos.model.apis.chembl_support.chembl_utils import ChemblUtils |
14
|
|
|
from mandos.model.apis.pubchem_support.pubchem_data import PubchemData |
15
|
|
|
from mandos.model.utils import CompoundNotFoundError |
16
|
|
|
|
17
|
|
|
# Typed dataframe class for tables of compound identifiers.
# Every identifier/structure column named below is reserved with a ``str`` dtype.
# NOTE(review): the exact effect of ``strict(cols=False)``, ``secure()`` and
# ``hash(file=True)`` is defined by the typeddfs builder -- confirm against its
# documentation before changing any of them. The call order is kept as-is.
IdMatchDf = (
    TypedDfs.typed("IdMatchDf")
    # Columns describing the input compound.
    .reserve("inchikey", dtype=str)
    .reserve("compound_id", "compound_name", "library", dtype=str)
    .reserve("inchi", dtype=str)
    # Database identifiers.
    .reserve("chembl_id", "pubchem_id", "hmdb_id", dtype=str)
    # Structures as reported per database.
    .reserve("chembl_inchikey", "pubchem_inchikey", dtype=str)
    .reserve("chembl_inchi", "pubchem_inchi", dtype=str)
    # Copies of the caller-supplied structure columns.
    .reserve("origin_inchi", "origin_inchikey", dtype=str)
    .strict(cols=False)
    .secure()
    .hash(file=True)
    .build()
)
30
|
|
|
|
31
|
|
|
|
32
|
|
|
# Columns that CompoundIdFiller writes for every row. Pre-existing columns with
# these names are renamed to ``origin_<name>`` by CompoundIdFiller._prep.
FILL_IDS = [
    "inchi", "inchikey",
    "chembl_id", "pubchem_id",
    "chembl_inchi", "chembl_inchikey",
    "pubchem_inchi", "pubchem_inchikey",
]

# Columns moved to the front of the output table, in this order (when present).
PUT_FIRST = [
    "compound_id", "compound_name", "library",
    "inchikey",
    "chembl_id", "pubchem_id", "g2p_id",
    "chembl_inchikey", "pubchem_inchikey", "origin_inchikey",
]

# Columns moved to the end of the output table, in this order (when present).
PUT_LAST = [
    "inchi",
    "chembl_inchi",
    "pubchem_inchi",
    "origin_inchi",
    "smiles",
]

# Alias used in annotations for the name of a source database.
Db = str
57
|
|
|
|
58
|
|
|
|
59
|
|
|
def look(obj, attrs):
    """
    Looks up ``attrs`` on ``obj`` and normalizes "missing" values to ``None``.

    The lookup itself is delegated to ``CommonTools.look``.

    Arguments:
        obj: The object to read from
        attrs: The attribute specification, passed through to ``CommonTools.look``

    Returns:
        ``None`` if the value found is the text "N/A" (in any letter case)
        or if ``CommonTools.is_probable_null`` considers it null;
        otherwise the value itself
    """
    value = CommonTools.look(obj, attrs)
    is_na_text = isinstance(value, str) and value.upper() == "N/A"
    # Short-circuit: "N/A" text never reaches is_probable_null.
    if is_na_text or CommonTools.is_probable_null(value):
        return None
    return value
64
|
|
|
|
65
|
|
|
|
66
|
|
|
@dataclass(frozen=True, repr=True)
class CompoundIdFiller:
    """
    Fills in ChEMBL and PubChem identifiers and structures for a table of compounds.

    The entry point is :meth:`fill`; the remaining methods are private helpers.
    """

    # NOTE(review): neither flag is read by any method of this class -- confirm
    # whether they are meant to switch the respective database lookups on and off.
    chembl: bool = True
    pubchem: bool = True

    def fill(self, df: IdMatchDf) -> IdMatchDf:
        """
        Looks up every row of ``df`` and adds the columns listed in ``FILL_IDS``.

        Caller-supplied columns that share a name with a filled column are kept as
        ``origin_<name>``, unless they turn out identical to the filled column,
        in which case the ``origin_`` copy is dropped.
        Columns are then reordered: ``PUT_FIRST``, everything else, ``PUT_LAST``.

        Arguments:
            df: The input table; must not contain columns starting with ``origin_``

        Returns:
            The table with the filled and reordered columns

        Raises:
            XValueError: If ``df`` already has a column starting with ``origin_``

        Lookups by an explicit ChEMBL or PubChem ID are deliberately not guarded,
        so an error raised by the API for an unknown ID propagates to the caller.
        """
        df = self._prep(df)
        total = len(df)
        logger.info(f"Processing {len(df)} input compounds")
        fill = []
        for i, row in enumerate(df.itertuples()):
            # Progress: a prominent message every 200 rows, a quiet one every 20.
            if i % 200 == 0 and i > 0:
                logger.notice(f"Processed {i:,} / {total:,}")
            elif i % 20 == 0 and i > 0:
                logger.info(f"Processed {i:,} / {total:,}")
            with logger.contextualize(line=i):
                proc = self._process(
                    compound_id=look(row, "compound_id"),
                    library=look(row, "library"),
                    inchi=look(row, "origin_inchi"),
                    inchikey=look(row, "origin_inchikey"),
                    pubchem_id=look(row, "origin_pubchem_id"),
                    chembl_id=look(row, "origin_chembl_id"),
                )
            fill.append(proc)
        for c in FILL_IDS:
            df[c] = [r[c] for r in fill]
        # An origin_ column that matches its filled counterpart carries no information.
        duplicate_cols = []
        for c in FILL_IDS:
            origin = "origin_" + c
            if c in df.columns and origin in df.columns:
                if df[c].values.tolist() == df[origin].values.tolist():
                    duplicate_cols.append(origin)
        logger.notice(f"Done — filled {len(df):,} rows")
        if duplicate_cols:
            df = df.drop_cols(duplicate_cols)
            logger.notice(f"Dropped duplicated columns {', '.join(duplicate_cols)}")
        order = [o for o in PUT_FIRST if o in df.columns]
        order += [c for c in df.columns if c not in PUT_FIRST and c not in PUT_LAST]
        order += [o for o in PUT_LAST if o in df.columns]
        df = df.cfirst(order)
        # Count the non-null IDs directly rather than subtracting the null rows.
        have_chembl = int(df["chembl_id"].notnull().sum())
        have_pubchem = int(df["pubchem_id"].notnull().sum())
        logger.notice(f"{have_chembl:,}/{len(df):,} have ChEMBL IDs")
        logger.notice(f"{have_pubchem:,}/{len(df):,} have PubChem IDs")
        return df

    def _process(
        self,
        compound_id: Optional[str],
        library: Optional[str],
        inchi: Optional[str],
        inchikey: Optional[str],
        pubchem_id: Optional[str],
        chembl_id: Optional[str],
    ) -> Mapping[str, Any]:
        """
        Resolves the identifiers of a single compound.

        Arguments:
            compound_id: The caller's ID for the compound; used for messages
                         and for the "input" structure
            library: Accepted for symmetry with the input columns; not used here
            inchi: The caller-supplied InChI, if any
            inchikey: The caller-supplied InChIKey, if any
            pubchem_id: The caller-supplied PubChem ID, if any
            chembl_id: The caller-supplied ChEMBL ID, if any

        Returns:
            A mapping with exactly the keys of ``FILL_IDS``
        """
        if inchikey is None and pubchem_id is None and chembl_id is None:
            # Nothing to search with: report it and pass the input through.
            logger.error(f"No data for {compound_id}")
            return dict(
                inchi=inchi,
                inchikey=inchikey,
                chembl_id=None,
                chembl_inchi=None,
                chembl_inchikey=None,
                pubchem_id=None,
                pubchem_inchi=None,
                pubchem_inchikey=None,
            )
        fake_x = CompoundStruct("input", compound_id, inchi, inchikey)
        chembl_x = self._get_chembl(inchikey, chembl_id)
        pubchem_x = self._get_pubchem(inchikey, pubchem_id)
        #################################################################################
        # This is important and weird!
        # Where DNE = does not exist and E = exists
        # If chembl DNE and pubchem E ==> fill chembl
        # THEN: If chembl E and (pubchem E or pubchem DNE) ==> fill pubchem
        # we might therefore go from pubchem --> chembl --> pubchem
        # The advantage is that chembl might have a good parent compound
        # Whereas pubchem does not
        # This is often true: chembl is much better at this than pubchem
        # In contrast, only fill ChEMBL if it's missing
        if chembl_x is None and pubchem_x is not None:
            chembl_x = self._get_chembl(pubchem_x.inchikey, None)
        if chembl_x is not None:
            pubchem_x = self._get_pubchem(chembl_x.inchikey, None)
        #################################################################################
        # the order is from best to worst
        prioritize_choices = [chembl_x, pubchem_x, fake_x]
        db_to_struct = {o.db: o for o in prioritize_choices if o is not None}
        inchikey, inchikey_choices = self._choose(db_to_struct, "inchikey")
        inchi, inchi_choices = self._choose(db_to_struct, "inchi")
        about = " ; ".join([x.simple_str for x in prioritize_choices if x is not None])
        if len(inchikey_choices) == 0:
            logger.error(f"no database inchikeys found :: {about}")
        elif len(inchikey_choices) > 1:
            logger.error(f"inchikey mismatch :: {about} :: {inchikey_choices}")
        elif len(inchi_choices) > 1:
            logger.debug(f"inchi mismatch :: {about} :: {inchi_choices}")
        return dict(
            inchi=inchi,
            inchikey=inchikey,
            chembl_id=look(chembl_x, "id"),
            chembl_inchi=look(chembl_x, "inchi"),
            chembl_inchikey=look(chembl_x, "inchikey"),
            pubchem_id=look(pubchem_x, "id"),
            pubchem_inchi=look(pubchem_x, "inchi"),
            pubchem_inchikey=look(pubchem_x, "inchikey"),
        )

    def _choose(
        self,
        db_to_struct: Mapping[str, CompoundStruct],
        what: str,
    ) -> Tuple[Optional[str], MutableMapping[str, Db]]:
        """
        Chooses the best what="inchi" or what="inchikey".

        Arguments:
            db_to_struct: Should be in order from most preferred to least
            what: The name of the CompoundStruct attribute to access

        Returns:
            A tuple of (1) the value from the most preferred struct that has one,
            or ``None`` if no struct has one; and (2) a mapping from each distinct
            value reported by a database other than "input" to a database that
            reported it. More than one key in (2) means the databases disagree.
        """
        options = {}
        for struct in db_to_struct.values():
            value = look(struct, what)
            if value is not None:
                options[struct.db] = value
        if not options:
            return None, {}
        non_input_dbs = {v: k for k, v in options.items() if k != "input"}
        # dicts keep insertion order, so the first value is the most preferred one;
        # picking from a set would return an arbitrary value on a mismatch.
        best = next(iter(options.values()))
        return best, non_input_dbs

    def _prep(self, df: IdMatchDf) -> IdMatchDf:
        """
        Prepares the input table for filling.

        Renames every existing ``FILL_IDS`` column to ``origin_<name>``
        and drops the columns that contain only nulls.

        Arguments:
            df: The input table

        Returns:
            The renamed and pruned table

        Raises:
            XValueError: If a column already starts with ``origin_``
        """
        bad_cols = [c for c in df.columns if c.startswith("origin_")]
        if bad_cols:
            raise XValueError(f"Columns {', '.join(bad_cols)} start with 'origin_'")
        rename_cols = {c: "origin_" + c for c in FILL_IDS if c in df.columns}
        if rename_cols:
            logger.notice(f"Renaming columns: {', '.join(rename_cols.keys())}")
        df: IdMatchDf = df.rename(columns=rename_cols)
        # A list (not a set) keeps the column order stable in the log message.
        drop_cols = [c for c in df.columns if df[c].isnull().all()]
        if drop_cols:
            logger.warning(f"Dropping empty columns: {', '.join(drop_cols)}")
        df = df.drop_cols(drop_cols)
        return df

    def _get_pubchem(self, inchikey: Optional[str], cid: Optional[int]) -> Optional[CompoundStruct]:
        """
        Fetches the PubChem structure for a compound.

        Arguments:
            inchikey: The InChIKey to search by; ignored if ``cid`` is given
            cid: The PubChem compound ID; converted with ``int``

        Returns:
            The structure view of the PubChem record, or ``None`` if there is
            nothing to search by, the record has no InChIKey, or the search
            by InChIKey finds nothing
        """
        if cid is None and inchikey is None:
            # Nothing to search by (e.g. the row only has a ChEMBL ID).
            return None
        api = Apis.Pubchem
        if cid is not None:
            # let it raise a CompoundNotFoundError
            inchikey = api.fetch_data(int(cid)).names_and_identifiers.inchikey
            if inchikey is None:
                return None
        try:
            data: Optional[PubchemData] = api.fetch_data(inchikey)
        except CompoundNotFoundError:
            return None
        return None if data is None else data.struct_view

    def _get_chembl(self, inchikey: Optional[str], cid: Optional[str]) -> Optional[CompoundStruct]:
        """
        Fetches the ChEMBL structure for a compound.

        Arguments:
            inchikey: The InChIKey to search by; ignored if ``cid`` is given
            cid: The ChEMBL ID

        Returns:
            The structure view of the ChEMBL compound, or ``None`` if there is
            nothing to search by or the search by InChIKey finds nothing
        """
        if cid is None and inchikey is None:
            # Nothing to search by (e.g. the row only has a PubChem ID that
            # did not resolve); skip the remote call.
            return None
        util = ChemblUtils(Apis.Chembl)
        if cid is not None:
            # let it raise a CompoundNotFoundError
            return util.get_compound(cid).struct_view
        try:
            return util.get_compound(inchikey).struct_view
        except CompoundNotFoundError:
            return None
231
|
|
|
|
232
|
|
|
|
233
|
|
|
# Public names of this module (what a star-import exposes).
__all__ = [
    "CompoundIdFiller",
    "IdMatchDf",
]
234
|
|
|
|