1
|
|
|
""" |
2
|
|
|
Run searches and write files. |
3
|
|
|
""" |
4
|
|
|
|
5
|
|
|
from __future__ import annotations |
6
|
|
|
|
7
|
|
|
from dataclasses import dataclass |
8
|
|
|
from pathlib import Path |
9
|
|
|
from typing import Callable, Optional, Sequence, Mapping, Set |
10
|
|
|
|
11
|
|
|
import pandas as pd |
|
|
|
|
12
|
|
|
from pocketutils.core.dot_dict import NestedDotDict |
|
|
|
|
13
|
|
|
from pocketutils.tools.common_tools import CommonTools |
|
|
|
|
14
|
|
|
from typeddfs import TypedDfs |
|
|
|
|
15
|
|
|
|
16
|
|
|
from mandos import logger |
17
|
|
|
from mandos.entries.api_singletons import Apis |
18
|
|
|
from mandos.entries.paths import EntryPaths |
19
|
|
|
from mandos.model import CompoundNotFoundError |
20
|
|
|
from mandos.model.apis.chembl_support.chembl_utils import ChemblUtils |
21
|
|
|
from mandos.model.apis.pubchem_api import PubchemApi |
22
|
|
|
from mandos.model.hits import HitFrame |
|
|
|
|
23
|
|
|
from mandos.model.searches import Search |
24
|
|
|
from mandos.search.chembl import ChemblSearch |
25
|
|
|
from mandos.search.pubchem import PubchemSearch |
26
|
|
|
|
27
|
|
|
|
28
|
|
|
def _get_structure(df) -> Optional[Sequence[str]]: |
|
|
|
|
29
|
|
|
if "inchi" in df.columns: |
30
|
|
|
return df["inchi"].values |
31
|
|
|
if "smiles" in df.columns: |
32
|
|
|
return df["smiles"].values |
33
|
|
|
return None |
34
|
|
|
|
35
|
|
|
|
36
|
|
|
def _fix_cols(df): |
|
|
|
|
37
|
|
|
return df.rename(columns={s: s.lower() for s in df.columns}) |
38
|
|
|
|
39
|
|
|
|
40
|
|
|
InputFrame = ( |
41
|
|
|
TypedDfs.typed("InputFrame") |
42
|
|
|
.require("inchikey") |
43
|
|
|
.reserve("inchi", "smiles", "compound_id", dtype=str) |
44
|
|
|
.post(_fix_cols) |
45
|
|
|
.strict(index=True, cols=False) |
46
|
|
|
).build() |
47
|
|
|
InputFrame.get_structures = _get_structure |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
IdMatchFrame = ( |
51
|
|
|
TypedDfs.typed("IdMatchFrame") |
52
|
|
|
.reserve("inchikey", dtype=str) |
53
|
|
|
.reserve("inchi", "smiles", "compound_id", dtype=str) |
54
|
|
|
.reserve("chembl_id", "pubchem_id", "hmdb_id", dtype=str) |
55
|
|
|
.reserve("origin_inchikey", "origin_smiles", dtype=str) |
56
|
|
|
.reserve("library", dtype=str) |
57
|
|
|
.strict(index=True, cols=False) |
58
|
|
|
).build() |
59
|
|
|
|
60
|
|
|
|
61
|
|
|
@dataclass(frozen=True, repr=True) |
|
|
|
|
62
|
|
|
class ChemFinder: |
63
|
|
|
what: str |
64
|
|
|
how: Callable[[str], str] |
65
|
|
|
|
66
|
|
|
@classmethod |
67
|
|
|
def chembl(cls) -> ChemFinder: |
|
|
|
|
68
|
|
|
def how(inchikey: str) -> str: |
69
|
|
|
return ChemblUtils(Apis.Chembl).get_compound(inchikey).chid |
70
|
|
|
|
71
|
|
|
return ChemFinder("ChEMBL", how) |
72
|
|
|
|
73
|
|
|
@classmethod |
74
|
|
|
def pubchem(cls) -> ChemFinder: |
|
|
|
|
75
|
|
|
def how(inchikey: str) -> str: |
76
|
|
|
api: PubchemApi = Apis.Pubchem |
77
|
|
|
return str(api.find_id(inchikey)) |
78
|
|
|
|
79
|
|
|
return ChemFinder("PubChem", how) |
80
|
|
|
|
81
|
|
|
def find(self, inchikey: str) -> Optional[str]: |
|
|
|
|
82
|
|
|
try: |
83
|
|
|
return self.how(inchikey) |
84
|
|
|
except CompoundNotFoundError: |
85
|
|
|
logger.info(f"NOT FOUND: {self.what.rjust(8)} ] {inchikey}") |
86
|
|
|
logger.debug(f"Did not find {self.what} {inchikey}", exc_info=True) |
87
|
|
|
return None |
88
|
|
|
|
89
|
|
|
|
90
|
|
|
class SearcherUtils: |
|
|
|
|
91
|
|
|
@classmethod |
92
|
|
|
def dl( |
|
|
|
|
93
|
|
|
cls, |
|
|
|
|
94
|
|
|
inchikeys: Sequence[str], |
|
|
|
|
95
|
|
|
pubchem: bool = True, |
|
|
|
|
96
|
|
|
chembl: bool = True, |
|
|
|
|
97
|
|
|
hmdb: bool = True, |
|
|
|
|
98
|
|
|
) -> IdMatchFrame: |
99
|
|
|
df = IdMatchFrame([pd.Series(dict(inchikey=c)) for c in inchikeys]) |
|
|
|
|
100
|
|
|
if chembl: |
101
|
|
|
df["chembl_id"] = df["inchikey"].map(ChemFinder.chembl().find) |
102
|
|
|
if pubchem: |
103
|
|
|
df["pubchem_id"] = df["inchikey"].map(ChemFinder.pubchem().find) |
104
|
|
|
return df |
105
|
|
|
|
106
|
|
|
@classmethod |
107
|
|
|
def read(cls, input_path: Path) -> InputFrame: |
|
|
|
|
108
|
|
|
df = InputFrame.read_file(input_path) |
|
|
|
|
109
|
|
|
logger.info(f"Read {len(df)} input compounds") |
110
|
|
|
return df |
111
|
|
|
|
112
|
|
|
|
113
|
|
|
class CompoundIdFiller: |
|
|
|
|
114
|
|
|
@classmethod |
115
|
|
|
def fill( |
|
|
|
|
116
|
|
|
cls, |
|
|
|
|
117
|
|
|
df: IdMatchFrame, |
|
|
|
|
118
|
|
|
) -> IdMatchFrame: |
119
|
|
|
matchable = {"inchikey", "pubchem_id", "chembl_id"} |
120
|
|
|
sources = {s for s in matchable if s in df.columns and not df[s].isnull().all()} |
121
|
|
|
targets = {s for s in matchable if s not in df.columns or df[s].isnull().all()} |
122
|
|
|
# noinspection PyUnresolvedReferences |
123
|
|
|
logger.notice(f"Copying {sources} to {targets}") |
124
|
|
|
source = next(iter(sources)) |
125
|
|
|
# watch out! these are simply in order, nothing more |
126
|
|
|
remapped = {t: [] for t in targets} |
127
|
|
|
for source_val in df[source].values: |
128
|
|
|
matches = cls._matches(source, source_val, targets) |
129
|
|
|
for target, target_val in matches.items(): |
130
|
|
|
remapped[target].append(target_val) |
131
|
|
|
remapped.update(matches) |
132
|
|
|
for target in targets: |
133
|
|
|
df[target] = remapped[target] |
134
|
|
|
|
135
|
|
|
@classmethod |
136
|
|
|
def _matches(cls, source: str, source_val: str, targets: Set[str]) -> Mapping[str, str]: |
137
|
|
|
if source == "pubchem_id": |
138
|
|
|
inchikey = Apis.Pubchem.find_inchikey(int(source_val)) |
139
|
|
|
elif source == "chembl_id": |
140
|
|
|
# TODO |
|
|
|
|
141
|
|
|
# get_compound wants an inchikey, |
142
|
|
|
# but we're secretly passing a CHEMBLxxxx ID instead |
143
|
|
|
# we just know that that works |
144
|
|
|
inchikey = ChemblUtils(Apis.Chembl).get_compound(source_val).inchikey |
145
|
|
|
elif source == "inchikey": |
146
|
|
|
inchikey = source |
147
|
|
|
else: |
148
|
|
|
raise AssertionError(source) |
149
|
|
|
matched = {} if source == "inchikey" else dict(inchikey=inchikey) |
150
|
|
|
if "pubchem_id" in targets: |
151
|
|
|
pubchem_id = ChemFinder.pubchem().find(inchikey) |
152
|
|
|
if pubchem_id is not None: |
153
|
|
|
matched["pubchem_id"] = str(pubchem_id) |
154
|
|
|
if "chembl_id" in targets: |
155
|
|
|
chembl_id = ChemFinder.chembl().find(inchikey) |
156
|
|
|
if chembl_id is not None: |
157
|
|
|
matched["chembl_id"] = chembl_id |
158
|
|
|
return matched |
159
|
|
|
|
160
|
|
|
|
161
|
|
|
class Searcher: |
162
|
|
|
""" |
163
|
|
|
Executes one or more searches and saves the results to CSV files. |
164
|
|
|
Create and use once. |
165
|
|
|
""" |
166
|
|
|
|
167
|
|
|
def __init__(self, searches: Sequence[Search], to: Sequence[Path], input_path: Path): |
168
|
|
|
""" |
169
|
|
|
Constructor. |
170
|
|
|
|
171
|
|
|
Args: |
172
|
|
|
searches: |
173
|
|
|
input_path: Path to the input file of one of the formats: |
174
|
|
|
- .txt containing one InChI Key per line |
175
|
|
|
- .csv, .tsv, .tab, csv.gz, .tsv.gz, .tab.gz, or .feather containing a column called inchikey |
|
|
|
|
176
|
|
|
""" |
177
|
|
|
self.what = searches |
178
|
|
|
self.input_path: Optional[Path] = input_path |
179
|
|
|
self.input_df: InputFrame = None |
180
|
|
|
self.output_paths = { |
181
|
|
|
what.key: EntryPaths.output_path_of(what, input_path, path) |
182
|
|
|
for what, path in CommonTools.zip_list(searches, to) |
183
|
|
|
} |
184
|
|
|
|
185
|
|
|
def search(self) -> Searcher: |
186
|
|
|
""" |
187
|
|
|
Performs the search, and writes data. |
188
|
|
|
""" |
189
|
|
|
if self.input_df is not None: |
190
|
|
|
raise ValueError(f"Already ran a search") |
|
|
|
|
191
|
|
|
self.input_df = SearcherUtils.read(self.input_path) |
192
|
|
|
inchikeys = self.input_df["inchikey"].unique() |
193
|
|
|
has_pubchem = any((isinstance(what, PubchemSearch) for what in self.what)) |
|
|
|
|
194
|
|
|
has_chembl = any((isinstance(what, ChemblSearch) for what in self.what)) |
195
|
|
|
# find the compounds first so the user knows what's missing before proceeding |
196
|
|
|
SearcherUtils.dl(inchikeys, pubchem=has_pubchem, chembl=has_chembl) |
197
|
|
|
for what in self.what: |
198
|
|
|
output_path = self.output_paths[what.key] |
199
|
|
|
metadata_path = output_path.with_suffix(".json.metadata") |
200
|
|
|
df = what.find_to_df(inchikeys) |
|
|
|
|
201
|
|
|
# keep all of the original extra columns from the input |
202
|
|
|
# e.g. if the user had 'inchi' or 'smiles' or 'pretty_name' |
203
|
|
|
for extra_col in [c for c in self.input_df.columns if c != "inchikey"]: |
204
|
|
|
extra_mp = self.input_df.set_index("inchikey")[extra_col].to_dict() |
205
|
|
|
df[extra_col] = df["lookup"].map(extra_mp.get) |
206
|
|
|
# write the (intermediate) file |
207
|
|
|
df.write_file(output_path) |
208
|
|
|
# write metadata |
209
|
|
|
params = {k: str(v) for k, v in what.get_params().items() if k not in {"key", "api"}} |
210
|
|
|
metadata = NestedDotDict(dict(key=what.key, search=what.search_class, params=params)) |
211
|
|
|
metadata.write_json(metadata_path) |
212
|
|
|
logger.info(f"Wrote {what.key} to {output_path}") |
213
|
|
|
return self |
214
|
|
|
|
215
|
|
|
|
216
|
|
|
__all__ = ["Searcher", "IdMatchFrame", "SearcherUtils", "CompoundIdFiller", "InputFrame"] |
217
|
|
|
|