Passed
Push — main ( da77b5...65730f )
by Douglas
02:28
created

CompoundIdFiller._process()   C

Complexity

Conditions 8

Size

Total Lines 60
Code Lines 46

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 46
nop 7
dl 0
loc 60
rs 6.9006
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
from __future__ import annotations
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
3
from dataclasses import dataclass
4
from typing import Any, Mapping, MutableMapping, Optional, Tuple
5
6
from pocketutils.core.exceptions import XValueError
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.exceptions'
Loading history...
7
from pocketutils.tools.common_tools import CommonTools
0 ignored issues
show
introduced by
Unable to import 'pocketutils.tools.common_tools'
Loading history...
8
from typeddfs import TypedDfs
0 ignored issues
show
introduced by
Unable to import 'typeddfs'
Loading history...
9
10
from mandos import logger
11
from mandos.entry.api_singletons import Apis
12
from mandos.model import CompoundStruct
13
from mandos.model.apis.chembl_support.chembl_utils import ChemblUtils
14
from mandos.model.apis.pubchem_support.pubchem_data import PubchemData
15
from mandos.model.utils import CompoundNotFoundError
16
17
# Typed DataFrame class for compound-ID matching tables.
# Every reserved column is declared with dtype=str; the groups below are kept
# in their original declaration order.
# NOTE(review): the exact semantics of reserve/strict/secure/hash are defined by
# typeddfs and are not visible here — consult its documentation before changing them.
IdMatchDf = (
    TypedDfs.typed("IdMatchDf")
    # primary structure key
    .reserve("inchikey", dtype=str)
    # caller-supplied bookkeeping columns
    .reserve("compound_id", "compound_name", "library", dtype=str)
    .reserve("inchi", dtype=str)
    # database identifiers
    .reserve("chembl_id", "pubchem_id", "hmdb_id", dtype=str)
    # structures as reported by each database
    .reserve("chembl_inchikey", "pubchem_inchikey", dtype=str)
    .reserve("chembl_inchi", "pubchem_inchi", dtype=str)
    # the caller's original values, preserved under an "origin_" prefix
    .reserve("origin_inchi", "origin_inchikey", dtype=str)
    .strict(cols=False)
    .secure()
    .hash(file=True)
).build()
30
31
32
# Columns that CompoundIdFiller.fill() writes for every row. Input columns with
# these names are renamed to "origin_<name>" by CompoundIdFiller._prep().
FILL_IDS = [
    "inchi", "inchikey",
    "chembl_id", "pubchem_id",
    "chembl_inchi", "chembl_inchikey",
    "pubchem_inchi", "pubchem_inchikey",
]

# Columns moved to the front of the output table, in this order (when present).
PUT_FIRST = [
    "compound_id", "compound_name", "library",
    "inchikey",
    "chembl_id", "pubchem_id", "g2p_id",
    "chembl_inchikey", "pubchem_inchikey", "origin_inchikey",
]

# Columns moved to the end of the output table, in this order (when present).
PUT_LAST = [
    "inchi",
    "chembl_inchi",
    "pubchem_inchi",
    "origin_inchi",
    "smiles",
]

# Alias used in annotations where a string names a database.
Db = str
57
58
59
def look(obj, attrs):
    """
    Looks up ``attrs`` on ``obj`` and normalizes missing-value markers to ``None``.

    The lookup itself is delegated to ``CommonTools.look``.
    A string result equal to "N/A" (case-insensitive) and any result for which
    ``CommonTools.is_probable_null`` is true are both returned as ``None``.

    Arguments:
        obj: The object to read from
        attrs: Passed through to ``CommonTools.look``
               (presumably an attribute name or path; see pocketutils)

    Returns:
        The looked-up value, or ``None`` if it is a null-like marker
    """
    value = CommonTools.look(obj, attrs)
    is_na_marker = isinstance(value, str) and value.upper() == "N/A"
    # is_probable_null is only consulted when the value is not an explicit "N/A"
    if is_na_marker or CommonTools.is_probable_null(value):
        return None
    return value
64
65
66
@dataclass(frozen=True, repr=True)
class CompoundIdFiller:
    """
    Fills in ChEMBL and PubChem IDs, InChI, and InChIKey columns for a table of compounds.

    Each input row is looked up in ChEMBL and PubChem using whichever of its
    InChIKey, ChEMBL ID, and PubChem ID are present, and the columns listed in
    ``FILL_IDS`` are written from the results.
    """

    # NOTE(review): neither flag is read by any method of this class; presumably they
    # were meant to switch the ChEMBL / PubChem lookups on and off — confirm before
    # relying on them.
    chembl: bool = True
    pubchem: bool = True

    def fill(self, df: IdMatchDf) -> IdMatchDf:
        """
        Looks up every row of ``df`` and returns a table with the ``FILL_IDS`` columns filled.

        Input columns named like a ``FILL_IDS`` entry are first renamed to
        ``origin_<name>`` (see ``_prep``). After filling, any ``origin_`` column whose
        values are identical to the filled column is dropped again, and the columns
        are reordered according to ``PUT_FIRST`` and ``PUT_LAST``.

        Arguments:
            df: The input compounds; must not already contain ``origin_*`` columns

        Returns:
            The filled and reordered table

        Raises:
            XValueError: If ``df`` already has a column starting with ``origin_``
        """
        df = self._prep(df)
        logger.info(f"Processing {len(df)} input compounds")
        fill = []
        for i, row in enumerate(df.itertuples()):
            # progress messages: a louder one every 200 rows, a quieter one every 20
            if i % 200 == 0 and i > 0:
                logger.notice(f"Processed {i:,} / {len(df):,}")
            elif i % 20 == 0 and i > 0:
                logger.info(f"Processed {i:,} / {len(df):,}")
            with logger.contextualize(line=i):
                proc = self._process(
                    compound_id=look(row, "compound_id"),
                    library=look(row, "library"),
                    inchi=look(row, "origin_inchi"),
                    inchikey=look(row, "origin_inchikey"),
                    pubchem_id=look(row, "origin_pubchem_id"),
                    chembl_id=look(row, "origin_chembl_id"),
                )
            fill.append(proc)
        for col in FILL_IDS:
            df[col] = [r[col] for r in fill]
        # an origin_ column that matches the filled column exactly carries no information
        duplicate_cols = []
        for col in FILL_IDS:
            origin_col = "origin_" + col
            if col in df.columns and origin_col in df.columns:
                if df[col].values.tolist() == df[origin_col].values.tolist():
                    duplicate_cols.append(origin_col)
        logger.notice(f"Done — filled {len(df):,} rows")
        if len(duplicate_cols) > 0:
            df = df.drop_cols(duplicate_cols)
            logger.notice(f"Dropped duplicated columns {', '.join(duplicate_cols)}")
        order = [o for o in PUT_FIRST if o in df.columns]
        order += [c for c in df.columns if c not in PUT_FIRST and c not in PUT_LAST]
        order += [o for o in PUT_LAST if o in df.columns]
        df = df.cfirst(order)
        # rows with an ID = all rows minus the rows where the ID is null
        have_chembl = len(df) - len(df[df["chembl_id"].isnull()])
        have_pubchem = len(df) - len(df[df["pubchem_id"].isnull()])
        logger.notice(f"{have_chembl:,}/{len(df):,} have ChEMBL IDs")
        logger.notice(f"{have_pubchem:,}/{len(df):,} have PubChem IDs")
        return df

    def _process(
        self,
        compound_id: Optional[str],
        library: Optional[str],
        inchi: Optional[str],
        inchikey: Optional[str],
        pubchem_id: Optional[str],
        chembl_id: Optional[str],
    ) -> Mapping[str, Any]:
        """
        Resolves a single compound and returns the values for the ``FILL_IDS`` columns.

        Arguments:
            compound_id: The caller's ID for the compound; used for the input struct and in logs
            library: Unused; accepted so that callers can pass the row's library
            inchi: The input InChI, if any
            inchikey: The input InChIKey, if any
            pubchem_id: The input PubChem ID, if any
            chembl_id: The input ChEMBL ID, if any

        Returns:
            A dict with exactly the keys in ``FILL_IDS``

        Raises:
            CompoundNotFoundError: If an explicitly given ChEMBL or PubChem ID is not found
        """
        if inchikey is None and pubchem_id is None and chembl_id is None:
            # nothing to search with: echo the input structure and leave the rest empty
            logger.error(f"No data for {compound_id}")
            return dict(
                inchi=inchi,
                inchikey=inchikey,
                chembl_id=None,
                chembl_inchi=None,
                chembl_inchikey=None,
                pubchem_id=None,
                pubchem_inchi=None,
                pubchem_inchikey=None,
            )
        fake_x = CompoundStruct("input", compound_id, inchi, inchikey)
        chembl_x, pubchem_x = self._lookup_structs(inchikey, chembl_id, pubchem_id)
        # the order is from best to worst
        prioritize_choices = [chembl_x, pubchem_x, fake_x]
        db_to_struct = {o.db: o for o in prioritize_choices if o is not None}
        inchikey, inchikey_choices = self._choose(db_to_struct, "inchikey")
        inchi, inchi_choices = self._choose(db_to_struct, "inchi")
        about = " ; ".join([x.simple_str for x in prioritize_choices if x is not None])
        if len(inchikey_choices) == 0:
            logger.error(f"no database inchikeys found :: {about}")
        elif len(inchikey_choices) > 1:
            logger.error(f"inchikey mismatch :: {about} :: {inchikey_choices}")
        elif len(inchi_choices) > 1:
            logger.debug(f"inchi mismatch :: {about} :: {inchi_choices}")
        return dict(
            inchi=inchi,
            inchikey=inchikey,
            chembl_id=look(chembl_x, "id"),
            chembl_inchi=look(chembl_x, "inchi"),
            chembl_inchikey=look(chembl_x, "inchikey"),
            pubchem_id=look(pubchem_x, "id"),
            pubchem_inchi=look(pubchem_x, "inchi"),
            pubchem_inchikey=look(pubchem_x, "inchikey"),
        )

    def _lookup_structs(
        self,
        inchikey: Optional[str],
        chembl_id: Optional[str],
        pubchem_id: Optional[str],
    ) -> Tuple[Optional[CompoundStruct], Optional[CompoundStruct]]:
        """
        Fetches the ChEMBL and PubChem structures for a compound, cross-referencing the two.

        Returns:
            ``(chembl_struct, pubchem_struct)``; either may be ``None``
        """
        chembl_x = self._get_chembl(inchikey, chembl_id)
        pubchem_x = self._get_pubchem(inchikey, pubchem_id)
        #################################################################################
        # This is important and weird!
        # Where DNE = does not exist and E = exists
        # If chembl DNE and pubchem E ==> fill chembl
        # THEN: If chembl E and (pubchem E or pubchem DNE) ==> fill pubchem
        # we might therefore go from pubchem --> chembl --> pubchem
        # The advantage is that chembl might have a good parent compound
        # Whereas pubchem does not
        # This is often true: chembl is much better at this than pubchem
        # In contrast, only fill ChEMBL if it's missing
        if chembl_x is None and pubchem_x is not None:
            chembl_x = self._get_chembl(pubchem_x.inchikey, None)
        if chembl_x is not None:
            pubchem_x = self._get_pubchem(chembl_x.inchikey, None)
        #################################################################################
        return chembl_x, pubchem_x

    def _choose(
        self,
        db_to_struct: Mapping[str, CompoundStruct],
        what: str,
    ) -> Tuple[Optional[str], MutableMapping[str, Db]]:
        """
        Chooses the best what="inchi" or what="inchikey".

        Arguments:
            db_to_struct: Should be in order from most preferred to least
            what: The name of the CompoundStruct attribute to access

        Returns:
            A tuple of the chosen value (``None`` if no struct has one) and a mapping
            from each distinct value reported by a database other than "input" to the
            name of a database that reported it. More than one entry in that mapping
            means the databases disagree.
        """
        options = {}
        for struct in db_to_struct.values():
            value = look(struct, what)
            if value is not None:
                options[struct.db] = value
        if not options:
            return None, {}
        non_input_dbs = {v: k for k, v in options.items() if k != "input"}
        # dicts keep insertion order, so the first value comes from the most preferred
        # database; picking from a set would make the choice arbitrary on a mismatch
        best = next(iter(options.values()))
        return best, non_input_dbs

    def _prep(self, df: IdMatchDf) -> IdMatchDf:
        """
        Prepares the input table for filling.

        Renames every ``FILL_IDS`` column that is present to ``origin_<name>`` and
        drops the columns that contain only null values.

        Raises:
            XValueError: If a column already starts with ``origin_``
        """
        bad_cols = [c for c in df.columns if c.startswith("origin_")]
        if len(bad_cols) > 0:
            raise XValueError(f"Columns {', '.join(bad_cols)} start with 'origin_'")
        rename_cols = {c: "origin_" + c for c in FILL_IDS if c in df.columns}
        if len(rename_cols) > 0:
            logger.notice(f"Renaming columns: {', '.join(rename_cols.keys())}")
        df = df.rename(columns=rename_cols)
        drop_cols = {c for c in df.columns if df[c].isnull().all()}
        if len(drop_cols) > 0:
            logger.warning(f"Dropping empty columns: {', '.join(drop_cols)}")
        df = df.drop_cols(drop_cols)
        return df

    def _get_pubchem(self, inchikey: Optional[str], cid: Optional[int]) -> Optional[CompoundStruct]:
        """
        Fetches the PubChem structure for a compound.

        Arguments:
            inchikey: The InChIKey to search with; ignored if ``cid`` is given
            cid: The PubChem compound ID; anything ``int()`` accepts

        Returns:
            The structure, or ``None`` if there is nothing to search with or no match

        Raises:
            CompoundNotFoundError: If ``cid`` is given but is not found
        """
        api = Apis.Pubchem
        if cid is not None:
            # let it raise a CompoundNotFoundError
            inchikey = api.fetch_data(int(cid)).names_and_identifiers.inchikey
        if inchikey is None:
            return None
        try:
            data: Optional[PubchemData] = api.fetch_data(inchikey)
        except CompoundNotFoundError:
            return None
        return None if data is None else data.struct_view

    def _get_chembl(self, inchikey: Optional[str], cid: Optional[str]) -> Optional[CompoundStruct]:
        """
        Fetches the ChEMBL structure for a compound.

        Arguments:
            inchikey: The InChIKey to search with; ignored if ``cid`` is given
            cid: The ChEMBL ID

        Returns:
            The structure, or ``None`` if there is nothing to search with or no match

        Raises:
            CompoundNotFoundError: If ``cid`` is given but is not found
        """
        util = ChemblUtils(Apis.Chembl)
        if cid is not None:
            # let it raise a CompoundNotFoundError
            return util.get_compound(cid).struct_view
        if inchikey is None:
            # nothing to search with; mirrors _get_pubchem instead of querying for None
            return None
        try:
            return util.get_compound(inchikey).struct_view
        except CompoundNotFoundError:
            return None
231
232
233
# Names exported by ``from <this module> import *``.
__all__ = ["CompoundIdFiller", "IdMatchDf"]
234