Passed
Push — main ( cee75c...37036d )
by Douglas
02:08
created

mandos.entry.fillers.CompoundIdFiller.fill()   D

Complexity
    Conditions: 12

Size
    Total Lines: 39
    Code Lines: 38

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric Value
cc 12
eloc 38
nop 2
dl 0
loc 39
rs 4.8
c 0
b 0
f 0

How to fix: Complexity

Complex units like mandos.entry.fillers.CompoundIdFiller.fill() often do many different things. To break such a unit down, first identify a cohesive component within the enclosing class. A common way to find one is to look for fields or methods that share a prefix or suffix.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
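
As a rough illustration of Extract Class applied to this method, the progress logging inside the loop of fill() could move into a small helper. This is only a sketch: ProgressReporter and its field names are hypothetical, the thresholds 20 and 200 are taken from the listing, and the standard logging module stands in for the mandos logger (logging.WARNING is used in place of logger.notice, which the standard library does not provide).

```python
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("fill_sketch")


@dataclass(frozen=True)
class ProgressReporter:
    """Hypothetical helper holding the progress-logging rules from fill()."""

    total: int
    minor_every: int = 20    # fill() logs at info level every 20 rows
    major_every: int = 200   # fill() logs at notice level every 200 rows

    def level_for(self, i: int) -> Optional[int]:
        """Return the log level for row index i, or None if nothing is logged."""
        if i <= 0:
            return None
        if i % self.major_every == 0:
            return logging.WARNING  # assumption: stand-in for logger.notice
        if i % self.minor_every == 0:
            return logging.INFO
        return None

    def report(self, i: int) -> None:
        level = self.level_for(i)
        if level is not None:
            logger.log(level, "Processed %s / %s", f"{i:,}", f"{self.total:,}")


reporter = ProgressReporter(total=1000)
for i in range(1000):
    reporter.report(i)  # would replace the if/elif at the top of the loop
```

Moving the thresholds into fields removes two of the branches from fill() and makes the logging rules testable without a data frame.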

from __future__ import annotations
# Issue: Missing module docstring
from dataclasses import dataclass
from typing import Optional, Mapping, Tuple, Dict, MutableMapping
# Issue (Unused Code): Unused Dict imported from typing

from mandos import logger
from pocketutils.tools.common_tools import CommonTools
# Issue: Unable to import 'pocketutils.tools.common_tools'
from typeddfs import TypedDfs
# Issue: Unable to import 'typeddfs'

from mandos.model import CompoundNotFoundError, CompoundStruct
# Issue: Imports from package mandos are not grouped

from mandos.entry.api_singletons import Apis
from mandos.model.apis.chembl_support.chembl_utils import ChemblUtils
from mandos.model.apis.pubchem_support.pubchem_data import PubchemData


IdMatchFrame = (
    TypedDfs.typed("IdMatchFrame")
    .reserve("inchikey", dtype=str)
    .reserve("compound_id", "compound_name", "library", dtype=str)
    .reserve("inchi", dtype=str)
    .reserve("chembl_id", "pubchem_id", "hmdb_id", dtype=str)
    .reserve("chembl_inchikey", "pubchem_inchikey", dtype=str)
    .reserve("chembl_inchi", "pubchem_inchi", dtype=str)
    .reserve("origin_inchi", "origin_inchikey", dtype=str)
    .strict(cols=False)
    .secure()
).build()


FILL_IDS = [
    "inchi",
    "inchikey",
    "chembl_id",
    "pubchem_id",
    "chembl_inchi",
    "chembl_inchikey",
    "pubchem_inchi",
    "pubchem_inchikey",
]
PUT_FIRST = [
    "compound_id",
    "compound_name",
    "library",
    "inchikey",
    "chembl_id",
    "pubchem_id",
    "g2p_id",
    "chembl_inchikey",
    "pubchem_inchikey",
    "origin_inchikey",
]
PUT_LAST = ["inchi", "chembl_inchi", "pubchem_inchi", "origin_inchi", "smiles"]

Db = str


def look(obj, attrs):
    # Issue: Missing function or method docstring
    s = CommonTools.look(obj, attrs)
    # Issue (Coding Style Naming): Variable name "s" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
    #
    # This check looks for invalid names for a range of different identifiers.
    #
    # You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.
    #
    # If your project includes a Pylint configuration file, the settings contained in that file take precedence.
    #
    # To find out more about Pylint, please refer to their site.
    if isinstance(s, str) and s.upper() == "N/A":
        return None
    return None if CommonTools.is_probable_null(s) else s


@dataclass(frozen=True, repr=True)
# Issue: Missing class docstring
class CompoundIdFiller:
    chembl: bool = True
    pubchem: bool = True

    def fill(self, df: IdMatchFrame) -> IdMatchFrame:
        # Issue: Missing function or method docstring
        # Issue (Coding Style Naming): Argument name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        df = self._prep(df)
        logger.info(f"Processing {len(df)} input compounds...")
        fill = []
        for i, row in enumerate(df.itertuples()):
            if i % 200 == 0 and i > 0:
                logger.notice(f"Processed {i:,} / {len(df):,}")
            elif i % 20 == 0 and i > 0:
                logger.info(f"Processed {i:,} / {len(df):,}")
            proc = self._process(
                compound_id=look(row, "compound_id"),
                library=look(row, "library"),
                inchi=look(row, "origin_inchi"),
                inchikey=look(row, "origin_inchikey"),
                pubchem_id=look(row, "origin_pubchem_id"),
                chembl_id=look(row, "origin_chembl_id"),
                line_no=i,
            )
            fill.append(proc)
        for c in FILL_IDS:
            # Issue (Coding Style Naming): Variable name "c" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
            df[c] = [r[c] for r in fill]
        duplicate_cols = []
        for c in FILL_IDS:
            # Issue (Coding Style Naming): Variable name "c" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
            if c in df.columns and "origin_" + c in df.columns:
                if df[c].values.tolist() == df["origin_" + c].values.tolist():
                    duplicate_cols.append("origin_" + c)
        logger.notice(f"Done. Filled {len(df):,} rows.")
        if len(duplicate_cols) > 0:
            df = df.drop_cols(duplicate_cols)
            logger.notice(f"Dropped duplicated columns {', '.join(duplicate_cols)}")
        order = [o for o in PUT_FIRST if o in df.columns]
        order += [c for c in df.columns if c not in PUT_FIRST and c not in PUT_LAST]
        order += [o for o in PUT_LAST if o in df.columns]
        df = df.cfirst(order)
        have_chembl = len(df) - len(df[df["chembl_id"].isnull()]["chembl_id"].tolist())
        have_pubchem = len(df) - len(df[df["pubchem_id"].isnull()]["pubchem_id"].tolist())
        logger.notice(f"{have_chembl:,}/{len(df):,} have ChEMBL IDs")
        logger.notice(f"{have_pubchem:,}/{len(df):,} have PubChem IDs")
        return df

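
The column ordering near the end of fill() looks like a natural candidate for extraction, since it depends only on column names. A minimal sketch, assuming a free function named order_columns (a hypothetical name, not part of mandos) that takes plain sequences rather than a data frame:

```python
from typing import List, Sequence


def order_columns(
    columns: Sequence[str], put_first: Sequence[str], put_last: Sequence[str]
) -> List[str]:
    """Hypothetical helper mirroring the three list comprehensions in fill()."""
    # Preferred columns first, in the order given by put_first
    order = [c for c in put_first if c in columns]
    # Then everything named in neither list, keeping the input order
    order += [c for c in columns if c not in put_first and c not in put_last]
    # Bulky columns last, in the order given by put_last
    order += [c for c in put_last if c in columns]
    return order


cols = ["inchi", "extra", "inchikey", "compound_id"]
print(order_columns(cols, ["compound_id", "inchikey"], ["inchi", "smiles"]))
# ['compound_id', 'inchikey', 'extra', 'inchi']
```

With such a helper, fill() would only need to pass df.columns, PUT_FIRST, and PUT_LAST and hand the result to df.cfirst, which removes three comprehensions and their conditions from the method body.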
    def _process(
        # Issue (Comprehensibility): This function exceeds the maximum number of variables (16/15).
        # Issue (best-practice): Too many arguments (8/5)
        self,
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        compound_id: Optional[str],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        library: Optional[str],
        # Issue (Unused Code): The argument library seems to be unused.
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        inchi: Optional[str],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        inchikey: Optional[str],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        pubchem_id: Optional[str],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        chembl_id: Optional[str],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        line_no: int,
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
    ):
        if inchikey is pubchem_id is chembl_id is None:
            logger.error(f"[line {line_no}] No data for {compound_id}")
            return dict(
                inchi=inchi,
                inchikey=inchikey,
                chembl_id=None,
                chembl_inchi=None,
                chembl_inchikey=None,
                pubchem_id=None,
                pubchem_inchi=None,
                pubchem_inchikey=None,
            )
        fake_x = CompoundStruct("input", compound_id, inchi, inchikey)
        chembl_x = self._get_chembl(inchikey, chembl_id)
        pubchem_x = self._get_pubchem(inchikey, pubchem_id)
        #################################################################################
        # This is important and weird!
        # Where DNE = does not exist and E = exists
        # If chembl DNE and pubchem E ==> fill chembl
        # THEN: If chembl E and (pubchem E or pubchem DNE) ==> fill pubchem
        # we might therefore go from pubchem --> chembl --> pubchem
        # The advantage is that chembl might have a good parent compound
        # Whereas pubchem does not
        # This is often true: chembl is much better at this than pubchem
        # In contrast, only fill ChEMBL if it's missing
        if chembl_x is None and pubchem_x is not None:
            chembl_x = self._get_chembl(pubchem_x.inchikey, None)
        if chembl_x is not None:
            pubchem_x = self._get_pubchem(chembl_x.inchikey, None)
        #################################################################################
        # the order is from best to worst
        prioritize_choices = [chembl_x, pubchem_x, fake_x]
        db_to_struct = {o.db: o for o in prioritize_choices if o is not None}
        inchikey, inchikey_choices = self._choose(db_to_struct, "inchikey")
        inchi, inchi_choices = self._choose(db_to_struct, "inchi")
        about = " ; ".join([x.simple_str for x in prioritize_choices if x is not None])
        if len(inchikey_choices) == 0:
            logger.error(f"[line {line_no}] no database inchikeys found :: {about}")
        elif len(inchikey_choices) > 1:
            logger.error(f"[line {line_no}] inchikey mismatch :: {about} :: {inchikey_choices}")
        elif len(inchi_choices) > 1:
            logger.debug(f"[line {line_no}] inchi mismatch :: {about} :: {inchi_choices}")
        return dict(
            inchi=inchi,
            inchikey=inchikey,
            chembl_id=look(chembl_x, "id"),
            chembl_inchi=look(chembl_x, "inchi"),
            chembl_inchikey=look(chembl_x, "inchikey"),
            pubchem_id=look(pubchem_x, "id"),
            pubchem_inchi=look(pubchem_x, "inchi"),
            pubchem_inchikey=look(pubchem_x, "inchikey"),
        )

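
The lookup order described in the comment block of _process could be isolated from the API calls, which would also allow testing it without network access. The following is a sketch under stated assumptions: Struct is a simplified stand-in for CompoundStruct, cross_fill is a hypothetical name, and the two lookup callables stand in for _get_chembl and _get_pubchem when called with an InChIKey only. All records are made up.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass(frozen=True)
class Struct:
    """Simplified stand-in for CompoundStruct (assumption)."""

    db: str
    id: str
    inchikey: str


Lookup = Callable[[str], Optional[Struct]]


def cross_fill(
    chembl_x: Optional[Struct],
    pubchem_x: Optional[Struct],
    get_chembl: Lookup,
    get_pubchem: Lookup,
) -> Tuple[Optional[Struct], Optional[Struct]]:
    # Step 1: no ChEMBL hit but a PubChem hit, so try ChEMBL with PubChem's key
    if chembl_x is None and pubchem_x is not None:
        chembl_x = get_chembl(pubchem_x.inchikey)
    # Step 2: whenever ChEMBL has a hit, re-query PubChem with ChEMBL's key,
    # replacing any earlier PubChem result (possibly with None)
    if chembl_x is not None:
        pubchem_x = get_pubchem(chembl_x.inchikey)
    return chembl_x, pubchem_x


# Made-up records: ChEMBL maps the salt's key to a parent compound
chembl_db: Dict[str, Struct] = {"SALT-KEY": Struct("chembl", "CHEMBL-X", "PARENT-KEY")}
pubchem_db: Dict[str, Struct] = {
    "SALT-KEY": Struct("pubchem", "1", "SALT-KEY"),
    "PARENT-KEY": Struct("pubchem", "2", "PARENT-KEY"),
}

chembl_hit, pubchem_hit = cross_fill(
    None, pubchem_db["SALT-KEY"], chembl_db.get, pubchem_db.get
)
print(chembl_hit.id, pubchem_hit.id)  # CHEMBL-X 2
```

The example walks the pubchem --> chembl --> pubchem path from the comment: the input only matches PubChem, ChEMBL resolves it to a parent compound, and PubChem is then re-queried for that parent.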
    def _choose(
        # Issue (Coding Style): This method could be written as a function/class method.
        #
        # If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example
        #
        #     class Foo:
        #         def some_method(self, x, y):
        #             return x + y
        #
        # could be written as
        #
        #     class Foo:
        #         @classmethod
        #         def some_method(cls, x, y):
        #             return x + y
        self,
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        db_to_struct: Mapping[str, CompoundStruct],
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
        what: str,
        # Issue (Coding Style): Wrong hanging indentation before block (add 4 spaces).
    ) -> Tuple[Optional[str], MutableMapping[str, Db]]:
        """
        Chooses the best what="inchi" or what="inchikey".

        Arguments:
            db_to_struct: Should be in order from most preferred to least
            what: The name of the CompoundStruct attribute to access
        """
        options = {o.db: look(o, what) for o in db_to_struct.values() if look(o, what) is not None}
        _s = ", ".join([f"{k}={v}" for k, v in options.items()])
        non_input_dbs = {v: k for k, v in options.items() if k != "input"}
        all_uniques = set(options.values())
        if len(all_uniques) == 0:
            # Issue (unused-code): Unnecessary "else" after "return"
            return None, {}
        else:
            return list(all_uniques)[0], non_input_dbs

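
A point the automated checks do not flag: all_uniques is a set, so list(all_uniques)[0] picks an arbitrary element when the databases disagree, even though the docstring asks for db_to_struct to be ordered from most to least preferred. One possible alternative, sketched here as a hypothetical free function choose_first that receives the already-extracted value per database, relies on dict insertion order instead:

```python
from typing import Dict, Mapping, Optional, Tuple


def choose_first(
    options: Mapping[str, Optional[str]]
) -> Tuple[Optional[str], Dict[str, str]]:
    """Hypothetical variant of _choose that honors the mapping's order."""
    # Drop databases that returned nothing; dicts keep insertion order
    present = {db: value for db, value in options.items() if value is not None}
    # Same shape as non_input_dbs in the listing: value -> database name
    non_input = {value: db for db, value in present.items() if db != "input"}
    # First surviving value is the most preferred one, or None if empty
    best = next(iter(present.values()), None)
    return best, non_input


print(choose_first({"chembl": "A", "pubchem": "B", "input": "C"}))
# ('A', {'A': 'chembl', 'B': 'pubchem'})
print(choose_first({"chembl": None, "input": "C"}))
# ('C', {})
```

Whether the first value should win on a mismatch is a design decision for the author; the sketch only shows that the preference order can be preserved without a set.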
    def _prep(self, df: IdMatchFrame) -> IdMatchFrame:
        # Issue (Coding Style): This method could be written as a function/class method.
        # Issue (Coding Style Naming): Argument name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]{2,}|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
        bad_cols = [c for c in df.columns if c.startswith("origin_")]
        if len(bad_cols) > 0:
            raise ValueError(f"Columns {', '.join(bad_cols)} start with 'origin_'")
        rename_cols = {c: "origin_" + c for c in FILL_IDS if c in df.columns}
        if len(rename_cols) > 0:
            logger.notice(f"Renaming columns: {', '.join(rename_cols.keys())}")
        df: IdMatchFrame = df.rename(columns=rename_cols)
        drop_cols = {c for c in df.columns if df[c].isnull().all()}
        if len(drop_cols):
            # Issue (Unused Code): Do not use len(SEQUENCE) without comparison to determine if a sequence is empty
            logger.warning(f"Dropping empty columns: {', '.join(drop_cols)}")
        df = df.drop_cols(drop_cols)
        return df

    def _get_pubchem(self, inchikey: Optional[str], cid: Optional[int]) -> Optional[CompoundStruct]:
        # Issue (Coding Style): This method could be written as a function/class method.
        # Issue (Unused Code): Either all return statements in a function should return an expression, or none of them should.
        api = Apis.Pubchem
        if cid is not None:
            # let it raise a CompoundNotFoundError
            inchikey = api.fetch_data(int(cid)).names_and_identifiers.inchikey
            if inchikey is None:
                return None
        if inchikey is not None:
            try:
                data: Optional[PubchemData] = api.fetch_data(inchikey)
            except CompoundNotFoundError:
                return None
            return None if data is None else data.struct_view

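
The inconsistent-return warning on _get_pubchem arises because the method falls off the end, implicitly returning None, when both inchikey and cid are None. A sketch of the same control flow with every exit spelled out, using hypothetical names (resolve, NotFound, and the two callables) in place of the PubChem API:

```python
from typing import Callable, Optional


class NotFound(Exception):
    """Stand-in for CompoundNotFoundError (assumption)."""


def resolve(
    inchikey: Optional[str],
    cid: Optional[int],
    key_for_cid: Callable[[int], Optional[str]],
    fetch_by_key: Callable[[str], Optional[str]],
) -> Optional[str]:
    if cid is not None:
        # Not wrapped in try: as in the listing, an error from the
        # ID lookup propagates to the caller
        inchikey = key_for_cid(int(cid))
    if inchikey is None:
        return None  # explicit, instead of falling off the end
    try:
        return fetch_by_key(inchikey)
    except NotFound:
        return None


def fetch(key: str) -> str:
    """Toy lookup that knows exactly one key."""
    if key != "KNOWN-KEY":
        raise NotFound(key)
    return "record for " + key


ids = {5: "KNOWN-KEY"}
print(resolve(None, None, ids.get, fetch))         # None
print(resolve("OTHER-KEY", None, ids.get, fetch))  # None
print(resolve(None, 5, ids.get, fetch))            # record for KNOWN-KEY
```

Merging the two None checks into one guard also removes a level of nesting, which should help with the complexity rating.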
    def _get_chembl(self, inchikey: Optional[str], cid: Optional[str]) -> Optional[CompoundStruct]:
        # Issue (Coding Style): This method could be written as a function/class method.
        util = ChemblUtils(Apis.Chembl)
        if cid is not None:
            # let it raise a CompoundNotFoundError
            return util.get_compound(cid).struct_view
        try:
            return util.get_compound(inchikey).struct_view
        except CompoundNotFoundError:
            return None


__all__ = ["CompoundIdFiller", "IdMatchFrame"]