Passed
Push — dependabot/pip/flake8-bugbear-... ( 82a4d5...16d864 )
by
unknown
02:18
created

mandos.entries.searcher.SearcherUtils.read()   B

Complexity

Conditions 6

Size

Total Lines 16
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 15
nop 2
dl 0
loc 16
rs 8.6666
c 0
b 0
f 0
1
"""
2
Run searches and write files.
3
"""
4
5
from __future__ import annotations
6
7
import gzip
8
import logging
9
from pathlib import Path
10
from typing import Sequence, Optional, Dict
11
12
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
13
from pocketutils.core.dot_dict import NestedDotDict
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.dot_dict'
Loading history...
14
from pocketutils.tools.path_tools import PathTools
0 ignored issues
show
introduced by
Unable to import 'pocketutils.tools.path_tools'
Loading history...
15
from typeddfs import TypedDfs
0 ignored issues
show
introduced by
Unable to import 'typeddfs'
Loading history...
16
17
from mandos.model import CompoundNotFoundError
18
from mandos.model.chembl_support.chembl_utils import ChemblUtils
19
from mandos.model.searches import Search
20
from mandos.model.settings import MANDOS_SETTINGS
21
from mandos.search.chembl import ChemblSearch
22
from mandos.search.pubchem import PubchemSearch
23
from mandos.entries.api_singletons import Apis
24
25
# Short module-level aliases for the two API handles exposed by ``Apis``.
Chembl = Apis.Chembl
Pubchem = Apis.Pubchem
# The logger is named after the enclosing package (``__package__``), not this module.
logger = logging.getLogger(__package__)
27
28
# Strict typed data frame with three required columns:
# ``inchikey``, ``chembl_id``, and ``pubchem_id``.
IdMatchFrame = (
    TypedDfs.typed("IdMatchFrame")
    .require("inchikey")
    .require("chembl_id")
    .require("pubchem_id")
    .strict()
    .build()
)
35
36
37
class SearcherUtils:
    """
    Helpers for reading InChI Key input files and pre-fetching compound IDs.
    """

    @classmethod
    def dl(
        cls,
        inchikeys: Sequence[str],
        pubchem: bool = True,
        chembl: bool = True,
        hmdb: bool = True,
    ) -> IdMatchFrame:
        """
        Looks up every InChI Key in PubChem and/or ChEMBL and caches the ID matches.

        Args:
            inchikeys: The InChI Keys to look up, in order
            pubchem: Whether to query PubChem
            chembl: Whether to query ChEMBL
            hmdb: Currently unused; kept so that existing callers keep working

        Returns:
            An ``IdMatchFrame`` with one row per InChI Key; an ID is missing where
            the compound was not found (or the database was not queried)
        """
        # We cache the results here even though the underlying APIs cache too.
        # The reasons are a little obscure:
        # When running a Searcher, we want to fetch before the FIRST search.
        # For the typer commands to be replicas of the ``Entry.run`` methods,
        # Searcher fetches before running a search.
        # But if we have multiple searches (as in ``mandos search --config``),
        # we only want that at the beginning.
        # The alternative was having ``mandos search`` dynamically subclass each ``Entry``,
        # which was really hard. This is much cleaner, even though it's redundant.
        # TODO: If the cached results under /pubchem and /chembl are deleted, this command
        # TODO: will not re-cache them unless the cached /match data frames are deleted too.
        cached_path = (MANDOS_SETTINGS.match_cache_path / cls._cache_key(inchikeys)).with_suffix(
            ".feather"
        )
        if cached_path.exists():
            logger.info("Found ID matching results at %s", cached_path)
            return IdMatchFrame.read_feather(cached_path)
        found_chembl: Dict[str, str] = {}
        found_pubchem: Dict[str, str] = {}
        if pubchem:
            for inchikey in inchikeys:
                try:
                    found_pubchem[inchikey] = str(Pubchem.fetch_data(inchikey).cid)
                except CompoundNotFoundError:
                    # short message for the user; full traceback only at debug level
                    logger.error("Did not find compound %s", inchikey)
                    logger.debug("Did not find compound %s", inchikey, exc_info=True)
        if chembl:
            for inchikey in inchikeys:
                try:
                    found_chembl[inchikey] = ChemblUtils(Chembl).get_compound(inchikey).chid
                except CompoundNotFoundError:
                    logger.error("Did not find compound %s", inchikey)
                    logger.debug("Did not find compound %s", inchikey, exc_info=True)
        # building from a dict guarantees the ``inchikey`` column exists even for empty input
        df = pd.DataFrame({"inchikey": list(inchikeys)})
        df["chembl_id"] = df["inchikey"].map(found_chembl.get)
        df["pubchem_id"] = df["inchikey"].map(found_pubchem.get)
        df = IdMatchFrame(df)
        # the cache directory may not exist yet on a first run
        cached_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_feather(cached_path)
        return df

    @classmethod
    def _cache_key(cls, inchikeys: Sequence[str]) -> str:
        """
        Returns a file-name-safe key for a list of InChI Keys that is stable across runs.

        The built-in ``hash`` is salted per process for strings, so it cannot be used
        to name files that a later process needs to find again.
        """
        import hashlib

        return hashlib.sha256(",".join(inchikeys).encode("utf8")).hexdigest()

    @classmethod
    def read(cls, input_path: Path) -> Sequence[str]:
        """
        Reads the InChI Keys from an input file, choosing the parser by file extension.

        Args:
            input_path: A delimited file or .feather file with an ``inchikey`` column,
                        or a (possibly gzipped) text file with one InChI Key per line

        Returns:
            The InChI Keys, in file order

        Raises:
            ValueError: If the file extension is not recognized
            KeyError: If a tabular file has no ``inchikey`` column
        """
        sep = cls._get_sep(input_path)
        if sep in {"\t", ","}:
            return cls._from_df(pd.read_csv(input_path, sep=sep))
        if sep == "feather":
            return cls._from_df(pd.read_feather(input_path))
        if sep == "gz":
            with gzip.open(input_path, "rt") as f:
                return cls._from_txt(f.read())
        if sep == "txt":
            return cls._from_txt(input_path.read_text(encoding="utf8"))
        # ``_get_sep`` only returns the values handled above
        raise AssertionError(sep)

    @classmethod
    def _from_df(cls, df: pd.DataFrame) -> Sequence[str]:
        """
        Extracts the ``inchikey`` column (matched case-insensitively) as a list.

        The passed data frame is not modified.
        """
        lowered = df.rename(columns=lambda c: c.lower() if isinstance(c, str) else c)
        if "inchikey" not in lowered.columns:
            raise KeyError("For a CSV or TSV file, include a column called 'inchikey'")
        return lowered["inchikey"].tolist()

    @classmethod
    def _from_txt(cls, text: str) -> Sequence[str]:
        """
        Splits text into stripped lines, dropping lines that are empty or only whitespace.
        """
        stripped = (line.strip() for line in text.splitlines())
        return [line for line in stripped if line]

    @classmethod
    def _get_sep(cls, input_path: Path) -> str:
        """
        Maps a file extension to a format tag understood by ``read``.

        Returns:
            A tab or comma for delimited files (optionally gzipped), or one of
            ``"feather"``, ``"gz"`` (gzipped line-by-line text), and ``"txt"``

        Raises:
            ValueError: If the extension is not recognized
        """
        name = str(input_path)
        if name.endswith((".tab", ".tsv", ".tab.gz", ".tsv.gz")):
            return "\t"
        if name.endswith((".csv", ".csv.gz")):
            return ","
        if name.endswith(".feather"):
            return "feather"
        # the gzipped text extensions must be tested before the plain ones
        if name.endswith((".txt.gz", ".lines.gz")):
            return "gz"
        if name.endswith((".txt", ".lines")):
            return "txt"
        raise ValueError(f"{input_path} should end in .tab, .tsv, .csv, .txt, .lines, or .gz")
127
128
129
class Searcher:
    """
    Executes one or more searches and saves the results to CSV files.
    Create and use once.
    """

    def __init__(self, searches: Sequence[Search], input_path: Path):
        """
        Constructor.

        Args:
            searches: The searches to run, in order
            input_path: Path to the input file of one of the formats:
                - .txt containing one InChI Key per line
                - .csv, .tsv, .tab, csv.gz, .tsv.gz, .tab.gz, or .feather
                  containing a column called inchikey
        """
        self.what = searches
        self.input_path: Optional[Path] = input_path
        # ``None`` means "not searched yet"; ``search`` fills this in and refuses to run twice.
        # (Starting with an empty list would make ``search`` always raise.)
        self.inchikeys: Optional[Sequence[str]] = None

    def search(self) -> Searcher:
        """
        Performs the search, and writes data.

        For each search, writes the results with ``to_csv`` to ``output_path_of(search)``
        and a JSON metadata file alongside it (same path with a ``.json`` suffix).

        Returns:
            This instance

        Raises:
            ValueError: If ``search`` was already called on this instance
        """
        if self.inchikeys is not None:
            raise ValueError("Already ran a search")
        self.inchikeys = SearcherUtils.read(self.input_path)
        has_pubchem = any(isinstance(what, PubchemSearch) for what in self.what)
        has_chembl = any(isinstance(what, ChemblSearch) for what in self.what)
        # find the compounds first so the user knows what's missing before proceeding
        SearcherUtils.dl(self.inchikeys, pubchem=has_pubchem, chembl=has_chembl)
        for what in self.what:
            output_path = self.output_path_of(what)
            df = what.find_to_df(self.inchikeys)
            # NOTE(review): the path ends in .tab but ``to_csv`` uses its default separator
            df.to_csv(output_path)
            metadata = NestedDotDict(
                dict(key=what.key, search=what.search_class, params=what.get_params())
            )
            metadata.write_json(output_path.with_suffix(".json"))
        return self

    def paths(self) -> Sequence[Path]:
        """
        Returns the output path of each search, in the order the searches were given.
        """
        return [self.output_path_of(what) for what in self.what]

    def output_path_of(self, what: Search) -> Path:
        """
        Returns the path that the results of ``what`` are written to.

        The file is placed next to the input file and named from the input file's stem
        followed by the search key and ``.tab``, after sanitizing the file name.
        """
        parent = self.input_path.parent
        child = self.input_path.stem + what.key + ".tab"
        node = PathTools.sanitize_path_node(child)
        if (parent / node).resolve() != (parent / child).resolve():
            logger.debug("Path %s sanitized to %s", child, node)
        return parent / node
180
181
182
# Public names exported by this module.
__all__ = [
    "Searcher",
    "IdMatchFrame",
    "SearcherUtils",
]
183