Passed
Push — main ( 4e4203...cdf0f7 )
by Douglas
01:39
created

UniprotTaxonomyCache.load()   C

Complexity

Conditions 10

Size

Total Lines 29
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 20
nop 2
dl 0
loc 29
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like mandos.model.taxonomy_caches.UniprotTaxonomyCache.load() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Caching.
3
"""
4
5
from __future__ import annotations
6
7
import abc
8
import shutil
9
from pathlib import Path
10
from typing import Union
11
12
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
13
import requests
0 ignored issues
show
introduced by
Unable to import 'requests'
Loading history...
14
from pocketutils.core.hashers import Hasher
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.hashers'
Loading history...
15
16
from mandos import logger
0 ignored issues
show
Unused Code introduced by
Unused logger imported from mandos
Loading history...
17
from mandos.model import MandosResources
18
from mandos.model.taxonomy import Taxonomy
19
20
# Module-wide SHA-1 hasher; ``UniprotTaxonomyCache._download`` calls it on each raw download
# (presumably to record a checksum next to the file -- confirm against pocketutils).
hasher = Hasher("sha1")
21
22
23
class TaxonomyFactory(metaclass=abc.ABCMeta):
    """
    Abstract source of :class:`Taxonomy` objects.

    Subclasses override :meth:`load` to decide where the taxonomy comes from.
    """

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the taxonomy for ``taxon``.

        Args:
            taxon: A taxon ID or a taxon name; the overrides in this module accept either

        Returns:
            The taxonomy produced by the subclass

        Raises:
            NotImplementedError: Always, in this base class
        """
        raise NotImplementedError()
26
27
28
class UniprotTaxonomyCache(TaxonomyFactory, metaclass=abc.ABCMeta):
29
    """
30
    Preps a new taxonomy file for use in mandos.
31
    Just returns if a corresponding file already exists in the resources dir or mandos cache (``~/.mandos``).
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (109/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
32
    Otherwise, downloads a tab-separated file from UniProt.
33
    (To find manually, follow the ``All lower taxonomy nodes`` link and click ``Download``.)
34
    Then applies fixes and reduces the file size, creating a new file alongside.
35
    Puts both the raw data and fixed data in the cache under ``~/.mandos/taxonomy/``.
36
    """
37
38
    def load_by_name(self, taxon: str) -> Taxonomy:
        """
        Looks up a taxon by name in the bundled vertebrata file (``7742.tab.gz``).

        Args:
            taxon: The name to pass to ``Taxonomy.req_only_by_name``
                   (presumably must match exactly one taxon -- confirm in ``Taxonomy``)

        Returns:
            The subtree of vertebrata rooted at the matched taxon
        """
        tree = Taxonomy.from_path(MandosResources.path("7742.tab.gz"))
        return tree.subtree(tree.req_only_by_name(taxon).id)
42
43
    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Tries, in order:

            1. A cached file exactly matching the taxon ID
            2. A taxon ID under vertebrata
            3. The UNIQUE name of a taxon under vertebrata
            4. Downloads the taxonomy with the specified ID

        Args:
            taxon: A taxon ID (an int or an all-digit string) or a taxon name

        Returns:
            The taxonomy for ``taxon``

        Raises:
            LookupError: If ``taxon`` is neither an int nor a str
        """
        if isinstance(taxon, str) and taxon.isdigit():
            taxon = int(taxon)
        final_path = None
        if isinstance(taxon, int):
            final_path = self._resolve_non_vertebrate_final(taxon)
            if final_path.exists():
                return Taxonomy.from_path(final_path)
        vertebrata = Taxonomy.from_path(MandosResources.path("7742.tab.gz"))
        if isinstance(taxon, str):
            # a non-numeric string can only be resolved as a name under vertebrata
            match = vertebrata.req_only_by_name(taxon).id
            return vertebrata.subtree(match)
        if not isinstance(taxon, int):
            raise LookupError(f"Could not find taxon {taxon}; try passing an ID instead")
        if taxon in vertebrata:
            return vertebrata.subtree(taxon)
        raw_path = self._resolve_non_vertebrate_raw(taxon)
        if not raw_path.exists():
            self._download(raw_path, taxon)
        # The final file is known to be missing here (checked above), so always rebuild it,
        # including when the raw download is already cached from an earlier, interrupted run.
        self._fix(raw_path, taxon, final_path)
        return Taxonomy.from_path(final_path)
72
73
    def _resolve_non_vertebrate_final(self, taxon: int) -> Path:
74
        raise NotImplementedError()
75
76
    def _resolve_non_vertebrate_raw(self, taxon: int) -> Path:
77
        raise NotImplementedError()
78
79
    def _download(self, raw_path: Path, taxon: int) -> None:
        """
        Downloads the UniProt table of all taxa under ``taxon`` to ``raw_path``.

        The response body is streamed to disk unmodified, then ``hasher`` is run on the file.

        Args:
            raw_path: Where to write the downloaded file
            taxon: The ancestor taxon ID to query for

        Raises:
            requests.HTTPError: If UniProt responds with an error status
        """
        # this is faster and safer than using pd.read_csv(url)
        # https://uniprot.org/taxonomy/?query=ancestor:7742&format=tab&force=true&columns=id&compress=yes
        url = f"https://uniprot.org/taxonomy/?query=ancestor:{taxon}&format=tab&force=true&columns=id&compress=yes"
        with requests.get(url, stream=True) as response:
            # check the status before opening the file: otherwise an error page
            # would be saved to the cache and later be mistaken for taxonomy data
            response.raise_for_status()
            with raw_path.open("wb") as out:
                shutil.copyfileobj(response.raw, out)
        hasher.to_write(raw_path).write()
87
88
    def _fix(self, raw_path: Path, taxon: int, final_path: Path):
89
        # now process it!
90
        # unfortunately it won't include an entry for the root ancestor (`taxon`)
91
        # so, we'll add it in
92
        df = pd.read_csv(raw_path, sep="\t")
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
93
        # find the scientific name of the parent
94
        scientific_name = self._determine_name(df, taxon)
95
        # now fix the columns
96
        df = df[["Taxon", "Scientific name", "Parent"]]
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
97
        df.columns = ["taxon", "scientific_name", "parent"]
98
        # now add the ancestor back in
99
        df = df.append(
0 ignored issues
show
Coding Style Naming introduced by
Variable name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
100
            pd.Series(dict(taxon=taxon, scientific_name=scientific_name, parent=0)),
101
            ignore_index=True,
102
        )
103
        # write it to a csv.gz
104
        df["parent"] = df["parent"].astype(int)
105
        df.to_csv(final_path, index=False, sep="\t")
106
107
    def _determine_name(self, df: pd.DataFrame, taxon: int) -> str:
0 ignored issues
show
Coding Style Naming introduced by
Argument name "df" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style introduced by
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example

class Foo:
    def some_method(self, x, y):
        return x + y;

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y;
Loading history...
108
        got = df[df["Parent"] == taxon]
109
        if len(got) == 0:
110
            raise ValueError(f"Could not infer scientific name for {taxon}")
111
        z = str(list(got["Lineage"])[0])
0 ignored issues
show
Coding Style Naming introduced by
Variable name "z" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
112
        return z.split("; ")[-1].strip()
113
114
115
class FixedTaxonomyFactory(TaxonomyFactory):
    """
    Factory backed by a single in-memory :class:`Taxonomy`.
    """

    def __init__(self, tax: Taxonomy):
        """
        Args:
            tax: The taxonomy every ``load`` call draws its subtree from
        """
        self._tax = tax

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the subtree of the wrapped taxonomy at ``taxon``.

        Args:
            taxon: A taxon ID, or a name that is resolved via ``req_only_by_name``
        """
        taxon_id = self._tax.req_only_by_name(taxon).id if isinstance(taxon, str) else taxon
        return self._tax.subtree(taxon_id)
123
124
125
class FixedFileTaxonomyFactory(TaxonomyFactory):
    """
    Factory backed by a single taxonomy file, which is re-read on every ``load`` call.
    """

    def __init__(self, path: Path):
        """
        Args:
            path: The taxonomy file passed to ``Taxonomy.from_path``
        """
        self._path = path

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Reads the file and returns its subtree at ``taxon``.

        Args:
            taxon: A taxon ID, or a name that is resolved via ``req_only_by_name``
        """
        tree = Taxonomy.from_path(self._path)
        if not isinstance(taxon, str):
            return tree.subtree(taxon)
        return tree.subtree(tree.req_only_by_name(taxon).id)
134
135
136
class CacheDirTaxonomyCache(UniprotTaxonomyCache):
    """
    UniProt taxonomy cache that keeps its files in a directory.

    A file bundled in the mandos resources takes precedence over the cache directory.
    """

    def __init__(self, cache_dir: Path):
        """
        Args:
            cache_dir: Directory for files that are not bundled resources
        """
        self.cache_dir = cache_dir

    def _resolve_non_vertebrate_final(self, taxon: int) -> Path:
        """Returns the location of the processed file, ``{taxon}.tab.gz``."""
        filename = f"{taxon}.tab.gz"
        return self._get_resource(filename)

    def _resolve_non_vertebrate_raw(self, taxon: int) -> Path:
        """Returns the location of the raw download, ``taxonomy-ancestor_{taxon}.tab.gz``."""
        filename = f"taxonomy-ancestor_{taxon}.tab.gz"
        return self._get_resource(filename)

    def _get_resource(self, *nodes: Union[Path, str]) -> Path:
        """
        Resolves ``nodes`` to a bundled resource if one exists, else to a path under the cache dir.

        Creates the cache directory as a side effect in the latter case.
        """
        bundled = MandosResources.path(*nodes)
        if not bundled.exists():
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            return Path(self.cache_dir, *nodes)
        return bundled
152
153
154
class TaxonomyFactories:
    """
    Collection of static factory methods.
    """

    @classmethod
    def from_vertebrata(cls) -> TaxonomyFactory:
        """
        Returns a factory that serves subtrees of the bundled vertebrata file (``7742.tab.gz``).
        """
        # The vertebrata resource is a file, not a directory. Passing it to
        # CacheDirTaxonomyCache as ``cache_dir`` makes that class call mkdir() on an
        # existing file as soon as a lookup misses the bundled resources.
        return FixedFileTaxonomyFactory(MandosResources.path("7742.tab.gz"))

    @classmethod
    def from_uniprot(cls, cache_dir: Path) -> TaxonomyFactory:
        """
        Returns a factory that caches UniProt downloads under ``cache_dir``.

        Args:
            cache_dir: Directory for downloaded and processed taxonomy files
        """
        return CacheDirTaxonomyCache(cache_dir)

    @classmethod
    def from_fixed_file(cls, cache_dir: Path) -> TaxonomyFactory:
        """
        Returns a factory that reads every taxonomy from one file.

        Args:
            cache_dir: Despite the name, the path of the taxonomy FILE to read
                       (the parameter name is kept so keyword callers do not break)
        """
        return FixedFileTaxonomyFactory(cache_dir)
170
171
172
# Public API: only the abstract factory type and the collection of factory constructors.
__all__ = ["TaxonomyFactory", "TaxonomyFactories"]
173