|
1
|
|
|
""" |
|
2
|
|
|
Run searches and write files. |
|
3
|
|
|
""" |
|
4
|
|
|
|
|
5
|
|
|
from __future__ import annotations |
|
6
|
|
|
|
|
7
|
|
|
from dataclasses import dataclass |
|
8
|
|
|
from pathlib import Path |
|
9
|
|
|
from typing import Callable, Optional, Sequence, Mapping, Set |
|
10
|
|
|
|
|
11
|
|
|
import pandas as pd |
|
|
|
|
|
|
12
|
|
|
from pocketutils.core.dot_dict import NestedDotDict |
|
|
|
|
|
|
13
|
|
|
from pocketutils.tools.common_tools import CommonTools |
|
|
|
|
|
|
14
|
|
|
from typeddfs import TypedDfs |
|
|
|
|
|
|
15
|
|
|
|
|
16
|
|
|
from mandos import logger |
|
17
|
|
|
from mandos.entries.api_singletons import Apis |
|
18
|
|
|
from mandos.entries.paths import EntryPaths |
|
19
|
|
|
from mandos.model import CompoundNotFoundError |
|
20
|
|
|
from mandos.model.apis.chembl_support.chembl_utils import ChemblUtils |
|
21
|
|
|
from mandos.model.apis.pubchem_api import PubchemApi |
|
22
|
|
|
from mandos.model.hits import HitFrame |
|
|
|
|
|
|
23
|
|
|
from mandos.model.searches import Search |
|
24
|
|
|
from mandos.search.chembl import ChemblSearch |
|
25
|
|
|
from mandos.search.pubchem import PubchemSearch |
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
def _get_structure(df) -> Optional[Sequence[str]]: |
|
|
|
|
|
|
29
|
|
|
if "inchi" in df.columns: |
|
30
|
|
|
return df["inchi"].values |
|
31
|
|
|
if "smiles" in df.columns: |
|
32
|
|
|
return df["smiles"].values |
|
33
|
|
|
return None |
|
34
|
|
|
|
|
35
|
|
|
|
|
36
|
|
|
def _fix_cols(df): |
|
|
|
|
|
|
37
|
|
|
return df.rename(columns={s: s.lower() for s in df.columns}) |
|
38
|
|
|
|
|
39
|
|
|
|
|
40
|
|
|
InputFrame = ( |
|
41
|
|
|
TypedDfs.typed("InputFrame") |
|
42
|
|
|
.require("inchikey") |
|
43
|
|
|
.reserve("inchi", "smiles", "compound_id", dtype=str) |
|
44
|
|
|
.post(_fix_cols) |
|
45
|
|
|
.strict(index=True, cols=False) |
|
46
|
|
|
).build() |
|
47
|
|
|
InputFrame.get_structures = _get_structure |
|
48
|
|
|
|
|
49
|
|
|
|
|
50
|
|
|
IdMatchFrame = ( |
|
51
|
|
|
TypedDfs.typed("IdMatchFrame") |
|
52
|
|
|
.reserve("inchikey", dtype=str) |
|
53
|
|
|
.reserve("inchi", "smiles", "compound_id", dtype=str) |
|
54
|
|
|
.reserve("chembl_id", "pubchem_id", "hmdb_id", dtype=str) |
|
55
|
|
|
.reserve("origin_inchikey", "origin_smiles", dtype=str) |
|
56
|
|
|
.reserve("library", dtype=str) |
|
57
|
|
|
.strict(index=True, cols=False) |
|
58
|
|
|
).build() |
|
59
|
|
|
|
|
60
|
|
|
|
|
61
|
|
|
@dataclass(frozen=True, repr=True) |
|
|
|
|
|
|
62
|
|
|
class ChemFinder: |
|
63
|
|
|
what: str |
|
64
|
|
|
how: Callable[[str], str] |
|
65
|
|
|
|
|
66
|
|
|
@classmethod |
|
67
|
|
|
def chembl(cls) -> ChemFinder: |
|
|
|
|
|
|
68
|
|
|
def how(inchikey: str) -> str: |
|
69
|
|
|
return ChemblUtils(Apis.Chembl).get_compound(inchikey).chid |
|
70
|
|
|
|
|
71
|
|
|
return ChemFinder("ChEMBL", how) |
|
72
|
|
|
|
|
73
|
|
|
@classmethod |
|
74
|
|
|
def pubchem(cls) -> ChemFinder: |
|
|
|
|
|
|
75
|
|
|
def how(inchikey: str) -> str: |
|
76
|
|
|
api: PubchemApi = Apis.Pubchem |
|
77
|
|
|
return str(api.find_id(inchikey)) |
|
78
|
|
|
|
|
79
|
|
|
return ChemFinder("PubChem", how) |
|
80
|
|
|
|
|
81
|
|
|
def find(self, inchikey: str) -> Optional[str]: |
|
|
|
|
|
|
82
|
|
|
try: |
|
83
|
|
|
return self.how(inchikey) |
|
84
|
|
|
except CompoundNotFoundError: |
|
85
|
|
|
logger.info(f"NOT FOUND: {self.what.rjust(8)} ] {inchikey}") |
|
86
|
|
|
logger.debug(f"Did not find {self.what} {inchikey}", exc_info=True) |
|
87
|
|
|
return None |
|
88
|
|
|
|
|
89
|
|
|
|
|
90
|
|
|
class SearcherUtils: |
|
|
|
|
|
|
91
|
|
|
@classmethod |
|
92
|
|
|
def dl( |
|
|
|
|
|
|
93
|
|
|
cls, |
|
|
|
|
|
|
94
|
|
|
inchikeys: Sequence[str], |
|
|
|
|
|
|
95
|
|
|
pubchem: bool = True, |
|
|
|
|
|
|
96
|
|
|
chembl: bool = True, |
|
|
|
|
|
|
97
|
|
|
hmdb: bool = True, |
|
|
|
|
|
|
98
|
|
|
) -> IdMatchFrame: |
|
99
|
|
|
df = IdMatchFrame([pd.Series(dict(inchikey=c)) for c in inchikeys]) |
|
|
|
|
|
|
100
|
|
|
if chembl: |
|
101
|
|
|
df["chembl_id"] = df["inchikey"].map(ChemFinder.chembl().find) |
|
102
|
|
|
if pubchem: |
|
103
|
|
|
df["pubchem_id"] = df["inchikey"].map(ChemFinder.pubchem().find) |
|
104
|
|
|
return df |
|
105
|
|
|
|
|
106
|
|
|
@classmethod |
|
107
|
|
|
def read(cls, input_path: Path) -> InputFrame: |
|
|
|
|
|
|
108
|
|
|
df = InputFrame.read_file(input_path) |
|
|
|
|
|
|
109
|
|
|
logger.info(f"Read {len(df)} input compounds") |
|
110
|
|
|
return df |
|
111
|
|
|
|
|
112
|
|
|
|
|
113
|
|
|
class CompoundIdFiller: |
|
|
|
|
|
|
114
|
|
|
@classmethod |
|
115
|
|
|
def fill( |
|
|
|
|
|
|
116
|
|
|
cls, |
|
|
|
|
|
|
117
|
|
|
df: IdMatchFrame, |
|
|
|
|
|
|
118
|
|
|
) -> IdMatchFrame: |
|
119
|
|
|
matchable = {"inchikey", "pubchem_id", "chembl_id"} |
|
120
|
|
|
sources = {s for s in matchable if s in df.columns and not df[s].isnull().all()} |
|
121
|
|
|
targets = {s for s in matchable if s not in df.columns or df[s].isnull().all()} |
|
122
|
|
|
# noinspection PyUnresolvedReferences |
|
123
|
|
|
logger.notice(f"Copying {sources} to {targets}") |
|
124
|
|
|
source = next(iter(sources)) |
|
125
|
|
|
# watch out! these are simply in order, nothing more |
|
126
|
|
|
remapped = {t: [] for t in targets} |
|
127
|
|
|
for source_val in df[source].values: |
|
128
|
|
|
matches = cls._matches(source, source_val, targets) |
|
129
|
|
|
for target, target_val in matches.items(): |
|
130
|
|
|
remapped[target].append(target_val) |
|
131
|
|
|
remapped.update(matches) |
|
132
|
|
|
for target in targets: |
|
133
|
|
|
df[target] = remapped[target] |
|
134
|
|
|
|
|
135
|
|
|
@classmethod |
|
136
|
|
|
def _matches(cls, source: str, source_val: str, targets: Set[str]) -> Mapping[str, str]: |
|
137
|
|
|
if source == "pubchem_id": |
|
138
|
|
|
inchikey = Apis.Pubchem.find_inchikey(int(source_val)) |
|
139
|
|
|
elif source == "chembl_id": |
|
140
|
|
|
# TODO |
|
|
|
|
|
|
141
|
|
|
# get_compound wants an inchikey, |
|
142
|
|
|
# but we're secretly passing a CHEMBLxxxx ID instead |
|
143
|
|
|
# we just know that that works |
|
144
|
|
|
inchikey = ChemblUtils(Apis.Chembl).get_compound(source_val).inchikey |
|
145
|
|
|
elif source == "inchikey": |
|
146
|
|
|
inchikey = source |
|
147
|
|
|
else: |
|
148
|
|
|
raise AssertionError(source) |
|
149
|
|
|
matched = {} if source == "inchikey" else dict(inchikey=inchikey) |
|
150
|
|
|
if "pubchem_id" in targets: |
|
151
|
|
|
pubchem_id = ChemFinder.pubchem().find(inchikey) |
|
152
|
|
|
if pubchem_id is not None: |
|
153
|
|
|
matched["pubchem_id"] = str(pubchem_id) |
|
154
|
|
|
if "chembl_id" in targets: |
|
155
|
|
|
chembl_id = ChemFinder.chembl().find(inchikey) |
|
156
|
|
|
if chembl_id is not None: |
|
157
|
|
|
matched["chembl_id"] = chembl_id |
|
158
|
|
|
return matched |
|
159
|
|
|
|
|
160
|
|
|
|
|
161
|
|
|
class Searcher: |
|
162
|
|
|
""" |
|
163
|
|
|
Executes one or more searches and saves the results to CSV files. |
|
164
|
|
|
Create and use once. |
|
165
|
|
|
""" |
|
166
|
|
|
|
|
167
|
|
|
def __init__(self, searches: Sequence[Search], to: Sequence[Path], input_path: Path): |
|
168
|
|
|
""" |
|
169
|
|
|
Constructor. |
|
170
|
|
|
|
|
171
|
|
|
Args: |
|
172
|
|
|
searches: |
|
173
|
|
|
input_path: Path to the input file of one of the formats: |
|
174
|
|
|
- .txt containing one InChI Key per line |
|
175
|
|
|
- .csv, .tsv, .tab, csv.gz, .tsv.gz, .tab.gz, or .feather containing a column called inchikey |
|
|
|
|
|
|
176
|
|
|
""" |
|
177
|
|
|
self.what = searches |
|
178
|
|
|
self.input_path: Optional[Path] = input_path |
|
179
|
|
|
self.input_df: InputFrame = None |
|
180
|
|
|
self.output_paths = { |
|
181
|
|
|
what.key: EntryPaths.output_path_of(what, input_path, path) |
|
182
|
|
|
for what, path in CommonTools.zip_list(searches, to) |
|
183
|
|
|
} |
|
184
|
|
|
|
|
185
|
|
|
def search(self) -> Searcher: |
|
186
|
|
|
""" |
|
187
|
|
|
Performs the search, and writes data. |
|
188
|
|
|
""" |
|
189
|
|
|
if self.input_df is not None: |
|
190
|
|
|
raise ValueError(f"Already ran a search") |
|
|
|
|
|
|
191
|
|
|
self.input_df = SearcherUtils.read(self.input_path) |
|
192
|
|
|
inchikeys = self.input_df["inchikey"].unique() |
|
193
|
|
|
has_pubchem = any((isinstance(what, PubchemSearch) for what in self.what)) |
|
|
|
|
|
|
194
|
|
|
has_chembl = any((isinstance(what, ChemblSearch) for what in self.what)) |
|
195
|
|
|
# find the compounds first so the user knows what's missing before proceeding |
|
196
|
|
|
SearcherUtils.dl(inchikeys, pubchem=has_pubchem, chembl=has_chembl) |
|
197
|
|
|
for what in self.what: |
|
198
|
|
|
output_path = self.output_paths[what.key] |
|
199
|
|
|
metadata_path = output_path.with_suffix(".json.metadata") |
|
200
|
|
|
df = what.find_to_df(inchikeys) |
|
|
|
|
|
|
201
|
|
|
# keep all of the original extra columns from the input |
|
202
|
|
|
# e.g. if the user had 'inchi' or 'smiles' or 'pretty_name' |
|
203
|
|
|
for extra_col in [c for c in self.input_df.columns if c != "inchikey"]: |
|
204
|
|
|
extra_mp = self.input_df.set_index("inchikey")[extra_col].to_dict() |
|
205
|
|
|
df[extra_col] = df["lookup"].map(extra_mp.get) |
|
206
|
|
|
# write the (intermediate) file |
|
207
|
|
|
df.write_file(output_path) |
|
208
|
|
|
# write metadata |
|
209
|
|
|
params = {k: str(v) for k, v in what.get_params().items() if k not in {"key", "api"}} |
|
210
|
|
|
metadata = NestedDotDict(dict(key=what.key, search=what.search_class, params=params)) |
|
211
|
|
|
metadata.write_json(metadata_path) |
|
212
|
|
|
logger.info(f"Wrote {what.key} to {output_path}") |
|
213
|
|
|
return self |
|
214
|
|
|
|
|
215
|
|
|
|
|
216
|
|
|
__all__ = ["Searcher", "IdMatchFrame", "SearcherUtils", "CompoundIdFiller", "InputFrame"] |
|
217
|
|
|
|