Passed
Push — main ( ddff4b...7b3fbc )
by Douglas
04:33
created

QueryingPubchemApi._get_parent()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 4
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
"""
2
PubChem querying API.
3
"""
4
from __future__ import annotations
5
6
import re
7
import time
8
from urllib.error import HTTPError
9
from datetime import datetime, timezone
10
from typing import Optional, Sequence, Union, FrozenSet, Mapping
11
12
import io
13
import orjson
0 ignored issues
show
introduced by
Unable to import 'orjson'
Loading history...
14
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
15
from pocketutils.core.dot_dict import NestedDotDict
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.dot_dict'
Loading history...
16
from pocketutils.core.query_utils import QueryExecutor
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.query_utils'
Loading history...
17
18
from mandos import logger
19
from mandos.model.apis.pubchem_api import PubchemCompoundLookupError, PubchemApi
20
from mandos.model.apis.pubchem_support.pubchem_data import PubchemData
21
22
23
class QueryingPubchemApi(PubchemApi):
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
24
    def __init__(
0 ignored issues
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
25
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
26
        chem_data: bool = False,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
27
        extra_tables: bool = False,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
28
        classifiers: bool = False,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
29
        extra_classifiers: bool = False,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
30
        query: Optional[QueryExecutor] = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
31
    ):
32
        self._use_chem_data = chem_data
33
        self._use_extra_tables = extra_tables
34
        self._use_classifiers = classifiers
35
        self._use_extra_classifiers = extra_classifiers
36
        self._query = QueryExecutor(0.22, 0.25) if query is None else query
37
38
    _pug = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
39
    _pug_view = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
40
    _sdg = "https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi"
41
    _classifications = "https://pubchem.ncbi.nlm.nih.gov/classification/cgi/classifications.fcgi"
42
    _link_db = "https://pubchem.ncbi.nlm.nih.gov/link_db/link_db_server.cgi"
43
44
    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
45
        # Dear God this is terrible
46
        # Here are the steps:
47
        # 1. Download HTML for the InChI key and scrape the CID
48
        # 2. Download the "display" JSON data from the CID
49
        # 3. Look for a Parent-type related compound. If it exists, download its display data
50
        # 4. Download the structural data and append it
51
        # 5. Download the external table CSVs and append them
52
        # 6. Download the link sets and append them
53
        # 7. Download the classifiers (hierarchies) and append them
54
        # 8. Attach metadata about how we found this.
55
        # 9. Return the stupid, stupid result as a massive JSON struct.
56
        logger.info(f"Downloading PubChem data for {inchikey}")
57
        cid = self._scrape_cid(inchikey)
58
        try:
59
            data = self._fetch_data(cid, inchikey)
60
        except HTTPError:
61
            raise PubchemCompoundLookupError(
62
                f"Failed finding pubchem compound (JSON) from cid {cid}, inchikey {inchikey}"
63
            )
64
        data = self._get_parent(cid, inchikey, data)
65
        return data
66
67
    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
68
        req = self._query(
69
            f"{self._pug}/compound/similarity/inchikey/{inchi}/JSON?Threshold={min_tc}",
70
            method="post",
71
        )
72
        key = orjson.loads(req)["Waiting"]["ListKey"]
73
        t0 = time.monotonic()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
74
        while time.monotonic() - t0 < 5:
75
            # it'll wait as needed here
76
            resp = self._query(f"{self._pug}/compound/listkey/{key}/cids/JSON")
77
            resp = NestedDotDict(orjson.loads(resp))
78
            if resp.get("IdentifierList.CID") is not None:
79
                return frozenset(resp.req_list_as("IdentifierList.CID", int))
80
        raise TimeoutError(f"Search for {inchi} using key {key} timed out")
81
82
    def _scrape_cid(self, inchikey: str) -> int:
83
        # This is awful
84
        # Every attempt to get the actual, correct, unique CID corresponding to the inchikey
85
        # failed with every proper PubChem API
86
        # We can't use <pug_view>/data/compound/<inchikey> -- we can only use a CID there
87
        # I found it with a PUG API
88
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/CID/GJSURZIOUXUGAL-UHFFFAOYSA-N/record/JSON
89
        # But that returns multiple results!!
90
        # There's no apparent way to find out which one is real
91
        # I tried then querying each found CID, getting the display data, and looking at their parents
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (102/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
92
        # Unfortunately, we end up with multiple contradictory parents
93
        # Plus, that's insanely slow -- we have to get the full JSON data for each parent
94
        # Every worse -- the PubChem API docs LIE!!
95
        # Using ?cids_type=parent DOES NOT GIVE THE PARENT compound
96
        # Ex: https://pubchem.ncbi.nlm.nih.gov/compound/656832
97
        # This is cocaine HCl, which has cocaine (446220) as a parent
98
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/656832/JSON
99
        # gives 656832 back again
100
        # same thing when querying by inchikey
101
        # Ultimately, I found that I can get HTML containing the CID from an inchikey
102
        # From there, we'll just have to download its "display" data and get the parent, then download that data
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (112/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
103
        url = f"https://pubchem.ncbi.nlm.nih.gov/compound/{inchikey}"
104
        pat = re.compile(
105
            r'<meta property="og:url" content="https://pubchem\.ncbi\.nlm\.nih\.gov/compound/(\d+)">'
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (101/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
106
        )
107
        try:
108
            html = self._query(url)
109
        except HTTPError:
110
            raise PubchemCompoundLookupError(
111
                f"Failed finding pubchem compound (HTML) from inchikey {inchikey} [url: {url}]"
112
            )
113
        match = pat.search(html)
114
        if match is None:
115
            raise PubchemCompoundLookupError(
116
                f"Something is wrong with the HTML from {url}; og:url not found"
117
            )
118
        return int(match.group(1))
119
120
    def _get_parent(self, cid: int, inchikey: str, data: PubchemData) -> PubchemData:
121
        # guard with is not None: we're not caching, so don't do it twice
122
        if data.parent_or_none is None:
123
            return data
124
        try:
125
            return self._fetch_data(data.parent_or_none, inchikey)
126
        except HTTPError:
127
            raise PubchemCompoundLookupError(
128
                f"Failed finding pubchem parent compound (JSON)"
129
                f"for cid {data.parent_or_none}, child cid {cid}, inchikey {inchikey}"
130
            )
131
132
    def _fetch_data(self, cid: int, inchikey: str) -> PubchemData:
133
        when_started = datetime.now(timezone.utc).astimezone()
134
        t0 = time.monotonic_ns()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
135
        data = self._fetch_core_data(cid)
136
        t1 = time.monotonic_ns()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "t1" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
137
        when_finished = datetime.now(timezone.utc).astimezone()
138
        data["meta"] = self._get_metadata(inchikey, when_started, when_finished, t0, t1)
139
        self._strip_by_key_in_place(data, "DisplayControls")
140
        return PubchemData(NestedDotDict(data))
141
142
    def _fetch_core_data(self, cid: int) -> dict:
143
        return dict(
144
            record=self._fetch_display_data(cid),
145
            structure=self._fetch_structure_data(cid),
146
            external_tables=self._fetch_external_tables(cid),
147
            link_sets=self._fetch_external_linksets(cid),
148
            classifications=self._fetch_hierarchies(cid),
149
        )
150
151
    def _get_metadata(self, inchikey: str, started: datetime, finished: datetime, t0: int, t1: int):
0 ignored issues
show
Coding Style Naming introduced by
Argument name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style Naming introduced by
Argument name "t1" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style introduced by
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y;

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y;
Loading history...
best-practice introduced by
Too many arguments (6/5)
Loading history...
152
        return dict(
153
            timestamp_fetch_started=started.isoformat(),
154
            timestamp_fetch_finished=finished.isoformat(),
155
            from_lookup=inchikey,
156
            fetch_nanos_taken=str(t1 - t0),
157
        )
158
159
    def _fetch_display_data(self, cid: int) -> Optional[NestedDotDict]:
160
        url = f"{self._pug_view}/data/compound/{cid}/JSON/?response_type=display"
161
        return self._query_json(url)["Record"]
162
163
    def _fetch_structure_data(self, cid: int) -> NestedDotDict:
164
        if not self._use_chem_data:
165
            return NestedDotDict({})
166
        url = f"{self._pug}/compound/cid/{cid}/JSON"
167
        data = self._query_json(url)["PC_Compounds"][0]
168
        del [data["structure"]["props"]]  # redundant with props section in record
169
        return data
170
171
    def _fetch_external_tables(self, cid: int) -> Mapping[str, str]:
172
        return {
173
            ext_table: self._fetch_external_table(cid, ext_table)
174
            for ext_table in self._tables_to_use.values()
175
        }
176
177
    def _fetch_external_linksets(self, cid: int) -> Mapping[str, str]:
178
        return {
179
            table: self._fetch_external_linkset(cid, table)
180
            for table in self._linksets_to_use.values()
181
        }
182
183
    def _fetch_hierarchies(self, cid: int) -> NestedDotDict:
184
        build_up = {}
185
        for hname, hid in self._hierarchies_to_use.items():
186
            try:
187
                build_up[hname] = self._fetch_hierarchy(cid, hid)
188
            except (HTTPError, KeyError, LookupError) as e:
0 ignored issues
show
Coding Style Naming introduced by
Variable name "e" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
189
                logger.debug(f"No data for classifier {hid}, compound {cid}: {e}")
190
        # These list all of the child nodes for each node
191
        # Some of them are > 1000 items -- they're HUGE
192
        # We don't expect to need to navigate to children
193
        self._strip_by_key_in_place(build_up, "ChildID")
194
        return NestedDotDict(build_up)
195
196
    def _fetch_external_table(self, cid: int, table: str) -> Sequence[dict]:
197
        url = self._external_table_url(cid, table)
198
        data = self._query(url)
199
        df: pd.DataFrame = pd.read_csv(io.StringIO(data))
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
200
        return list(df.T.to_dict().values())
201
202
    def _fetch_external_linkset(self, cid: int, table: str) -> NestedDotDict:
203
        url = f"{self._link_db}?format=JSON&type={table}&operation=GetAllLinks&id_1={cid}"
204
        data = self._query(url)
205
        return NestedDotDict(orjson.loads(data))
206
207
    def _fetch_hierarchy(self, cid: int, hid: int) -> Sequence[dict]:
208
        url = f"{self._classifications}?format=json&hid={hid}&search_uid_type=cid&search_uid={cid}&search_type=list&response_type=display"
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (138/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
209
        data: Sequence[dict] = orjson.loads(self._query(url))["Hierarchies"]
210
        # underneath Hierarchies is a list of Hierarchy
211
        logger.debug(f"Found data for classifier {hid}, compound {cid}")
212
        if len(data) == 0:
213
            raise LookupError(f"Failed getting hierarchy {hid}")
214
        return data
215
216
    @property
217
    def _tables_to_use(self) -> Mapping[str, str]:
218
        dct = {
219
            "drug:clinicaltrials.gov:clinical_trials": "clinicaltrials",
220
            "pharm:pubchem:reactions": "pathwayreaction",
221
            "uses:cpdat:uses": "cpdat",
222
            "tox:chemidplus:acute_effects": "chemidplus",
223
            "dis:ctd:associated_disorders_and_diseases": "ctd_chemical_disease",
224
            "lit:pubchem:depositor_provided_pubmed_citations": "pubmed",
225
            "bio:dgidb:drug_gene_interactions": "dgidb",
226
            "bio:ctd:chemical_gene_interactions": "ctdchemicalgene",
227
            "bio:drugbank:drugbank_interactions": "drugbank",
228
            "bio:drugbank:drug_drug_interactions": "drugbankddi",
229
            "bio:pubchem:bioassay_results": "bioactivity",
230
        }
231
        if self._use_extra_tables:
232
            dct.update(
233
                {
234
                    "patent:depositor_provided_patent_identifiers": "patent",
235
                    "bio:rcsb_pdb:protein_bound_3d_structures": "pdb",
236
                    "related:pubchem:related_compounds_with_annotation": "compound",
237
                }
238
            )
239
        return dct
240
241
    @property
242
    def _linksets_to_use(self) -> Mapping[str, str]:
243
        return {
244
            "lit:pubchem:chemical_cooccurrences_in_literature": "ChemicalNeighbor",
245
            "lit:pubchem:gene_cooccurrences_in_literature": "ChemicalGeneSymbolNeighbor",
246
            "lit:pubchem:disease_cooccurrences_in_literature": "ChemicalDiseaseNeighbor",
247
        }
248
249
    @property
250
    def _hierarchies_to_use(self) -> Mapping[str, int]:
251
        if not self._use_classifiers:
252
            return {}
253
        dct = {
254
            "MeSH Tree": 1,
255
            "ChEBI Ontology": 2,
256
            "WHO ATC Classification System": 79,
257
            "Guide to PHARMACOLOGY Target Classification": 92,
258
            "ChEMBL Target Tree": 87,
259
        }
260
        if self._use_extra_classifiers:
261
            dct.update(
262
                {
263
                    "KEGG: Phytochemical Compounds": 5,
264
                    "KEGG: Drug": 14,
265
                    "KEGG: USP": 15,
266
                    "KEGG: Major components of natural products": 69,
267
                    "KEGG: Target-based Classification of Drugs": 22,
268
                    "KEGG: OTC drugs": 25,
269
                    "KEGG: Drug Classes": 96,
270
                    "CAMEO Chemicals": 86,
271
                    "EPA CPDat Classification": 99,
272
                    "FDA Pharm Classes": 78,
273
                    "ChemIDplus": 84,
274
                }
275
            )
276
        return dct
277
278
    def _external_table_url(self, cid: int, collection: str) -> str:
279
        return (
280
            self._sdg
281
            + "?infmt=json"
282
            + "&outfmt=csv"
283
            + "&query={ download : * , collection : "
284
            + collection
285
            + " , where :{ ands :[{ cid : "
286
            + str(cid)
287
            + " }]}}"
288
        ).replace(" ", "%22")
289
290
    def _query_json(self, url: str) -> NestedDotDict:
291
        data = self._query(url)
292
        data = NestedDotDict(orjson.loads(data))
293
        if "Fault" in data:
294
            raise ValueError(f"Request failed ({data.get('Code')}) on {url}: {data.get('Message')}")
295
        return data
296
297
    def _strip_by_key_in_place(self, data: Union[dict, list], bad_key: str) -> None:
298
        if isinstance(data, list):
299
            for x in data:
0 ignored issues
show
Coding Style Naming introduced by
Variable name "x" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
300
                self._strip_by_key_in_place(x, bad_key)
301
        elif isinstance(data, dict):
302
            for k, v in list(data.items()):
0 ignored issues
show
Coding Style Naming introduced by
Variable name "v" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
303
                if k == bad_key:
304
                    del data[k]
305
                elif isinstance(v, (list, dict)):
306
                    self._strip_by_key_in_place(v, bad_key)
307
308
309
__all__ = ["QueryingPubchemApi"]
310