Passed
Push — dependabot/pip/pyarrow-4.0.1 ( ca09ce...b2836e )
by
unknown
02:18 queued 20s
created

UniprotTaxonomyCache._load()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 2
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
"""
2
Caching.
3
"""
4
5
from __future__ import annotations
6
7
import abc
8
import shutil
9
from datetime import datetime
10
from pathlib import Path
11
from typing import Iterable, Optional, Sequence, Union
12
13
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
14
import requests
0 ignored issues
show
introduced by
Unable to import 'requests'
Loading history...
15
from pocketutils.core.hashers import Hasher
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.hashers'
Loading history...
16
17
from mandos import logger
18
from mandos.model import MandosResources
19
from mandos.model.settings import MANDOS_SETTINGS
20
from mandos.model.taxonomy import Taxonomy
21
22
# Module-level SHA-1 hasher shared by this module; ``_download`` calls
# ``hasher.to_write(raw_path).write()`` after saving a raw taxonomy file.
hasher = Hasher("sha1")
23
24
25
class TaxonomyFactory(metaclass=abc.ABCMeta):
    """
    Base interface for objects that can produce a ``Taxonomy`` for a taxon.
    """

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Loads the taxonomy for a taxon given as an ID or a name.

        Raises:
            NotImplementedError: Always; subclasses are expected to override this.
        """
        raise NotImplementedError
28
29
30
class UniprotTaxonomyCache(TaxonomyFactory, metaclass=abc.ABCMeta):
    """
    Loads taxonomies from the bundled vertebrata file or from cached UniProt files.

    ``load`` only consults files that already exist: the vertebrata resource and,
    for integer IDs, a previously converted file for exactly that taxon.
    ``load_dl`` fetches a tab-separated file from UniProt when no raw file is present.
    (To find it manually, follow the ``All lower taxonomy nodes`` link and click ``Download``.)
    It then applies fixes and reduces the file size, creating a new file alongside.
    Subclasses choose where the raw and fixed files live by implementing
    ``_resolve_non_vertebrate_raw`` and ``_resolve_non_vertebrate_final``.
    """

    def load_by_name(self, taxon: str) -> Taxonomy:
        """
        Returns the vertebrata subtree rooted at the taxon with the given name.

        The name is resolved with ``Taxonomy.req_only_by_name`` on the vertebrata file.
        """
        vertebrata = Taxonomy.from_path(MandosResources.VERTEBRATA_PATH)
        only = vertebrata.req_only_by_name(taxon)
        return vertebrata.subtree(only.id)

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Tries, in order:

            1. A taxon ID or name under vertebrata
            2. For integer IDs, a cached file exactly matching the taxon ID

        Nothing is downloaded here; call ``load_dl`` first to fetch and convert a taxon.

        Raises:
            LookupError: If neither lookup finds the taxon
        """
        tree = self._load(taxon)
        logger.info(f"Taxonomy has {len(tree)} taxa with {len(tree.roots)} roots")
        return tree

    def _load(self, taxon: Union[int, str]) -> Taxonomy:
        # The vertebrata lookup comes first so that every call that succeeded before
        # the exact-file lookup was wired in still returns the same result.
        vertebrate = self.load_vertebrate(taxon)
        if vertebrate is not None:
            logger.info(f"Taxon {taxon} found in the vertebrata cache")
            return vertebrate
        # Cached files are keyed by taxon ID, so a name cannot be looked up this way.
        if isinstance(taxon, int):
            exact = self.load_exact(taxon)
            if exact is not None:
                logger.info(f"Taxon {taxon} found in cached file")
                return exact
        raise LookupError(f"Could not find taxon {taxon}; try passing an ID instead")

    def load_exact(self, taxon: int) -> Optional[Taxonomy]:
        """
        Reads the converted file for exactly this taxon ID, or returns None if it is absent.
        """
        path = self._resolve_non_vertebrate_final(taxon)
        return Taxonomy.from_path(path) if path.exists() else None

    def load_vertebrate(self, taxon: Union[int, str]) -> Optional[Taxonomy]:
        """
        Returns the subtree(s) of the vertebrata file matching the ID or name.

        Returns None if the resulting taxonomy contains no taxa.
        """
        vertebrata = Taxonomy.from_path(MandosResources.VERTEBRATA_PATH)
        vertebrate = vertebrata.subtrees_by_ids_or_names([taxon])
        return vertebrate if vertebrate.n_taxa() > 0 else None

    def load_dl(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Converts the raw file for a taxon, downloading it first if it does not exist.

        An existing raw file is reused as-is (with a warning giving its modification date).
        The converted file is written to the path from ``_resolve_non_vertebrate_final``
        and then loaded.
        """
        raw_path = self._resolve_non_vertebrate_raw(taxon)
        if raw_path.exists():
            logger.warning(f"Converting temp file for taxon {taxon} at {raw_path} .")
            # getting the mod date because creation dates are iffy cross-platform
            # (in fact the Linux kernel doesn't bother to expose them)
            when = datetime.fromtimestamp(raw_path.stat().st_mtime).strftime("%Y-%m-%d")
            logger.warning(f"It may be out of date. (File mod date: {when})")
        else:
            logger.info(f"Downloading new taxonomy file for taxon {taxon} .")
            self._download(raw_path, taxon)
        path = self._resolve_non_vertebrate_final(taxon)
        self._fix(raw_path, taxon, path)
        logger.info(f"Cached taxonomy at {path} .")
        return Taxonomy.from_path(path)

    def _resolve_non_vertebrate_final(self, taxon: int) -> Path:
        """
        Returns the path of the converted file for a taxon ID. Subclasses must override.
        """
        raise NotImplementedError()

    def _resolve_non_vertebrate_raw(self, taxon: int) -> Path:
        """
        Returns the path of the raw downloaded file for a taxon ID. Subclasses must override.
        """
        raise NotImplementedError()

    def _download(self, raw_path: Path, taxon: int) -> None:
        """
        Streams the UniProt listing of all descendants of ``taxon`` into ``raw_path``.

        Raises:
            requests.HTTPError: If UniProt answers with an error status
        """
        # this is faster and safer than using pd.read_csv(url)
        # example:
        # https://uniprot.org/taxonomy/?query=ancestor:7742&format=tab&force=true&columns=id&compress=yes
        url = (
            "https://uniprot.org/taxonomy/"
            f"?query=ancestor:{taxon}&format=tab&force=true&columns=id&compress=yes"
        )
        # load_dl treats any existing raw file as usable, so stream into a temporary
        # sibling and rename only after the whole body has arrived
        partial_path = raw_path.with_name(raw_path.name + ".part")
        with requests.get(url, stream=True) as response:
            # without this check an HTML error page would be saved as taxonomy data
            response.raise_for_status()
            with partial_path.open("wb") as out:
                shutil.copyfileobj(response.raw, out)
        partial_path.replace(raw_path)
        hasher.to_write(raw_path).write()

    def _fix(self, raw_path: Path, taxon: int, final_path: Path) -> None:
        """
        Converts a raw UniProt table into the reduced table written to ``final_path``.

        Keeps the taxon, scientific name, common name, and parent columns, and adds a
        row for ``taxon`` itself with a parent of 0.
        """
        # unfortunately the download won't include an entry for the root ancestor (`taxon`)
        # so, we'll add it in (in ``pd.concat`` below)
        df = self._read_raw_table(raw_path)
        # find the scientific name of the root before the Lineage column is dropped
        scientific_name = self._determine_name(df, taxon)
        # now fix the columns
        renames = {
            "Taxon": "taxon",
            "Scientific name": "scientific_name",
            "Common name": "common_name",
            "Parent": "parent",
        }
        df = df[list(renames)].rename(columns=renames)
        # now add the ancestor back in; its common name is left missing
        # (DataFrame.append was removed in pandas 2.0, hence pd.concat)
        root = pd.DataFrame([dict(taxon=taxon, scientific_name=scientific_name, parent=0)])
        df = pd.concat([df, root], ignore_index=True)
        df["parent"] = df["parent"].astype(int)
        self._write_table(df, final_path)

    @staticmethod
    def _read_raw_table(raw_path: Path) -> pd.DataFrame:
        """
        Reads the tab-separated download, transparently handling gzip compression.
        """
        # The download URL asks for ``compress=yes`` but the file suffix is chosen by
        # the subclass, so detect gzip from its magic number rather than from the name.
        with raw_path.open("rb") as stream:
            is_gzip = stream.read(2) == b"\x1f\x8b"
        return pd.read_csv(raw_path, sep="\t", compression="gzip" if is_gzip else None)

    @staticmethod
    def _write_table(df: pd.DataFrame, final_path: Path) -> None:
        """
        Writes the table in a format chosen from the suffix of ``final_path``.
        """
        # NOTE(review): the format must match what Taxonomy.from_path reads; that reader
        # is not visible from this module, so the suffix mapping below is an assumption.
        suffixes = {suffix.lower() for suffix in final_path.suffixes}
        if ".feather" in suffixes:
            df.to_feather(str(final_path))
        elif ".csv" in suffixes:
            df.to_csv(final_path, index=False)
        else:
            df.to_csv(final_path, index=False, sep="\t")

    def _determine_name(self, df: pd.DataFrame, taxon: int) -> str:
        """
        Infers the scientific name of ``taxon`` from the lineage of one of its children.

        Raises:
            ValueError: If no row lists ``taxon`` as its parent
        """
        children = df[df["Parent"] == taxon]
        if children.empty:
            raise ValueError(f"Could not infer scientific name for {taxon}")
        # the last "; "-separated lineage entry of a direct child is taken as the name
        lineage = str(children["Lineage"].iloc[0])
        return lineage.split("; ")[-1].strip()
134
135
136
class FixedTaxonomyFactory(TaxonomyFactory):
    """
    Factory that serves subtrees of a single, already-loaded ``Taxonomy``.
    """

    def __init__(self, tax: Taxonomy):
        self._tax = tax

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the subtree for a taxon given by ID, or by name via ``req_only_by_name``.
        """
        if not isinstance(taxon, str):
            return self._tax.subtree(taxon)
        # a name is first resolved to the ID of its matching taxon
        match = self._tax.req_only_by_name(taxon)
        return self._tax.subtree(match.id)
144
145
146
class FixedFileTaxonomyFactory(TaxonomyFactory):
    """
    Factory that serves subtrees of the taxonomy stored in one file.

    The file is read again with ``Taxonomy.from_path`` on every call to ``load``.
    """

    def __init__(self, path: Path):
        self._path = path

    def load(self, taxon: Union[int, str]) -> Taxonomy:
        """
        Returns the subtree for a taxon given by ID, or by name via ``req_only_by_name``.
        """
        whole = Taxonomy.from_path(self._path)
        if not isinstance(taxon, str):
            return whole.subtree(taxon)
        # a name is first resolved to the ID of its matching taxon
        match = whole.req_only_by_name(taxon)
        return whole.subtree(match.id)
155
156
157
class CacheDirTaxonomyCache(UniprotTaxonomyCache):
    """
    Taxonomy cache that prefers bundled resources and otherwise uses a directory.

    A file is taken from ``MandosResources`` when it exists there;
    otherwise its path is placed under ``cache_dir``.
    """

    def __init__(self, cache_dir: Path):
        self.cache_dir = cache_dir

    def _resolve_non_vertebrate_final(self, taxon: int) -> Path:
        filename = MANDOS_SETTINGS.taxonomy_filename_format.format(taxon)
        return self._get_resource(filename)

    def _resolve_non_vertebrate_raw(self, taxon: int) -> Path:
        # This is the target for the raw file that the base class downloads from UniProt.
        # NOTE(review): the suffix is ``.feather`` although the download URL requests a
        # tab-separated format -- confirm which is intended.
        filename = f"taxonomy-ancestor_{taxon}.feather"
        return self._get_resource(filename)

    def _get_resource(self, *nodes: Union[Path, str]) -> Path:
        """
        Resolves path components to a bundled resource or a path in the cache directory.

        The cache directory is created as a side effect whenever the bundled
        resource does not exist.
        """
        bundled = MandosResources.path(*nodes)
        if not bundled.exists():
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            return Path(self.cache_dir, *nodes)
        return bundled
175
176
177
class TaxonomyFactories:
    """
    Collection of static factory methods.
    """

    @classmethod
    def from_vertebrata(cls) -> UniprotTaxonomyCache:
        """
        Returns a cache constructed with ``MandosResources.VERTEBRATA_PATH``.
        """
        # NOTE(review): VERTEBRATA_PATH is passed where CacheDirTaxonomyCache expects
        # a cache directory -- confirm this is intended.
        return CacheDirTaxonomyCache(MandosResources.VERTEBRATA_PATH)

    @classmethod
    def from_uniprot(
        cls, cache_dir: Path = MANDOS_SETTINGS.taxonomy_cache_path
    ) -> UniprotTaxonomyCache:
        """
        Returns a cache that keeps its files under ``cache_dir``.
        """
        return CacheDirTaxonomyCache(cache_dir)

    @classmethod
    def from_fixed_file(
        cls, cache_dir: Path = MANDOS_SETTINGS.taxonomy_cache_path
    ) -> TaxonomyFactory:
        """
        Returns a factory that reads the taxonomy from the single file at ``cache_dir``.
        """
        # NOTE(review): despite its name, ``cache_dir`` is used as the taxonomy file path.
        return FixedFileTaxonomyFactory(cache_dir)

    @classmethod
    def get_smart_taxonomy(
        cls,
        allow: Iterable[Union[int, str]],
        forbid: Iterable[Union[int, str]],
        cache_dir: Path = MANDOS_SETTINGS.taxonomy_cache_path,
    ) -> Taxonomy:
        """
        Builds one taxonomy from the allowed taxa, minus the forbidden subtrees.

        Allowed taxa found under vertebrata (taxon 7742) come from the vertebrata
        taxonomy; each remaining one is loaded through ``from_uniprot(cache_dir)``.

        Args:
            allow: IDs or names of the taxa whose subtrees are included
            forbid: IDs or names of the taxa whose subtrees are removed afterwards
            cache_dir: Directory handed to ``from_uniprot`` for non-vertebrate taxa
        """
        # ``allow`` is traversed twice below; materialize it so that a one-shot
        # iterable (e.g. a generator) does not silently drop the non-vertebrate taxa
        allowed = list(allow)
        vertebrata = cls.from_vertebrata().load(7742)
        vertebrates = vertebrata.subtrees_by_ids_or_names(allowed)
        # one factory serves every non-vertebrate taxon
        uniprot = cls.from_uniprot(cache_dir)
        invertebrates: Sequence[Taxonomy] = [
            uniprot.load(taxon)
            for taxon in allowed
            if vertebrata.get_by_id_or_name(taxon) is None
        ]
        my_tax = Taxonomy.from_trees([vertebrates, *invertebrates])
        return my_tax.exclude_subtrees_by_ids_or_names(forbid)
215
216
217
# Public API: only the base factory type and the collection of factory methods are
# exported by ``from <module> import *``; the concrete classes above are not listed.
__all__ = ["TaxonomyFactory", "TaxonomyFactories"]
218