Passed
Push — main ( 83a9fb...fa90c4 )
by Douglas
03:43
created

UniprotTaxonomyCache._fix()   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 14
nop 4
dl 0
loc 23
rs 9.7
c 0
b 0
f 0
1
"""
2
Caching.
3
"""
4
5
from __future__ import annotations
6
7
import abc
8
import shutil
9
from pathlib import Path
10
from typing import AbstractSet, Collection, Iterable, Mapping, Optional, Set, Union
0 ignored issues
show
Unused Code introduced by
Unused AbstractSet imported from typing
Loading history...
11
12
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
13
import requests
0 ignored issues
show
introduced by
Unable to import 'requests'
Loading history...
14
from pocketutils.core.exceptions import LookupFailedError, XValueError
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.exceptions'
Loading history...
Unused Code introduced by
Unused LookupFailedError imported from pocketutils.core.exceptions
Loading history...
15
from typeddfs import TypedDfs
0 ignored issues
show
introduced by
Unable to import 'typeddfs'
Loading history...
16
from typeddfs.checksums import Checksums
0 ignored issues
show
introduced by
Unable to import 'typeddfs.checksums'
Loading history...
17
18
from mandos.model.settings import SETTINGS, Globals
19
from mandos.model.taxonomy import Taxonomy, TaxonomyDf
20
from mandos.model.utils.resources import MandosResources
21
from mandos.model.utils.setup import logger
22
23
24
class TaxonomyFactory(metaclass=abc.ABCMeta):
    """
    Base interface for objects that produce a ``Taxonomy`` for a requested taxon.
    """

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the taxonomy for ``taxon``.

        This base implementation only raises; subclasses provide the real lookup.

        Args:
            taxon: The taxon to load, given as an int or a str

        Raises:
            NotImplementedError: Always, in this base class
        """
        raise NotImplementedError()
27
28
29
class CachedTaxonomyCache(TaxonomyFactory, metaclass=abc.ABCMeta):
30
    """
31
    Preps a new taxonomy file for use in mandos.
32
    Just returns if a corresponding file already exists in the resources dir or mandos cache (``~/.mandos``).
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (109/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
33
    Otherwise, downloads a tab-separated file from UniProt.
34
    (To find manually, follow the ``All lower taxonomy nodes`` link and click ``Download``.)
35
    Then applies fixes and reduces the file size, creating a new file alongside.
36
    Puts both the raw data and fixed data in the cache under ``~/.mandos/taxonomy/``.
37
    """
38
39
    def __init__(self, *, cache_dir: Path = SETTINGS.taxonomy_cache_path, local_only: bool):
        """
        Args:
            cache_dir: Directory used for taxonomy files that are not found
                       among the packaged resources (created on demand)
            local_only: If set, files are read from disk as-is and nothing is downloaded
        """
        self.local_only = local_only
        self.cache_dir = cache_dir
42
43
    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Tries, in order:

            1. A cached file exactly matching the taxon ID
            2. A taxon ID under vertebrata
            3. The UNIQUE name of a taxon under vertebrata
            4. Downloads the taxonomy with the specified ID

        Args:
            taxon: The taxon ID or name to load

        Returns:
            The taxonomy rooted at (or containing only) the requested taxon
        """
        tree = self.load_exact(taxon)
        if tree is None:
            # load_vertebrate returns None when the subtree is empty, so guard before ``in``
            vert = self.load_vertebrate(Globals.vertebrata)
            if vert is not None and taxon in vert:
                logger.info(f"Taxon {taxon} found in the vertebrata cache")
                # pass a list, as the other call sites do; a bare str would be
                # treated as an iterable of characters
                tree = vert.subtrees_by_ids_or_names([taxon])
            else:
                tree = self._load_or_dl(taxon)
        logger.info(f"Taxonomy has {len(tree)} taxa with {len(tree.roots)} roots")
        return tree
62
63
    def load_exact(self, taxon: int) -> Optional[Taxonomy]:
        """
        Loads the cached file for exactly this taxon, if one is usable.

        The file is usable if it exists and is either unexpired or ``local_only`` is set.

        Args:
            taxon: The taxon ID whose cached file should be read

        Returns:
            The taxonomy read from the file, or None if there is no usable file
        """
        cached = self._resolve_non_vertebrate_final(taxon)
        usable = self._check_has(taxon, cached) or self.local_only
        if not usable or not cached.exists():
            return None
        return Taxonomy.from_path(cached)
68
69
    def load_vertebrate(self, taxon: Union[int, str]) -> Optional[Taxonomy]:
        """
        Extracts the subtree for ``taxon`` from the vertebrata taxonomy.

        The vertebrata taxonomy itself is loaded from the cache or downloaded.

        Args:
            taxon: An ID or name to look up within vertebrata

        Returns:
            The matching subtree, or None if it contains no taxa
        """
        whole = self._load_or_dl(Globals.vertebrata)
        subtree = whole.subtrees_by_ids_or_names([taxon])
        if subtree.n_taxa() > 0:
            return subtree
        return None
73
74
    def _check_has(self, taxon: Union[str, int], path: Path) -> bool:
        """
        Tells whether ``path`` exists and has not expired.

        Expiry is decided by ``MandosResources.check_expired`` using
        ``SETTINGS.taxon_expire_sec``; ``taxon`` is only used in its description.
        """
        if not path.exists():
            return False
        expired = MandosResources.check_expired(
            path,
            max_sec=SETTINGS.taxon_expire_sec,
            what=f"Cached taxa under {taxon}",
        )
        return not expired
82
83
    def _load_or_dl(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Loads the taxonomy for ``taxon`` from its cached file, downloading it first if needed.

        The cached file is used directly if it is unexpired or if ``local_only`` is set.
        Otherwise, the raw file is downloaded, fixed, and written to the cache.

        Args:
            taxon: The taxon whose descendants should be loaded

        Returns:
            The taxonomy read from the final (fixed) file
        """
        path = self._resolve_non_vertebrate_final(taxon)
        raw_path = self._resolve_non_vertebrate_raw(taxon)
        if self._check_has(taxon, path) or self.local_only:
            return Taxonomy.from_path(path)
        logger.notice(f"Downloading new taxonomy file for taxon {taxon}")
        self._download_raw(raw_path, taxon)
        path = self._resolve_non_vertebrate_final(taxon)
        self._fix(raw_path, taxon, path)
        logger.notice(f"Cached taxonomy at {path} .")
        # _fix returns the data frame, not a Taxonomy; read the file it just wrote
        # so that both branches return the same type
        return Taxonomy.from_path(path)
95
96
    def rebuild(self, *taxa: int, replace: bool) -> None:
        """
        Regenerates the cached taxonomy files for the given taxa.

        Does nothing (other than logging an error) if ``local_only`` is set,
        because regenerating requires a download.

        Args:
            taxa: The taxon IDs to rebuild
            replace: If True, regenerate even when the cached file already exists;
                     if False, only generate files that are missing
        """
        if self.local_only:
            logger.error("Cannot rebuild -- local_only is set")
            # stop here: continuing would delete files that cannot be re-downloaded
            return
        for taxon in taxa:
            path = self.resolve_path(taxon)
            if replace or not path.exists():
                self.delete_exact(taxon)
                self._load_or_dl(taxon)
                logger.notice(f"Regenerated {taxon} taxonomy")
105
106
    def delete_exact(self, taxon: int) -> None:
        """
        Deletes the raw file, the final cached file, and the checksum file for ``taxon``.

        Files that do not exist are skipped silently.

        Args:
            taxon: The taxon ID whose cached files should be removed
        """
        raw = self._resolve_non_vertebrate_raw(taxon)
        raw.unlink(missing_ok=True)
        # the final (fixed) file, not the raw one, which was already removed above
        final = self._resolve_non_vertebrate_final(taxon)
        if final.exists():
            final.unlink()
            logger.warning(f"Deleted cached taxonomy file {final}")
        # delete either way:
        checksum_file = Checksums.get_hash_file(final, algorithm=SETTINGS.checksum_algorithm)
        checksum_file.unlink(missing_ok=True)
116
117
    def resolve_path(self, taxon: int) -> Path:
        """
        Returns the path of the final (fixed) taxonomy file for ``taxon``.

        The file is not required to exist.
        """
        final_path = self._resolve_non_vertebrate_final(taxon)
        return final_path
119
120
    def _resolve_non_vertebrate_final(self, taxon: int) -> Path:
        # the fixed file is named by the taxon followed by the configured archive suffix
        filename = f"{taxon}{SETTINGS.archive_filename_suffix}"
        return self._get_resource(filename)
122
123
    def _resolve_non_vertebrate_raw(self, taxon: int) -> Path:
        # path of the raw file downloaded from UniProt, before any fixes are applied
        filename = f"taxonomy-ancestor_{taxon}.tsv.gz"
        return self._get_resource(filename)
127
128
    def _get_resource(self, *nodes: Union[Path, str]) -> Path:
        """
        Resolves ``nodes`` to a path, preferring an existing packaged resource.

        If ``MandosResources.path`` points to an existing file, that is returned.
        Otherwise the cache directory is created (if needed) and a path under it is returned.
        """
        bundled = MandosResources.path(*nodes)
        if not bundled.exists():
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            return Path(self.cache_dir, *nodes)
        return bundled
134
135
    def _download_raw(self, raw_path: Path, taxon: int) -> None:
        """
        Downloads the compressed UniProt table of taxa under ``taxon`` to ``raw_path``.

        Args:
            raw_path: Where to write the downloaded bytes
            taxon: The ancestor taxon ID used in the UniProt query

        Raises:
            requests.HTTPError: If the server responds with an error status
        """
        # this is faster and safer than using pd.read_csv(url)
        # https://uniprot.org/taxonomy/?query=ancestor:7742&format=tab&force=true&columns=id&compress=yes
        url = (
            f"https://uniprot.org/taxonomy/?query=ancestor:{taxon}"
            "&format=tab&force=true&columns=id&compress=yes"
        )
        with requests.get(url, stream=True) as response:
            # check before opening the file so an error page is never cached as data
            response.raise_for_status()
            with raw_path.open("wb") as out:
                shutil.copyfileobj(response.raw, out)
142
143
    def _fix(self, raw_path: Path, taxon: int, final_path: Path) -> TaxonomyDf:
        """
        Converts a raw downloaded table into the final taxonomy file.

        Keeps only the needed columns, renames them, adds a row for the root taxon,
        writes the result to ``final_path``, and deletes ``raw_path``.

        Args:
            raw_path: The raw downloaded file to read (deleted afterward)
            taxon: The ID of the root taxon that was queried
            final_path: Where to write the fixed table

        Returns:
            The fixed table that was written
        """
        # unfortunately the raw file won't include an entry for the root ancestor (`taxon`)
        # so, we'll add it in (in ``pd.concat`` below)
        raw_type = TypedDfs.untyped("Raw")
        df = raw_type.read_file(raw_path)
        # find the scientific name of the parent
        # (must happen before the columns are reduced; it reads the "Lineage" column)
        scientific_name = self._determine_name(df, taxon)
        # now fix the columns
        df = df[["Taxon", "Mnemonic", "Scientific name", "Common name", "Parent"]]
        df.columns = ["taxon", "mnemonic", "scientific_name", "common_name", "parent"]
        # now add the ancestor back in
        # (DataFrame.append was removed in pandas 2.0; concat works on old and new versions)
        root = pd.DataFrame([dict(taxon=taxon, scientific_name=scientific_name, parent=0)])
        df = pd.concat([df, root], ignore_index=True)
        df["parent"] = df["parent"].fillna(0).astype(int)
        # write it to a feather / csv / whatever
        df = TaxonomyDf.convert(df)
        df.write_file(final_path, dir_hash=True)
        raw_path.unlink()
        return df
166
167
    def _determine_name(self, df: pd.DataFrame, taxon: int) -> str:
0 ignored issues
show
Coding Style Naming introduced by
Argument name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style introduced by
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y;

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y;
Loading history...
168
        got = df[df["Parent"] == taxon]
169
        if len(got) == 0:
170
            raise XValueError(f"Could not infer scientific name for {taxon}")
171
        z = str(list(got["Lineage"])[0])
0 ignored issues
show
Coding Style Naming introduced by
Variable name "z" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
172
        return z.split("; ")[-1].strip()
173
174
175
class FixedTaxonomyFactory(TaxonomyFactory):
    """
    Serves subtrees of a single taxonomy that is supplied up front.
    Mostly for testing.
    """

    def __init__(self, tax: Taxonomy):
        """
        Args:
            tax: The taxonomy from which all subtrees are taken
        """
        self._tax = tax

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the subtree for ``taxon``; a str is first resolved to an ID by name.
        """
        taxon_id = self._tax.req_only_by_name(taxon).id if isinstance(taxon, str) else taxon
        return self._tax.subtree(taxon_id)
187
188
189
class TaxonomyFactories:
190
    """
191
    Collection of static factory methods.
192
    """
193
194
    @classmethod
    def list_cached_files(cls) -> Mapping[int, Path]:
        """
        Lists the final taxonomy files in the taxonomy cache directory.

        A file is included if its name is a numeric taxon ID followed by
        ``SETTINGS.archive_filename_suffix``.

        Returns:
            A mapping from taxon ID to the path of its cached file
        """
        suffix = SETTINGS.archive_filename_suffix
        found = {}
        for p in SETTINGS.taxonomy_cache_path.iterdir():
            # compare against the full name: ``Path.suffix`` is only the last
            # extension, so it can never match a multi-part suffix
            if not p.name.endswith(suffix):
                continue
            stem = p.name[: len(p.name) - len(suffix)]
            # skip files that are not named by a taxon ID
            if stem.isdigit():
                found[int(stem)] = p
        return found
202
203
    @classmethod
    def main(
        cls,
        cache_dir: Path = SETTINGS.taxonomy_cache_path,
        local_only: bool = False,
    ):
        """
        Creates the standard cache-backed factory.

        Args:
            cache_dir: Passed to ``CachedTaxonomyCache`` as its cache directory
            local_only: Passed to ``CachedTaxonomyCache``

        Returns:
            A new ``CachedTaxonomyCache``
        """
        factory = CachedTaxonomyCache(cache_dir=cache_dir, local_only=local_only)
        return factory
210
211
    @classmethod
    def get_smart_taxonomy(
        cls,
        *,
        allow: Iterable[Union[int, str]],
        forbid: Iterable[Union[int, str]],
        ancestors: Union[int, Collection[int]] = Globals.cellular_taxon,
        cache_dir: Path = SETTINGS.taxonomy_cache_path,
        local_only: bool,
    ) -> Taxonomy:
        """
        Builds a taxonomy of the allowed taxa minus the forbidden ones.

        Currently only taxa under vertebrata are supported.

        Args:
            allow: IDs or names of taxa whose subtrees should be included
            forbid: IDs or names of taxa whose subtrees should be excluded
            ancestors: Currently unused; reserved for non-vertebrate support
            cache_dir: Passed to ``CachedTaxonomyCache`` as its cache directory
            local_only: Passed to ``CachedTaxonomyCache``

        Returns:
            The subtrees for ``allow`` with the subtrees for ``forbid`` removed

        Raises:
            LookupFailedError: If the vertebrata taxonomy contains no taxa
        """
        cache = CachedTaxonomyCache(local_only=local_only, cache_dir=cache_dir)
        vertebrata = cache.load_vertebrate(Globals.vertebrata)
        if vertebrata is None:
            # load_vertebrate returns None for an empty tree; fail with a clear message
            raise LookupFailedError(f"No taxa found under vertebrata ({Globals.vertebrata})")
        # TODO: Support taxa outside vertebrata: split ``allow`` into taxa that are in
        # TODO: vertebrata and taxa that are not, load the latter via ``cache.load``
        # TODO: (restricted to ``ancestors`` if given), and merge with Taxonomy.from_trees.
        return vertebrata.subtrees_by_ids_or_names(allow).exclude_subtrees_by_ids_or_names(forbid)
235
236
237
__all__ = ["TaxonomyFactory", "TaxonomyFactories"]
238