Passed
Push — main ( 9813db...5006f2 )
by Douglas
01:43
created

CachingPubchemApi.fetch_data()   C

Complexity

Conditions 10

Size

Total Lines 34
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 32
nop 2
dl 0
loc 34
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex code like mandos.model.apis.caching_pubchem_api.CachingPubchemApi.fetch_data() often does a lot of different things. To break it down, we need to identify a cohesive component within the enclosing class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
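
As a rough illustration of the Extract Class advice above, the gzip/JSON file handling that `fetch_data()` shares with `_write_json` and `_read_json` could move into its own small class. The sketch below is an assumption about how that might look, not the project's actual design: `GzipJsonStore` and its method names are hypothetical, and it uses the standard-library `json` module instead of `orjson` so that it runs on its own.

```python
import gzip
import json
from pathlib import Path
from typing import Any, Dict, Optional


class GzipJsonStore:
    """Hypothetical helper that owns all cache-file I/O for one directory."""

    def __init__(self, root: Path) -> None:
        self._root = root

    def path_for(self, key: str) -> Path:
        # Same "<key>.json.gz" naming scheme as data_path() in the reviewed class.
        return self._root / f"{key}.json.gz"

    def write(self, key: str, data: Dict[str, Any]) -> Path:
        path = self.path_for(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(gzip.compress(json.dumps(data).encode("utf8")))
        return path

    def read(self, key: str) -> Optional[Dict[str, Any]]:
        # Returns None when nothing is cached, so callers need only one branch.
        path = self.path_for(key)
        if not path.exists():
            return None
        return json.loads(gzip.decompress(path.read_bytes()))
```

With a helper of this kind, `fetch_data()` would keep only the lookup decisions (use the cache, query, or fail) and delegate the file handling, which is one way to lower its condition count.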

"""
PubChem querying API.
"""
from __future__ import annotations

import gzip
from pathlib import Path
from typing import FrozenSet, Optional, Union, Sequence

import orjson
Issue: Unable to import 'orjson'
import pandas as pd
Issue: Unable to import 'pandas'
from pocketutils.core.dot_dict import NestedDotDict
Issue: Unable to import 'pocketutils.core.dot_dict'
from pocketutils.core.exceptions import IllegalStateError
Issue: Unable to import 'pocketutils.core.exceptions'
Issue (Unused Code): Unused IllegalStateError imported from pocketutils.core.exceptions

from mandos import logger, MANDOS_SETTINGS
from mandos.model.apis.pubchem_api import PubchemApi, PubchemCompoundLookupError
from mandos.model.apis.pubchem_support.pubchem_data import PubchemData
from mandos.model.apis.querying_pubchem_api import QueryingPubchemApi


class CachingPubchemApi(PubchemApi):
Issue: Missing class docstring
    def __init__(
        self,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        query: Optional[QueryingPubchemApi],
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        cache_dir: Path = MANDOS_SETTINGS.pubchem_cache_path,
Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
    ):
        self._cache_dir = cache_dir
        self._query = query
        self._add_all_cids()

    def find_id(self, inchikey: str) -> Optional[int]:
Issue (Unused Code): Either all return statements in a function should return an expression, or none of them should.
        if self.similarity_path(inchikey).exists():
Issue (unused-code): Unnecessary "elif" after "return"
            x = self.fetch_data(inchikey)
Issue (Coding Style Naming): Variable name "x" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

            return None if x is None else x.cid
        elif self._query is not None:
            return self._query.find_id(inchikey)

    def find_inchikey(self, cid: int) -> Optional[str]:
        path = self.cid_path(cid)
        if path.exists():
Issue (unused-code): Unnecessary "elif" after "return"
            return self._read_inchikey_from_cid(cid)
        elif self._query is None:
            raise PubchemCompoundLookupError(f"No InChI Key link found at {path}")
        return self._query.find_inchikey(cid)

    def fetch_data(self, inchikey: Union[str, int]) -> Optional[PubchemData]:
        path = self.data_path(inchikey)
        path.parent.mkdir(parents=True, exist_ok=True)
        cid_path = self.cid_path(inchikey)
        if isinstance(inchikey, int) and cid_path.exists():
            cid = inchikey
            inchikey = self._read_inchikey_from_cid(inchikey)
        elif isinstance(inchikey, int) and self._query is not None:
            cid = inchikey
            inchikey = self._query.find_inchikey(inchikey)
            self._add_cid(cid, inchikey)
            cid_path.write_text(inchikey, encoding="utf8")
        elif isinstance(inchikey, int):
            raise PubchemCompoundLookupError(f"No InChI Key link found at {cid_path}")
        if path.exists():
            logger.debug(f"Found cached PubChem data at {path.absolute()}")
        elif self._query is None:
            raise PubchemCompoundLookupError(f"{inchikey} not found cached at {path}")
        else:
            try:
                data = self._query.fetch_data(inchikey)
            except PubchemCompoundLookupError:
                # write an empty dict so we don't query again
                self._write_json(NestedDotDict({}).to_json(), path)
                raise
            encoded = data.to_json()
            self._write_json(encoded, path)
            cid_path.write_text(inchikey, encoding="utf8")
            logger.debug(f"Wrote PubChem data to {path.absolute()}")
            return data
        read = self._read_json(path)
        if len(read) == 0:
            raise PubchemCompoundLookupError(f"{inchikey} is empty at {path}")
        return PubchemData(read)

    def list_data(self) -> Sequence[Path]:
Issue: Missing function or method docstring
        return {
            p.name.replace(".json.gz", ""): p for p in (self._cache_dir / "data").glob("*.json.gz")
        }

    def cid_path(self, cid: int) -> Path:
Issue: Missing function or method docstring
        return self._cache_dir / "cids" / f"{cid}.txt"

    def data_path(self, inchikey: str) -> Path:
Issue: Missing function or method docstring
        return self._cache_dir / "data" / f"{inchikey}.json.gz"

    def similarity_path(self, inchikey: str) -> Path:
Issue: Missing function or method docstring
        return self._cache_dir / "similarity" / f"{inchikey}.snappy"

    def _add_all_cids(self):
        # not normally needed, but we run this for mainly historical reasons
        logger.info(f"Adding missing CID links.")
Issue: Using an f-string that does not have any interpolated variables
        for inchikey, path in self.list_data():
Issue (Unused Code): The variable path seems to be unused.
            data = self.fetch_data(inchikey)
            cid = data.cid
            self._add_cid(cid, inchikey)

    def _add_cid(self, cid: int, inchikey: str):
        cid_path = self.cid_path(cid)
        cid_path.parent.mkdir(parents=True, exist_ok=True)
        if cid_path.exists():
            loaded_inchikey = self._read_inchikey_from_cid(cid)
            if loaded_inchikey != inchikey:
                logger.error(
                    f"For {cid}, existing entry points to {loaded_inchikey}, not {inchikey}. Overwriting."
Issue (Coding Style): This line is too long as per the coding-style (106/100).

This check looks for lines that are too long. You can specify the maximum line length.

                )
        cid_path.write_text(inchikey, encoding="utf8")

    def _read_inchikey_from_cid(self, cid: int):
        path = self.cid_path(cid)
        if not path.exists():
            raise PubchemCompoundLookupError(f"No InChI Key link found at {path}")
        z = path.read_text(encoding="utf8").strip()
Issue (Coding Style Naming): Variable name "z" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        if len(z) == 0:
Issue (Unused Code): Unnecessary "else" after "raise"
            path.unlink()
            raise PubchemCompoundLookupError(f"No InChI Key link found at {path}")
        else:
            return z

    def _write_json(self, encoded: str, path: Path) -> None:
Issue (Coding Style): This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example:

class Foo:
    def some_method(self, x, y):
        return x + y

could be written as:

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y

        path.write_bytes(gzip.compress(encoded.encode(encoding="utf8")))

    def _read_json(self, path: Path) -> NestedDotDict:
Issue (Coding Style): This method could be written as a function/class method.
        deflated = gzip.decompress(path.read_bytes())
        read = orjson.loads(deflated)
        return NestedDotDict(read)

    def find_similar_compounds(self, inchi: Union[int, str], min_tc: float) -> FrozenSet[int]:
        path = self.similarity_path(inchi)
        if not path.exists():
            df = None
Issue (Coding Style Naming): Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
            existing = set()
        else:
            df = pd.read_csv(path, sep="\t")
Issue (Coding Style Naming): Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
            df = df[df["min_tc"] < min_tc]
Issue (Coding Style Naming): Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
            existing = set(df["cid"].values)
        if len(existing) == 0:
Issue (unused-code): Unnecessary "else" after "return"
            found = self._query.find_similar_compounds(inchi, min_tc)
            path.parent.mkdir(parents=True, exist_ok=True)
            new_df = pd.DataFrame([pd.Series(dict(cid=cid, min_tc=min_tc)) for cid in found])
            if df is not None:
                new_df = pd.concat([df, new_df])
            new_df.to_csv(path, sep="\t")
            return frozenset(existing.union(found))
        else:
            return frozenset(existing)


__all__ = ["CachingPubchemApi"]
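
Several findings in the listing above are mechanical to address. The sketch below shows, on a made-up stand-in class, the shape such fixes usually take: an explicit `return None` so that every path returns an expression, no `elif`/`else` after a `return` or `raise`, and `@staticmethod` for helpers that never touch `self`. `TinyCache`, its dict-based storage, and `LookupFailed` are hypothetical and exist only to keep the example self-contained; they are not part of mandos.

```python
import gzip
import json
from typing import Any, Dict, Optional


class LookupFailed(Exception):
    """Hypothetical stand-in for PubchemCompoundLookupError."""


class TinyCache:
    """Hypothetical in-memory stand-in for the file-backed cache."""

    def __init__(self, fallback: Optional[Dict[str, int]] = None) -> None:
        self._blobs: Dict[str, bytes] = {}
        self._fallback = fallback

    @staticmethod
    def encode(data: Dict[str, Any]) -> bytes:
        # No access to self, so a static method states the intent directly.
        return gzip.compress(json.dumps(data).encode("utf8"))

    @staticmethod
    def decode(blob: bytes) -> Dict[str, Any]:
        return json.loads(gzip.decompress(blob))

    def put(self, key: str, data: Dict[str, Any]) -> None:
        self._blobs[key] = self.encode(data)

    def find_id(self, key: str) -> Optional[int]:
        if key in self._blobs:
            return self.decode(self._blobs[key]).get("cid")
        if self._fallback is not None:  # plain "if": the branch above returned
            return self._fallback.get(key)
        return None  # explicit, so all paths return an expression

    def require(self, key: str) -> Dict[str, Any]:
        if key not in self._blobs:
            raise LookupFailed(f"{key} is not cached")
        return self.decode(self._blobs[key])  # no "else" needed after "raise"
```

Whether a helper becomes a static method or a module-level function is a style choice; either removes the unused `self` that the check complains about.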