QueryingPubchemApi._strip_by_key_in_place()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 10
nop 3
dl 0
loc 10
rs 8
c 0
b 0
f 0
1
"""
2
PubChem querying API.
3
"""
4
from __future__ import annotations
5
6
import abc
7
import logging
8
import time
9
from urllib.error import HTTPError
10
from datetime import datetime, timezone
11
from pathlib import Path
12
from typing import Optional, Sequence, Union, FrozenSet
13
14
import io
15
import gzip
16
import orjson
0 ignored issues
show
introduced by
Unable to import 'orjson'
Loading history...
17
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
18
from pocketutils.core.dot_dict import NestedDotDict
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.dot_dict'
Loading history...
19
from pocketutils.core.query_utils import QueryExecutor
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.query_utils'
Loading history...
20
21
from mandos import MandosUtils
22
from mandos.model.pubchem_data import PubchemData
23
24
logger = logging.getLogger("mandos")
25
26
27
class PubchemApi(metaclass=abc.ABCMeta):
    """
    Base interface for fetching PubChem data and searching for similar compounds.

    ``fetch_data`` and ``find_similar_compounds`` only raise ``NotImplementedError`` here;
    subclasses are expected to override them.
    """

    def fetch_data_from_cid(self, cid: int) -> Optional[PubchemData]:
        """
        Fetches the data for a compound identified by an integer CID.

        Exists separately from ``fetch_data`` to make it completely clear
        what an int value means. Simply delegates to ``fetch_data``.
        """
        # noinspection PyTypeChecker
        fetched = self.fetch_data(cid)
        return fetched

    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
        """
        Fetches the data for the compound looked up by ``inchikey``.

        Raises:
            NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError()

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        """
        Finds compounds similar to ``inchi`` using ``min_tc`` as the threshold.

        Raises:
            NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError()
38
39
40
class QueryingPubchemApi(PubchemApi):
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
41
    def __init__(self):
42
        self._query = QueryExecutor(0.22, 0.25)
43
44
    _pug = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
45
    _pug_view = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
46
    _sdg = "https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi"
47
    _classifications = "https://pubchem.ncbi.nlm.nih.gov/classification/cgi/classifications.fcgi"
48
    _link_db = "https://pubchem.ncbi.nlm.nih.gov/link_db/link_db_server.cgi"
49
50
    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
51
        data = dict(
52
            meta=dict(
53
                timestamp_fetch_started=datetime.now(timezone.utc).astimezone().isoformat(),
54
                from_lookup=inchikey,
55
            )
56
        )
57
        t0 = time.monotonic_ns()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
58
        cid = self._fetch_compound(inchikey)
59
        if cid is None:
60
            return None
61
        data["record"] = self._fetch_display_data(cid)["Record"]
62
        external_table_names = {
63
            "related:pubchem:related_compounds_with_annotation": "compound",
64
            "drug:clinicaltrials.gov:clinical_trials": "clinicaltrials",
65
            "pharm:pubchem:reactions": "pathwayreaction",
66
            "uses:cpdat:uses": "cpdat",
67
            "tox:chemidplus:acute_effects": "chemidplus",
68
            "dis:ctd:associated_disorders_and_diseases": "ctd_chemical_disease",
69
            "lit:pubchem:depositor_provided_pubmed_citations": "pubmed",
70
            "patent:depositor_provided_patent_identifiers": "patent",
71
            "bio:rcsb_pdb:protein_bound_3d_structures": "pdb",
72
            "bio:dgidb:drug_gene_interactions": "dgidb",
73
            "bio:ctd:chemical_gene_interactions": "ctdchemicalgene",
74
            "bio:drugbank:drugbank_interactions": "drugbank",
75
            "bio:drugbank:drug_drug_interactions": "drugbankddi",
76
            "bio:pubchem:bioassay_results": "bioactivity",
77
        }
78
        external_link_set_names = {
79
            "lit:pubchem:chemical_cooccurrences_in_literature": "ChemicalNeighbor",
80
            "lit:pubchem:gene_cooccurrences_in_literature": "ChemicalGeneSymbolNeighbor",
81
            "lit:pubchem:disease_cooccurrences_in_literature": "ChemicalDiseaseNeighbor",
82
        }
83
        data["external_tables"] = {
84
            table: self._fetch_external_table(cid, table) for table in external_table_names.values()
85
        }
86
        data["link_sets"] = {
87
            table: self._fetch_external_link_set(cid, table)
88
            for table in external_link_set_names.values()
89
        }
90
        # get index==0 because we only have 1 compound
91
        data["structure"] = self._fetch_misc_data(cid)["PC_Compounds"][0]
92
        del [data["structure"]["props"]]  # redundant with props section in record
93
        data["classifications"] = self._fetch_hierarchies(cid)["hierarchies"]
94
        t1 = time.monotonic_ns()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "t1" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
95
        data["meta"]["timestamp_fetch_finished"] = (
96
            datetime.now(timezone.utc).astimezone().isoformat()
97
        )
98
        data["meta"]["fetch_nanos_taken"] = str(t1 - t0)
99
        self._strip_by_key_in_place(data, "DisplayControls")
100
        return PubchemData(NestedDotDict(data))
101
102
    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        """
        Runs a PubChem similarity search and returns the CIDs of the hits.

        Submits the search with a POST, reads the list key from the ``Waiting``
        response, and then polls that list key until CIDs are available.

        Args:
            inchi: An int (taken to be a CID) or a string identifier of the query compound
            min_tc: Passed as the ``Threshold`` query parameter

        Returns:
            The CIDs listed under ``IdentifierList.CID`` in the response

        Raises:
            TimeoutError: If no CIDs were returned within 5 seconds of polling
        """
        # _query_and_type already returns "<type>/<identifier>", so the identifier
        # must not be appended a second time (compare the URL built in _fetch_cid)
        slash = self._query_and_type(inchi)
        req = self._query(
            f"{self._pug}/compound/similarity/{slash}/JSON?Threshold={min_tc}",
            method="post",
        )
        key = orjson.loads(req)["Waiting"]["ListKey"]
        started = time.monotonic()
        while time.monotonic() - started < 5:
            # it'll wait as needed here
            resp = self._query(f"{self._pug}/compound/listkey/{key}/cids/JSON")
            resp = NestedDotDict(orjson.loads(resp))
            if resp.get("IdentifierList.CID") is not None:
                return frozenset(resp.req_list_as("IdentifierList.CID", int))
        raise TimeoutError(f"Search for {inchi} using key {key} timed out")
117
118
    def _fetch_compound(self, inchikey: Union[int, str]) -> Optional[int]:
119
        cid = self._fetch_cid(inchikey)
120
        if cid is None:
121
            return None
122
        data = dict(record=self._fetch_display_data(cid)["Record"])
123
        data = PubchemData(NestedDotDict(data))
124
        return data.parent_or_self
125
126
    def _fetch_cid(self, inchikey: str) -> Optional[int]:
127
        # The PubChem API docs LIE!!
128
        # Using ?cids_type=parent DOES NOT give the parent
129
        # Ex: https://pubchem.ncbi.nlm.nih.gov/compound/656832
130
        # This is cocaine HCl, which has cocaine (446220) as a parent
131
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/656832/JSON
132
        # gives 656832 back again
133
        # same thing when querying by inchikey
134
        slash = self._query_and_type(inchikey)
135
        url = f"{self._pug}/compound/{slash}/JSON"
136
        data = self._query_json(url)
137
        found = [x["id"]["id"] for x in data["PC_Compounds"]]
138
        if len(found) == 0:
0 ignored issues
show
unused-code introduced by
Unnecessary "elif" after "return"
Loading history...
139
            return None
140
        elif len(found) > 1:
141
            logger.warning(
0 ignored issues
show
introduced by
Use lazy % formatting in logging functions
Loading history...
142
                f"Found {len(found)} CIDs for {inchikey}: {found}. Using first ({found[0]})."
143
            )
144
        found = found[0]["cid"]
145
        assert isinstance(found, int), f"Type of {found} is {type(found)}"
146
        return found
147
148
    def _fetch_display_data(self, cid: int) -> Optional[NestedDotDict]:
149
        url = f"{self._pug_view}/data/compound/{cid}/JSON/?response_type=display"
150
        return self._query_json(url)
151
152
    def _fetch_misc_data(self, cid: int) -> Optional[NestedDotDict]:
153
        url = f"{self._pug}/compound/cid/{cid}/JSON"
154
        return self._query_json(url)
155
156
    def _query_json(self, url: str) -> NestedDotDict:
        """
        Queries ``url``, parses the response as JSON, and fails on a PubChem fault.

        Args:
            url: The full URL to query

        Returns:
            The parsed response

        Raises:
            ValueError: If the response has a top-level ``Fault`` key
        """
        data = self._query(url)
        data = NestedDotDict(orjson.loads(data))
        if "Fault" in data:
            # PubChem nests the details as {"Fault": {"Code": ..., "Message": ...}};
            # the top-level lookups are kept only as a fallback
            code = data.get("Fault.Code") or data.get("Code")
            message = data.get("Fault.Message") or data.get("Message")
            raise ValueError(f"Request failed ({code}) on {url}: {message}")
        return data
162
163
    def _fetch_external_link_set(self, cid: int, table: str) -> NestedDotDict:
        """
        Fetches all links of type ``table`` for ``cid`` from the link_db service, as JSON.
        """
        response = self._query(
            f"{self._link_db}?format=JSON&type={table}&operation=GetAllLinks&id_1={cid}"
        )
        parsed = orjson.loads(response)
        return NestedDotDict(parsed)
167
168
    def _fetch_hierarchies(self, cid: int) -> NestedDotDict:
        """
        Fetches the classification hierarchies for ``cid``.

        One request is made per hierarchy ID. A request that raises ``HTTPError``
        contributes an empty dict instead of a hierarchy. All ``ChildID`` keys
        are stripped from the result.

        Returns:
            A dict with the single key ``hierarchies``: a list with one entry per queried ID
        """
        # Hierarchy IDs (hid) that are queried. Names known for some of them:
        #   1 MeSH Tree, 2 ChEBI Ontology, 5 KEGG: Phytochemical Compounds,
        #   69 KEGG: Major components of natural products,
        #   79 WHO ATC Classification System, 84 ChemIDplus,
        #   99 EPA CPDat Classification
        # Other known hids, currently not queried:
        #   14 KEGG: Drug, 15 KEGG: USP, 22 KEGG: Target-based Classification of Drugs,
        #   25 KEGG: OTC drugs, 96 KEGG: Drug Classes, 86 CAMEO Chemicals,
        #   92 Guide to PHARMACOLOGY Target Classification, 87 ChEMBL Target Tree,
        #   78 FDA Pharm Classes
        hids = [1, 2, 5, 69, 79, 84, 99, 1112354]
        build_up = []
        for hid in hids:
            url = (
                f"{self._classifications}?format=json&hid={hid}"
                f"&search_uid_type=cid&search_uid={cid}"
                "&search_type=list&response_type=display"
            )
            try:
                data = orjson.loads(self._query(url))
                logger.debug("Found data for classifier %s, compound %s", hid, cid)
                data = data["Hierarchies"]["Hierarchy"][0]
            except HTTPError:
                logger.debug("No data for classifier %s, compound %s", hid, cid)
                data = {}
            build_up.append(data)
        # These list all of the child nodes for each node
        # Some of them are > 1000 items -- they're HUGE
        # We don't expect to need to navigate to children
        self._strip_by_key_in_place(build_up, "ChildID")
        return NestedDotDict(dict(hierarchies=build_up))
204
205
    def _fetch_external_table(self, cid: int, table: str) -> Sequence[dict]:
206
        url = self._external_table_url(cid, table)
207
        data = self._query(url)
208
        df: pd.DataFrame = pd.read_csv(io.StringIO(data))
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
209
        return list(df.T.to_dict().values())
210
211
    def _external_table_url(self, cid: int, collection: str) -> str:
212
        return (
213
            self._sdg
214
            + "?infmt=json"
215
            + "&outfmt=csv"
216
            + "&query={ download : * , collection : "
217
            + collection
218
            + " , where :{ ands :[{ cid : "
219
            + str(cid)
220
            + " }]}}"
221
        ).replace(" ", "%22")
222
223
    def _query_and_type(self, inchi: Union[int, str], req_full: bool = False) -> str:
0 ignored issues
show
Coding Style introduced by
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y;

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y;
Loading history...
224
        allowed = ["cid", "inchi", "smiles"] if req_full else ["cid", "inchi", "inchikey", "smiles"]
225
        if isinstance(inchi, int):
0 ignored issues
show
unused-code introduced by
Unnecessary "else" after "return"
Loading history...
226
            return f"cid/{inchi}"
227
        else:
228
            query_type = MandosUtils.get_query_type(inchi).name.lower()
229
            if query_type not in allowed:
230
                raise ValueError(f"Can't query {inchi} with type {query_type}")
231
            return f"{query_type}/{inchi}"
232
233
    def _strip_by_key_in_place(self, data: Union[dict, list], bad_key: str) -> None:
234
        if isinstance(data, list):
235
            for x in data:
0 ignored issues
show
Coding Style Naming introduced by
Variable name "x" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
236
                self._strip_by_key_in_place(x, bad_key)
237
        elif isinstance(data, dict):
238
            for k, v in list(data.items()):
0 ignored issues
show
Coding Style Naming introduced by
Variable name "v" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
239
                if k == bad_key:
240
                    del data[k]
241
                elif isinstance(v, (list, dict)):
242
                    self._strip_by_key_in_place(v, bad_key)
243
244
245
class CachingPubchemApi(PubchemApi):
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
246
    def __init__(self, cache_dir: Path, querier: QueryingPubchemApi, compress: bool = True):
247
        self._cache_dir = cache_dir
248
        self._querier = querier
249
        self._compress = compress
250
251
    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
252
        path = self.data_path(inchikey)
253
        if not path.exists():
254
            data = self._querier.fetch_data(inchikey)
255
            path.parent.mkdir(parents=True, exist_ok=True)
256
            encoded = data.to_json()
257
            self._write_json(encoded, path)
258
            return data
259
        read = self._read_json(path)
260
        return PubchemData(read)
261
262
    def _write_json(self, encoded: str, path: Path) -> None:
263
        if self._compress:
264
            path.write_bytes(gzip.compress(encoded.encode(encoding="utf8")))
265
        else:
266
            path.write_text(encoded, encoding="utf8")
267
268
    def _read_json(self, path: Path) -> NestedDotDict:
        """
        Reads and parses the JSON at ``path``, gunzipping first if compression is enabled.
        """
        if self._compress:
            payload = gzip.decompress(path.read_bytes())
        else:
            payload = path.read_text(encoding="utf8")
        return NestedDotDict(orjson.loads(payload))
275
276
    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
277
        path = self.similarity_path(inchi)
278
        if not path.exists():
279
            df = None
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
280
            existing = set()
281
        else:
282
            df = pd.read_csv(path, sep="\t")
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
283
            df = df[df["min_tc"] < min_tc]
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
284
            existing = set(df["cid"].values)
285
        if len(existing) == 0:
0 ignored issues
show
unused-code introduced by
Unnecessary "else" after "return"
Loading history...
286
            found = self._querier.find_similar_compounds(inchi, min_tc)
287
            path.parent.mkdir(parents=True, exist_ok=True)
288
            new_df = pd.DataFrame([pd.Series(dict(cid=cid, min_tc=min_tc)) for cid in found])
289
            if df is not None:
290
                new_df = pd.concat([df, new_df])
291
            new_df.to_csv(path, sep="\t")
292
            return frozenset(existing.union(found))
293
        else:
294
            return frozenset(existing)
295
296
    def data_path(self, inchikey: str):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
297
        ext = ".json.gz" if self._compress else ".json"
298
        return self._cache_dir / "data" / f"{inchikey}{ext}"
299
300
    def similarity_path(self, inchikey: str):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
301
        ext = ".tab.gz" if self._compress else ".tab"
302
        return self._cache_dir / "similarity" / f"{inchikey}{ext}"
303
304
305
__all__ = [
306
    "PubchemApi",
307
    "CachingPubchemApi",
308
    "QueryingPubchemApi",
309
]
310