Passed
Push — dependabot/pip/pre-commit-2.10... ( b181fa )
by
unknown
06:32 queued 04:47
created

QueryingPubchemApi._fetch_hierarchies()   B

Complexity

Conditions 5

Size

Total Lines 44
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 38
nop 2
dl 0
loc 44
rs 8.5013
c 0
b 0
f 0
1
"""
2
PubChem querying API.
3
"""
4
from __future__ import annotations
5
6
import abc
7
import logging
8
import time
9
from urllib.error import HTTPError
10
from datetime import datetime, timezone
11
from pathlib import Path
12
from typing import Optional, Sequence, Union, FrozenSet
13
14
import io
15
import gzip
16
import orjson
0 ignored issues
show
introduced by
Unable to import 'orjson'
Loading history...
17
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
18
from pocketutils.core.dot_dict import NestedDotDict
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.dot_dict'
Loading history...
19
from pocketutils.core.query_utils import QueryExecutor
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.query_utils'
Loading history...
20
21
from mandos import MandosUtils
22
from mandos.model.pubchem_data import PubchemData
23
24
# Module-level logger; it is registered under the fixed name "mandos" rather than ``__name__``.
logger = logging.getLogger("mandos")
25
26
27
class PubchemApi(metaclass=abc.ABCMeta):
    """
    Base interface for objects that can look up PubChem compound data.

    Subclasses are expected to override ``fetch_data`` and ``find_similar_compounds``;
    the versions defined here only raise ``NotImplementedError``.
    """

    def fetch_data_from_cid(self, cid: int) -> Optional[PubchemData]:
        """
        Fetches the data for a compound identified by its PubChem compound ID.

        This exists separately from ``fetch_data`` purely so that the meaning of an
        integer argument is unambiguous at the call site; it delegates to ``fetch_data``.

        Args:
            cid: The PubChem compound ID

        Returns:
            Whatever ``fetch_data`` returns for ``cid``
        """
        # noinspection PyTypeChecker
        return self.fetch_data(cid)

    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
        """
        Fetches the data for a compound. Must be overridden.

        Args:
            inchikey: The identifier to look up

        Raises:
            NotImplementedError: Always, in this base class
        """
        raise NotImplementedError()

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        """
        Finds compounds that are similar to a query compound. Must be overridden.

        Args:
            inchi: The query compound, as an integer or a string identifier
            min_tc: The similarity threshold

        Raises:
            NotImplementedError: Always, in this base class
        """
        raise NotImplementedError()
38
39
40
class QueryingPubchemApi(PubchemApi):
    """
    A ``PubchemApi`` that downloads everything directly from the PubChem web services.

    All requests go through ``self._query``, a ``QueryExecutor`` built in ``__init__``.
    """

    _pug = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
    _pug_view = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
    # NOTE(review): the endpoint is "sdq"; the attribute name ``_sdg`` looks like a typo
    # but is kept because renaming it could break code outside this class
    _sdg = "https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi"
    _classifications = "https://pubchem.ncbi.nlm.nih.gov/classification/cgi/classifications.fcgi"
    _link_db = "https://pubchem.ncbi.nlm.nih.gov/link_db/link_db_server.cgi"

    # Only the values (the collection names sent to the server) are used;
    # the keys record which section of the compound page each table belongs to
    _external_table_names = {
        "related:pubchem:related_compounds_with_annotation": "compound",
        "drug:clinicaltrials.gov:clinical_trials": "clinicaltrials",
        "pharm:pubchem:reactions": "pathwayreaction",
        "uses:cpdat:uses": "cpdat",
        "tox:chemidplus:acute_effects": "chemidplus",
        "dis:ctd:associated_disorders_and_diseases": "ctd_chemical_disease",
        "lit:pubchem:depositor_provided_pubmed_citations": "pubmed",
        "patent:depositor_provided_patent_identifiers": "patent",
        "bio:rcsb_pdb:protein_bound_3d_structures": "pdb",
        "bio:dgidb:drug_gene_interactions": "dgidb",
        "bio:ctd:chemical_gene_interactions": "ctdchemicalgene",
        "bio:drugbank:drugbank_interactions": "drugbank",
        "bio:drugbank:drug_drug_interactions": "drugbankddi",
        "bio:pubchem:bioassay_results": "bioactivity",
    }
    # As above: only the values (link set types) are sent to the server
    _external_link_set_names = {
        "lit:pubchem:chemical_cooccurrences_in_literature": "ChemicalNeighbor",
        "lit:pubchem:gene_cooccurrences_in_literature": "ChemicalGeneSymbolNeighbor",
        "lit:pubchem:disease_cooccurrences_in_literature": "ChemicalDiseaseNeighbor",
    }
    # Classification name -> hierarchy ID ("hid") used by the classification service
    _hierarchy_ids = {
        "MeSH Tree": 1,
        "ChEBI Ontology": 2,
        "KEGG: Phytochemical Compounds": 5,
        "KEGG: Drug": 14,
        "KEGG: USP": 15,
        "KEGG: Major components of natural products": 69,
        "KEGG: Target-based Classification of Drugs": 22,
        "KEGG: OTC drugs": 25,
        "KEGG: Drug Classes": 96,
        "CAMEO Chemicals": 86,
        "WHO ATC Classification System": 79,
        "Guide to PHARMACOLOGY Target Classification": 92,
        "ChEMBL Target Tree": 87,
        "EPA CPDat Classification": 99,
        "FDA Pharm Classes": 78,
        "ChemIDplus": 84,
    }

    def __init__(self):
        # NOTE(review): presumably the min and max delay in seconds between requests;
        # confirm against the pocketutils QueryExecutor documentation
        self._query = QueryExecutor(0.22, 0.25)

    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
        """
        Downloads and assembles all of the data for one compound.

        The result combines the display record, the external tables, the literature link sets,
        the structure record (minus its redundant ``props``), and the classification hierarchies,
        plus a ``meta`` section with the lookup key and timing information.
        Every ``DisplayControls`` entry is removed before the data is wrapped.

        Args:
            inchikey: The identifier to look up

        Returns:
            The assembled data, or None if no matching compound was found
        """
        logger.info("Downloading PubChem data for %s", inchikey)
        data = dict(
            meta=dict(
                timestamp_fetch_started=datetime.now(timezone.utc).astimezone().isoformat(),
                from_lookup=inchikey,
            )
        )
        t0 = time.monotonic_ns()
        cid = self._fetch_compound(inchikey)
        if cid is None:
            return None
        data["record"] = self._fetch_display_data(cid)["Record"]
        data["external_tables"] = {
            table: self._fetch_external_table(cid, table)
            for table in self._external_table_names.values()
        }
        data["link_sets"] = {
            table: self._fetch_external_link_set(cid, table)
            for table in self._external_link_set_names.values()
        }
        # get index==0 because we only have 1 compound
        data["structure"] = self._fetch_misc_data(cid)["PC_Compounds"][0]
        del data["structure"]["props"]  # redundant with props section in record
        data["classifications"] = self._fetch_hierarchies(cid)["hierarchies"]
        t1 = time.monotonic_ns()
        data["meta"]["timestamp_fetch_finished"] = (
            datetime.now(timezone.utc).astimezone().isoformat()
        )
        data["meta"]["fetch_nanos_taken"] = str(t1 - t0)
        self._strip_by_key_in_place(data, "DisplayControls")
        return PubchemData(NestedDotDict(data))

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        """
        Runs a PubChem similarity search and returns the CIDs of the hits.

        The search is submitted with a POST, which returns a list key;
        the list key is then polled for up to 5 seconds until the CID list is available.

        Args:
            inchi: The query compound: a CID (int) or a string identifier
            min_tc: Passed verbatim as the ``Threshold`` URL parameter
                    (NOTE(review): confirm the scale the server expects for this value)

        Returns:
            The CIDs listed under ``IdentifierList.CID`` in the response

        Raises:
            TimeoutError: If the results were not available within 5 seconds
            ValueError: If ``inchi`` is of a type that cannot be queried
        """
        # ``slash`` is already "<type>/<identifier>", so the identifier must not be appended again
        slash = self._query_and_type(inchi)
        req = self._query(
            f"{self._pug}/compound/similarity/{slash}/JSON?Threshold={min_tc}",
            method="post",
        )
        key = orjson.loads(req)["Waiting"]["ListKey"]
        t0 = time.monotonic()
        while time.monotonic() - t0 < 5:
            # it'll wait as needed here
            resp = self._query(f"{self._pug}/compound/listkey/{key}/cids/JSON")
            resp = NestedDotDict(orjson.loads(resp))
            if resp.get("IdentifierList.CID") is not None:
                return frozenset(resp.req_list_as("IdentifierList.CID", int))
        raise TimeoutError(f"Search for {inchi} using key {key} timed out")

    def _fetch_compound(self, inchikey: Union[int, str]) -> Optional[int]:
        """
        Resolves a lookup to a CID and returns ``parent_or_self`` of that compound's record.

        Returns:
            None if no CID was found for the lookup
        """
        cid = self._fetch_cid(inchikey)
        if cid is None:
            return None
        data = dict(record=self._fetch_display_data(cid)["Record"])
        data = PubchemData(NestedDotDict(data))
        return data.parent_or_self

    def _fetch_cid(self, inchikey: Union[int, str]) -> Optional[int]:
        """
        Finds the CID of the compound that matches a lookup.

        An integer lookup matches a compound by its CID;
        a string lookup matches a compound whose standard InChIKey equals the string.
        If several distinct compounds match, a warning is logged and the first is used.

        Returns:
            The CID, or None if nothing in the response matched

        Raises:
            TypeError: If the CID in the response is not an int
        """
        # The PubChem API docs LIE!!
        # Using ?cids_type=parent DOES NOT give the parent
        # Ex: https://pubchem.ncbi.nlm.nih.gov/compound/656832
        # This is cocaine HCl, which has cocaine (446220) as a parent
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/656832/JSON
        # gives 656832 back again
        # same thing when querying by inchikey
        slash = self._query_and_type(inchikey)
        url = f"{self._pug}/compound/{slash}/JSON"
        logger.debug("Querying %s", url)
        data = self._query_json(url)
        found = []
        for match in data["PC_Compounds"]:
            ident = match["id"]["id"]
            if ident not in found and self._matches_lookup(match, inchikey):
                found.append(ident)
        if not found:
            return None
        if len(found) > 1:
            logger.warning(
                "Found %s CIDs for %s: %s. Using first (%s).",
                len(found),
                inchikey,
                found,
                found[0],
            )
        cid = found[0]["cid"]
        # an explicit check rather than ``assert``, which is stripped under ``python -O``
        if not isinstance(cid, int):
            raise TypeError(f"Type of {cid} is {type(cid)}")
        return cid

    @staticmethod
    def _matches_lookup(match: dict, lookup: Union[int, str]) -> bool:
        """
        Tells whether one ``PC_Compounds`` entry corresponds to the lookup value.
        """
        if isinstance(lookup, int):
            return match["id"]["id"].get("cid") == lookup
        return any(
            prop["urn"]["label"] == "InChIKey"
            and prop["urn"]["name"] == "Standard"
            and prop["value"]["sval"] == lookup
            for prop in match["props"]
        )

    def _fetch_display_data(self, cid: int) -> Optional[NestedDotDict]:
        """
        Downloads the PUG View "display" record for a compound.
        """
        url = f"{self._pug_view}/data/compound/{cid}/JSON/?response_type=display"
        return self._query_json(url)

    def _fetch_misc_data(self, cid: int) -> Optional[NestedDotDict]:
        """
        Downloads the PUG REST compound record for a compound.
        """
        url = f"{self._pug}/compound/cid/{cid}/JSON"
        return self._query_json(url)

    def _query_json(self, url: str) -> NestedDotDict:
        """
        Downloads a URL and parses the response as JSON.

        Raises:
            ValueError: If the parsed response has a top-level ``Fault`` key
        """
        data = self._query(url)
        data = NestedDotDict(orjson.loads(data))
        if "Fault" in data:
            raise ValueError(f"Request failed ({data.get('Code')}) on {url}: {data.get('Message')}")
        return data

    def _fetch_external_link_set(self, cid: int, table: str) -> NestedDotDict:
        """
        Downloads all links of type ``table`` for a compound from the link database.
        """
        url = f"{self._link_db}?format=JSON&type={table}&operation=GetAllLinks&id_1={cid}"
        data = self._query(url)
        return NestedDotDict(orjson.loads(data))

    def _fetch_hierarchies(self, cid: int) -> NestedDotDict:
        """
        Downloads one classification hierarchy per entry of ``_hierarchy_ids``.

        A classifier for which the request fails with an ``HTTPError``, or for which the response
        lacks a non-empty ``Hierarchies.Hierarchy``, contributes an empty dict instead.
        When a classifier returns several hierarchies, only the first is kept.

        Returns:
            A dict with one key, ``hierarchies``: a list with one item per classifier,
            in the iteration order of ``_hierarchy_ids``
        """
        build_up = []
        for hid in self._hierarchy_ids.values():
            url = (
                f"{self._classifications}?format=json&hid={hid}"
                f"&search_uid_type=cid&search_uid={cid}"
                "&search_type=list&response_type=display"
            )
            try:
                data = orjson.loads(self._query(url))
                logger.debug("Found data for classifier %s, compound %s", hid, cid)
                found = data["Hierarchies"]["Hierarchy"]
                if len(found) == 0:
                    raise KeyError("Hierarchy")
                if len(found) > 1:
                    logger.warning(
                        "Multiple hierarchies for classifier %s, compound %s; using first",
                        hid,
                        cid,
                    )
                hierarchy = found[0]
            except (HTTPError, LookupError) as e:  # KeyError is a LookupError
                logger.debug("No data for classifier %s, compound %s: %s", hid, cid, e)
                hierarchy = {}
            build_up.append(hierarchy)
        # These list all of the child nodes for each node
        # Some of them are > 1000 items -- they're HUGE
        # We don't expect to need to navigate to children
        self._strip_by_key_in_place(build_up, "ChildID")
        return NestedDotDict(dict(hierarchies=build_up))

    def _fetch_external_table(self, cid: int, table: str) -> Sequence[dict]:
        """
        Downloads an external table as CSV and returns it as a list with one dict per row.
        """
        url = self._external_table_url(cid, table)
        data = self._query(url)
        df: pd.DataFrame = pd.read_csv(io.StringIO(data))
        return list(df.T.to_dict().values())

    def _external_table_url(self, cid: int, collection: str) -> str:
        """
        Builds the URL that downloads ``collection`` as CSV, restricted to one compound.
        """
        # The query is a JSON object whose double quotes must be written as %22.
        # Every quote is typed as a space in the template and then substituted,
        # so neither the base URL nor ``collection`` may contain a space.
        return (
            self._sdg
            + "?infmt=json"
            + "&outfmt=csv"
            + "&query={ download : * , collection : "
            + collection
            + " , where :{ ands :[{ cid : "
            + str(cid)
            + " }]}}"
        ).replace(" ", "%22")

    def _query_and_type(self, inchi: Union[int, str], req_full: bool = False) -> str:
        """
        Builds the ``<type>/<identifier>`` URL fragment for a lookup value.

        Args:
            inchi: A CID (int), or a string whose type is detected by ``MandosUtils``
            req_full: If True, InChIKey strings are rejected

        Returns:
            E.g. ``cid/2244`` for the integer 2244

        Raises:
            ValueError: If the detected type of a string is not allowed
        """
        if isinstance(inchi, int):
            return f"cid/{inchi}"
        allowed = ["cid", "inchi", "smiles"] if req_full else ["cid", "inchi", "inchikey", "smiles"]
        query_type = MandosUtils.get_query_type(inchi).name.lower()
        if query_type not in allowed:
            raise ValueError(f"Can't query {inchi} with type {query_type}")
        return f"{query_type}/{inchi}"

    def _strip_by_key_in_place(self, data: Union[dict, list], bad_key: str) -> None:
        """
        Recursively deletes every dict entry whose key is ``bad_key``, modifying ``data``.

        Only lists and dicts are descended into; any other value is left untouched.
        """
        if isinstance(data, list):
            for item in data:
                self._strip_by_key_in_place(item, bad_key)
        elif isinstance(data, dict):
            # iterate over a snapshot because entries are deleted during the loop
            for key, value in list(data.items()):
                if key == bad_key:
                    del data[key]
                elif isinstance(value, (list, dict)):
                    self._strip_by_key_in_place(value, bad_key)
262
263
264
class CachingPubchemApi(PubchemApi):
    """
    A ``PubchemApi`` that stores results under a cache directory.

    Compound data is kept as JSON under ``<cache_dir>/data`` and similarity search results
    as tab-delimited tables under ``<cache_dir>/similarity``.
    Anything missing from the cache is obtained from ``querier`` and then written to the cache.
    """

    def __init__(
        self, cache_dir: Path, querier: Optional[QueryingPubchemApi], compress: bool = True
    ):
        """
        Args:
            cache_dir: The root directory of the cache
            querier: Used to obtain anything that is not cached;
                     if None, a cache miss raises a ``LookupError``
            compress: Whether cache files are gzipped (this also determines their extensions)
        """
        self._cache_dir = cache_dir
        self._querier = querier
        self._compress = compress

    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
        """
        Returns the cached data for a compound, downloading and caching it first if needed.

        Returns:
            The data, or None if the querier found no compound (in which case nothing is cached)

        Raises:
            LookupError: If the data is not cached and there is no querier
        """
        path = self.data_path(inchikey)
        if path.exists():
            logger.info("Found cached PubChem data at %s", path.absolute())
            return PubchemData(self._read_json(path))
        if self._querier is None:
            raise LookupError(f"Key {inchikey} not found in cache")
        logger.info("Downloading PubChem data for %s ...", inchikey)
        data = self._querier.fetch_data(inchikey)
        if data is None:
            # the querier returns None for an unknown compound; there is nothing to serialize
            logger.warning("No PubChem data found for %s; nothing was cached", inchikey)
            return None
        path.parent.mkdir(parents=True, exist_ok=True)
        self._write_json(data.to_json(), path)
        logger.info("Wrote PubChem data to %s", path.absolute())
        return data

    def _write_json(self, encoded: str, path: Path) -> None:
        """
        Writes already-serialized JSON to ``path`` as UTF-8, gzipped if ``compress`` was set.
        """
        if self._compress:
            path.write_bytes(gzip.compress(encoded.encode(encoding="utf8")))
        else:
            path.write_text(encoded, encoding="utf8")

    def _read_json(self, path: Path) -> NestedDotDict:
        """
        Reads and parses JSON from ``path``, gunzipping first if ``compress`` was set.
        """
        if self._compress:
            raw = gzip.decompress(path.read_bytes())
        else:
            raw = path.read_text(encoding="utf8")
        return NestedDotDict(orjson.loads(raw))

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        """
        Returns the CIDs of compounds similar to ``inchi``, using the cache where possible.

        Cached rows are used if they were recorded with a threshold at or below ``min_tc``.
        Rows recorded with a lower threshold may therefore include compounds that are
        less similar than ``min_tc``.
        If there are no such rows, the search is run with the querier,
        and its results replace the cache file.

        Args:
            inchi: The query compound: a CID (int) or a string identifier
            min_tc: The similarity threshold

        Raises:
            LookupError: If no usable results are cached and there is no querier
        """
        path = self.similarity_path(inchi)
        if path.exists():
            cached = pd.read_csv(path, sep="\t")
            # files that were written from an empty result set may lack the columns
            if {"cid", "min_tc"}.issubset(cached.columns):
                usable = cached[cached["min_tc"] <= min_tc]
                if len(usable) > 0:
                    # int() because the values come back as numpy scalars (possibly floats)
                    return frozenset(int(cid) for cid in usable["cid"])
        if self._querier is None:
            raise LookupError(f"Similarity search for {inchi} not found in cache")
        found = self._querier.find_similar_compounds(inchi, min_tc)
        path.parent.mkdir(parents=True, exist_ok=True)
        rows = [(cid, min_tc) for cid in sorted(found)]
        # index=False: a written index would come back as an extra "Unnamed: 0" column
        pd.DataFrame(rows, columns=["cid", "min_tc"]).to_csv(path, sep="\t", index=False)
        return frozenset(found)

    def data_path(self, inchikey: str) -> Path:
        """
        Returns the path of the cache file for the data of a compound.
        """
        ext = ".json.gz" if self._compress else ".json"
        return self._cache_dir / "data" / f"{inchikey}{ext}"

    def similarity_path(self, inchikey: Union[int, str]) -> Path:
        """
        Returns the path of the cache file for the similarity search results of a compound.
        """
        ext = ".tab.gz" if self._compress else ".tab"
        return self._cache_dir / "similarity" / f"{inchikey}{ext}"
330
331
332
# The public API of this module
__all__ = ["PubchemApi", "CachingPubchemApi", "QueryingPubchemApi"]
337