Passed
Push — dependabot/pip/flake8-bugbear-... ( 82a4d5...16d864 )
by
unknown
02:18
created

mandos.model.querying_pubchem_api   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 312
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 224
dl 0
loc 312
rs 8.8798
c 0
b 0
f 0
wmc 44

22 Methods

Rating   Name   Duplication   Size   Complexity  
A QueryingPubchemApi._scrape_cid() 0 37 3
A QueryingPubchemApi.find_similar_compounds() 0 14 3
A QueryingPubchemApi._fetch_hierarchy() 0 8 2
A QueryingPubchemApi._fetch_external_linkset() 0 4 1
B QueryingPubchemApi._strip_by_key_in_place() 0 10 7
A QueryingPubchemApi._get_parent() 0 9 3
A QueryingPubchemApi.fetch_data() 0 22 2
A QueryingPubchemApi._fetch_hierarchies() 0 12 3
A QueryingPubchemApi._external_table_url() 0 11 1
A QueryingPubchemApi._fetch_core_data() 0 7 1
A QueryingPubchemApi._linksets_to_use() 0 6 1
A QueryingPubchemApi._fetch_display_data() 0 3 1
A QueryingPubchemApi._fetch_external_table() 0 5 1
A QueryingPubchemApi.__init__() 0 13 2
A QueryingPubchemApi._get_metadata() 0 6 1
A QueryingPubchemApi._fetch_structure_data() 0 7 2
A QueryingPubchemApi._fetch_data() 0 9 1
A QueryingPubchemApi._tables_to_use() 0 24 2
A QueryingPubchemApi._fetch_external_tables() 0 4 1
A QueryingPubchemApi._fetch_external_linksets() 0 4 1
A QueryingPubchemApi._query_json() 0 6 2
A QueryingPubchemApi._hierarchies_to_use() 0 28 3

How to fix   Complexity   

Complexity

Complex classes like the one in mandos.model.querying_pubchem_api often do many different things. To break such a class down, identify a cohesive component within it. A common way to find one is to look for fields and methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
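
As a minimal sketch of Extract Class for this file: the three `_..._to_use` properties share a suffix and depend only on the boolean flags, so they could move into a small helper. The class name `PubchemSources` and its field names are hypothetical, and the mappings are truncated to a few entries for brevity.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class PubchemSources:
    """Hypothetical helper that owns the "what to download" decisions."""

    extra_tables: bool = False
    classifiers: bool = False

    @property
    def tables(self) -> Mapping[str, str]:
        # Truncated copy of the mapping in _tables_to_use
        dct = {
            "uses:cpdat:uses": "cpdat",
            "bio:dgidb:drug_gene_interactions": "dgidb",
        }
        if self.extra_tables:
            dct["bio:rcsb_pdb:protein_bound_3d_structures"] = "pdb"
        return dct

    @property
    def hierarchies(self) -> Mapping[str, int]:
        # Mirrors _hierarchies_to_use: empty unless classifiers are enabled
        if not self.classifiers:
            return {}
        return {"MeSH Tree": 1, "ChEBI Ontology": 2}


sources = PubchemSources(extra_tables=True)
print(sorted(sources.tables.values()))
print(sources.hierarchies)
```

With this split, the API class would hold one `PubchemSources` instance instead of several flags, which would also shrink the `__init__` argument list.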

"""
PubChem querying API.
"""
from __future__ import annotations

import logging
import re
import time
from urllib.error import HTTPError
from datetime import datetime, timezone
from typing import Optional, Sequence, Union, FrozenSet, Mapping

import io
import orjson
Issue: Unable to import 'orjson'
import pandas as pd
Issue: Unable to import 'pandas'
from pocketutils.core.dot_dict import NestedDotDict
Issue: Unable to import 'pocketutils.core.dot_dict'
from pocketutils.core.query_utils import QueryExecutor
Issue: Unable to import 'pocketutils.core.query_utils'

from mandos.model.pubchem_api import PubchemCompoundLookupError, PubchemApi
from mandos.model.pubchem_support.pubchem_data import PubchemData

logger = logging.getLogger("mandos")


class QueryingPubchemApi(PubchemApi):
Issue: Missing class docstring
    def __init__(
Issue (best-practice): Too many arguments (6/5)
        self,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        chem_data: bool = False,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        extra_tables: bool = False,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        classifiers: bool = False,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        extra_classifiers: bool = False,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        query: Optional[QueryExecutor] = None,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
    ):
        self._use_chem_data = chem_data
        self._use_extra_tables = extra_tables
        self._use_classifiers = classifiers
        self._use_extra_classifiers = extra_classifiers
        self._query = QueryExecutor(0.22, 0.25) if query is None else query

    _pug = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
    _pug_view = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
    _sdg = "https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi"
    _classifications = "https://pubchem.ncbi.nlm.nih.gov/classification/cgi/classifications.fcgi"
    _link_db = "https://pubchem.ncbi.nlm.nih.gov/link_db/link_db_server.cgi"

    def fetch_data(self, inchikey: str) -> Optional[PubchemData]:
        # Dear God this is terrible
        # Here are the steps:
        # 1. Download HTML for the InChI key and scrape the CID
        # 2. Download the "display" JSON data from the CID
        # 3. Look for a Parent-type related compound. If it exists, download its display data
        # 4. Download the structural data and append it
        # 5. Download the external table CSVs and append them
        # 6. Download the link sets and append them
        # 7. Download the classifiers (hierarchies) and append them
        # 8. Attach metadata about how we found this.
        # 9. Return the stupid, stupid result as a massive JSON struct.
        logger.info(f"Downloading PubChem data for {inchikey}")
Issue: Use lazy % formatting in logging functions
        cid = self._scrape_cid(inchikey)
        try:
            data = self._fetch_data(cid, inchikey)
        except HTTPError:
            raise PubchemCompoundLookupError(
                f"Failed finding pubchem compound (JSON) from cid {cid}, inchikey {inchikey}"
            )
        data = self._get_parent(cid, inchikey, data)
        return data
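
The "Use lazy % formatting in logging functions" issue reported on the `logger.info` call above can be addressed by passing the values as arguments, so the message is only built when the record is actually emitted. A minimal sketch using only the standard `logging` module; the logger name `demo` and the `Expensive` class are made up for illustration.

```python
import logging

logging.basicConfig()
logger = logging.getLogger("demo")  # hypothetical logger name
logger.setLevel(logging.INFO)  # DEBUG records are dropped by this logger


class Expensive:
    """Counts how often it is converted to a string."""

    calls = 0

    def __str__(self) -> str:
        Expensive.calls += 1
        return "GJSURZIOUXUGAL-UHFFFAOYSA-N"


value = Expensive()

# Eager: the f-string is built before logging decides to drop the record
logger.debug(f"Downloading PubChem data for {value}")
eager_calls = Expensive.calls

# Lazy: formatting is deferred, and skipped here because DEBUG is disabled
logger.debug("Downloading PubChem data for %s", value)
lazy_calls = Expensive.calls - eager_calls

print(eager_calls, lazy_calls)
```

The same change would apply to the two `logger.debug` calls flagged further down in the listing.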

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        req = self._query(
            f"{self._pug}/compound/similarity/inchikey/{inchi}/JSON?Threshold={min_tc}",
            method="post",
        )
        key = orjson.loads(req)["Waiting"]["ListKey"]
        t0 = time.monotonic()
Issue (Coding Style Naming): Variable name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site.
        while time.monotonic() - t0 < 5:
            # it'll wait as needed here
            resp = self._query(f"{self._pug}/compound/listkey/{key}/cids/JSON")
            resp = NestedDotDict(orjson.loads(resp))
            if resp.get("IdentifierList.CID") is not None:
                return frozenset(resp.req_list_as("IdentifierList.CID", int))
        raise TimeoutError(f"Search for {inchi} using key {key} timed out")
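
The loop in `find_similar_compounds` follows a common pattern: submit a request, receive a list key, then poll until a result appears or a deadline passes. A self-contained sketch of that pattern, with a hypothetical `fetch` callable standing in for `self._query` and no network access:

```python
import time
from typing import Callable, FrozenSet, Optional, Sequence


def poll_until(
    fetch: Callable[[], Optional[Sequence[int]]],
    timeout_s: float = 5.0,
    pause_s: float = 0.01,
) -> FrozenSet[int]:
    """Calls fetch() until it returns a result or timeout_s elapses."""
    start = time.monotonic()
    while time.monotonic() - start < timeout_s:
        result = fetch()
        if result is not None:
            return frozenset(result)
        # Explicit pause; in the original, the query executor is assumed to throttle
        time.sleep(pause_s)
    raise TimeoutError(f"No result within {timeout_s} s")


# Fake server: not ready for the first two polls, then returns CIDs
responses = iter([None, None, [446220, 656832]])
cids = poll_until(lambda: next(responses))
print(cids)
```

Using `time.monotonic()` for the deadline, as the original does, keeps the timeout unaffected by system clock adjustments.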

    def _scrape_cid(self, inchikey: str) -> int:
        # This is awful
        # Every attempt to get the actual, correct, unique CID corresponding to the inchikey
        # failed with every proper PubChem API
        # We can't use <pug_view>/data/compound/<inchikey> -- we can only use a CID there
        # I found it with a PUG API
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/CID/GJSURZIOUXUGAL-UHFFFAOYSA-N/record/JSON
        # But that returns multiple results!!
        # There's no apparent way to find out which one is real
        # I tried then querying each found CID, getting the display data, and looking at their parents
Issue (Coding Style): This line is too long as per the coding-style (102/100).
This check looks for lines that are too long. You can specify the maximum line length.
        # Unfortunately, we end up with multiple contradictory parents
        # Plus, that's insanely slow -- we have to get the full JSON data for each parent
        # Even worse -- the PubChem API docs LIE!!
        # Using ?cids_type=parent DOES NOT GIVE THE PARENT compound
        # Ex: https://pubchem.ncbi.nlm.nih.gov/compound/656832
        # This is cocaine HCl, which has cocaine (446220) as a parent
        # https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/656832/JSON
        # gives 656832 back again
        # same thing when querying by inchikey
        # Ultimately, I found that I can get HTML containing the CID from an inchikey
        # From there, we'll just have to download its "display" data and get the parent, then download that data
Issue (Coding Style): This line is too long as per the coding-style (112/100).
        url = f"https://pubchem.ncbi.nlm.nih.gov/compound/{inchikey}"
        pat = re.compile(
            r'<meta property="og:url" content="https://pubchem\.ncbi\.nlm\.nih\.gov/compound/(\d+)">'
Issue (Coding Style): This line is too long as per the coding-style (101/100).
        )
        try:
            html = self._query(url)
        except HTTPError:
            raise PubchemCompoundLookupError(
                f"Failed finding pubchem compound (HTML) from inchikey {inchikey} [url: {url}]"
            )
        match = pat.search(html)
        if match is None:
            raise PubchemCompoundLookupError(
                f"Something is wrong with the HTML from {url}; og:url not found"
            )
        return int(match.group(1))
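
To illustrate the scraping step, the pattern above can be run against a hand-written HTML fragment. The sample HTML and the helper name `extract_cid` are made up; only the `og:url` meta tag matters, and the real page layout may differ or change.

```python
import re
from typing import Optional

# Same pattern as in _scrape_cid, split across two literals to stay under 100 columns
OG_URL = re.compile(
    r'<meta property="og:url" '
    r'content="https://pubchem\.ncbi\.nlm\.nih\.gov/compound/(\d+)">'
)


def extract_cid(html: str) -> Optional[int]:
    """Returns the CID from the og:url meta tag, or None if it is absent."""
    match = OG_URL.search(html)
    return None if match is None else int(match.group(1))


# Made-up fragment; 446220 is the cocaine CID mentioned in the comments above
sample = (
    "<head>"
    '<meta property="og:url" content="https://pubchem.ncbi.nlm.nih.gov/compound/446220">'
    "</head>"
)
print(extract_cid(sample))
print(extract_cid("<head></head>"))
```

Because the pattern depends on exact attribute order and quoting, any change to the page markup would trigger the "og:url not found" error path rather than return a wrong CID.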

    def _get_parent(self, cid: int, inchikey: str, data: PubchemData) -> PubchemData:
        # guard with is not None: we're not caching, so don't do it twice
        if data.parent_or_none is None:
            return data
        try:
            return self._fetch_data(data.parent_or_none, inchikey)
        except HTTPError:
            raise PubchemCompoundLookupError(
                "Failed finding pubchem parent compound (JSON) "
                f"for cid {data.parent_or_none}, child cid {cid}, inchikey {inchikey}"
            )

    def _fetch_data(self, cid: int, inchikey: str) -> PubchemData:
        when_started = datetime.now(timezone.utc).astimezone()
        t0 = time.monotonic_ns()
Issue (Coding Style Naming): Variable name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        data = self._fetch_core_data(cid)
        t1 = time.monotonic_ns()
Issue (Coding Style Naming): Variable name "t1" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        when_finished = datetime.now(timezone.utc).astimezone()
        data["meta"] = self._get_metadata(inchikey, when_started, when_finished, t0, t1)
        self._strip_by_key_in_place(data, "DisplayControls")
        return PubchemData(NestedDotDict(data))

    def _fetch_core_data(self, cid: int) -> dict:
        return dict(
            record=self._fetch_display_data(cid),
            structure=self._fetch_structure_data(cid),
            external_tables=self._fetch_external_tables(cid),
            link_sets=self._fetch_external_linksets(cid),
            classifications=self._fetch_hierarchies(cid),
        )

    def _get_metadata(self, inchikey: str, started: datetime, finished: datetime, t0: int, t1: int):
Issue (best-practice): Too many arguments (6/5)
Issue (Coding Style): This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y

Issue (Coding Style Naming): Argument name "t1" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
Issue (Coding Style Naming): Argument name "t0" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        return dict(
            timestamp_fetch_started=started.isoformat(),
            timestamp_fetch_finished=finished.isoformat(),
            from_lookup=inchikey,
            fetch_nanos_taken=str(t1 - t0),
        )

    def _fetch_display_data(self, cid: int) -> Optional[NestedDotDict]:
        url = f"{self._pug_view}/data/compound/{cid}/JSON/?response_type=display"
        return self._query_json(url)["Record"]

    def _fetch_structure_data(self, cid: int) -> NestedDotDict:
        if not self._use_chem_data:
            return NestedDotDict({})
        url = f"{self._pug}/compound/cid/{cid}/JSON"
        data = self._query_json(url)["PC_Compounds"][0]
        del data["structure"]["props"]  # redundant with props section in record
        return data

    def _fetch_external_tables(self, cid: int) -> Mapping[str, str]:
        return {
            ext_table: self._fetch_external_table(cid, ext_table)
            for ext_table in self._tables_to_use.values()
        }

    def _fetch_external_linksets(self, cid: int) -> Mapping[str, str]:
        return {
            table: self._fetch_external_linkset(cid, table)
            for table in self._linksets_to_use.values()
        }

    def _fetch_hierarchies(self, cid: int) -> NestedDotDict:
        build_up = {}
        for hname, hid in self._hierarchies_to_use.items():
            try:
                build_up[hname] = self._fetch_hierarchy(cid, hid)
            except (HTTPError, KeyError, LookupError) as e:
Issue (Coding Style Naming): Variable name "e" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
                logger.debug(f"No data for classifier {hid}, compound {cid}: {e}")
Issue: Use lazy % formatting in logging functions
        # These list all of the child nodes for each node
        # Some of them are > 1000 items -- they're HUGE
        # We don't expect to need to navigate to children
        self._strip_by_key_in_place(build_up, "ChildID")
        return NestedDotDict(build_up)

    def _fetch_external_table(self, cid: int, table: str) -> Sequence[dict]:
        url = self._external_table_url(cid, table)
        data = self._query(url)
        df: pd.DataFrame = pd.read_csv(io.StringIO(data))
Issue (Coding Style Naming): Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        return list(df.T.to_dict().values())

    def _fetch_external_linkset(self, cid: int, table: str) -> NestedDotDict:
        url = f"{self._link_db}?format=JSON&type={table}&operation=GetAllLinks&id_1={cid}"
        data = self._query(url)
        return NestedDotDict(orjson.loads(data))

    def _fetch_hierarchy(self, cid: int, hid: int) -> Sequence[dict]:
        url = f"{self._classifications}?format=json&hid={hid}&search_uid_type=cid&search_uid={cid}&search_type=list&response_type=display"
Issue (Coding Style): This line is too long as per the coding-style (138/100).
        data: Sequence[dict] = orjson.loads(self._query(url))["Hierarchies"]
        # underneath Hierarchies is a list of Hierarchy
        logger.debug(f"Found data for classifier {hid}, compound {cid}")
Issue: Use lazy % formatting in logging functions
        if len(data) == 0:
            raise LookupError(f"Failed getting hierarchy {hid}")
        return data

    @property
    def _tables_to_use(self) -> Mapping[str, str]:
        dct = {
            "drug:clinicaltrials.gov:clinical_trials": "clinicaltrials",
            "pharm:pubchem:reactions": "pathwayreaction",
            "uses:cpdat:uses": "cpdat",
            "tox:chemidplus:acute_effects": "chemidplus",
            "dis:ctd:associated_disorders_and_diseases": "ctd_chemical_disease",
            "lit:pubchem:depositor_provided_pubmed_citations": "pubmed",
            "bio:dgidb:drug_gene_interactions": "dgidb",
            "bio:ctd:chemical_gene_interactions": "ctdchemicalgene",
            "bio:drugbank:drugbank_interactions": "drugbank",
            "bio:drugbank:drug_drug_interactions": "drugbankddi",
            "bio:pubchem:bioassay_results": "bioactivity",
        }
        if self._use_extra_tables:
            dct.update(
                {
                    "patent:depositor_provided_patent_identifiers": "patent",
                    "bio:rcsb_pdb:protein_bound_3d_structures": "pdb",
                    "related:pubchem:related_compounds_with_annotation": "compound",
                }
            )
        return dct

    @property
    def _linksets_to_use(self) -> Mapping[str, str]:
        return {
            "lit:pubchem:chemical_cooccurrences_in_literature": "ChemicalNeighbor",
            "lit:pubchem:gene_cooccurrences_in_literature": "ChemicalGeneSymbolNeighbor",
            "lit:pubchem:disease_cooccurrences_in_literature": "ChemicalDiseaseNeighbor",
        }

    @property
    def _hierarchies_to_use(self) -> Mapping[str, int]:
        if not self._use_classifiers:
            return {}
        dct = {
            "MeSH Tree": 1,
            "ChEBI Ontology": 2,
            "WHO ATC Classification System": 79,
            "Guide to PHARMACOLOGY Target Classification": 92,
            "ChEMBL Target Tree": 87,
        }
        if self._use_extra_classifiers:
            dct.update(
                {
                    "KEGG: Phytochemical Compounds": 5,
                    "KEGG: Drug": 14,
                    "KEGG: USP": 15,
                    "KEGG: Major components of natural products": 69,
                    "KEGG: Target-based Classification of Drugs": 22,
                    "KEGG: OTC drugs": 25,
                    "KEGG: Drug Classes": 96,
                    "CAMEO Chemicals": 86,
                    "EPA CPDat Classification": 99,
                    "FDA Pharm Classes": 78,
                    "ChemIDplus": 84,
                }
            )
        return dct

    def _external_table_url(self, cid: int, collection: str) -> str:
        return (
            self._sdg
            + "?infmt=json"
            + "&outfmt=csv"
            + "&query={ download : * , collection : "
            + collection
            + " , where :{ ands :[{ cid : "
            + str(cid)
            + " }]}}"
        ).replace(" ", "%22")
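
The `.replace(" ", "%22")` call works because every space in the template sits exactly where a double quote belongs in the JSON query, and `%22` is the percent-encoding of `"`. Below is a sketch of an equivalent construction that builds the query with `json.dumps` instead. The function names and the sample arguments are illustrative, and the base URL is copied from `_sdg`; whether the server accepts other encodings was not checked.

```python
import json

SDQ = "https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi"


def external_table_url(cid: int, collection: str) -> str:
    """Builds the same URL as _external_table_url, via a real JSON object."""
    query = {
        "download": "*",
        "collection": collection,
        # The original template quotes the CID, so it is sent as a string
        "where": {"ands": [{"cid": str(cid)}]},
    }
    # Compact separators avoid spaces; only the quotes are percent-encoded
    encoded = json.dumps(query, separators=(",", ":")).replace('"', "%22")
    return f"{SDQ}?infmt=json&outfmt=csv&query={encoded}"


def original_style_url(cid: int, collection: str) -> str:
    """Copy of the string-template approach, for comparison."""
    return (
        SDQ
        + "?infmt=json"
        + "&outfmt=csv"
        + "&query={ download : * , collection : "
        + collection
        + " , where :{ ands :[{ cid : "
        + str(cid)
        + " }]}}"
    ).replace(" ", "%22")


print(external_table_url(446220, "cpdat"))
print(external_table_url(446220, "cpdat") == original_style_url(446220, "cpdat"))
```

Building the query as a dictionary makes the structure visible and avoids surprises if a collection name ever contains a space, which the template version would silently turn into a quote.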

    def _query_json(self, url: str) -> NestedDotDict:
        data = self._query(url)
        data = NestedDotDict(orjson.loads(data))
        if "Fault" in data:
            raise ValueError(f"Request failed ({data.get('Code')}) on {url}: {data.get('Message')}")
        return data

    def _strip_by_key_in_place(self, data: Union[dict, list], bad_key: str) -> None:
        if isinstance(data, list):
            for x in data:
Issue (Coding Style Naming): Variable name "x" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
                self._strip_by_key_in_place(x, bad_key)
        elif isinstance(data, dict):
            for k, v in list(data.items()):
Issue (Coding Style Naming): Variable name "v" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
                if k == bad_key:
                    del data[k]
                elif isinstance(v, (list, dict)):
                    self._strip_by_key_in_place(v, bad_key)
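
A small usage example makes the recursive stripping easier to follow. The sketch below is a module-level copy of `_strip_by_key_in_place` (the method uses `self` only to recurse), with longer variable names, applied to a made-up record. The sample data is not real PubChem output.

```python
from typing import Union


def strip_by_key_in_place(data: Union[dict, list], bad_key: str) -> None:
    """Recursively deletes every dict entry whose key equals bad_key."""
    if isinstance(data, list):
        for item in data:
            strip_by_key_in_place(item, bad_key)
    elif isinstance(data, dict):
        # Iterate over a snapshot so deleting keys during the loop is safe
        for key, value in list(data.items()):
            if key == bad_key:
                del data[key]
            elif isinstance(value, (list, dict)):
                strip_by_key_in_place(value, bad_key)


record = {
    "Node": [
        {"NodeID": "n1", "ChildID": ["n2", "n3"]},
        {"NodeID": "n2", "Info": {"ChildID": [], "Name": "leaf"}},
    ],
    "ChildID": ["n1"],
}
strip_by_key_in_place(record, "ChildID")
print(record)
```

Written as a plain function, it no longer needs an instance, which is the same kind of change the "could be written as a function/class method" note suggests for `_get_metadata`.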


__all__ = ["QueryingPubchemApi"]