1
|
|
|
import abc |
|
|
|
|
2
|
|
|
import time |
3
|
|
|
import urllib |
4
|
|
|
from pathlib import Path |
5
|
|
|
from typing import Optional |
6
|
|
|
from urllib import request |
|
|
|
|
7
|
|
|
|
8
|
|
|
import decorateme |
|
|
|
|
9
|
|
|
from pocketutils.core.dot_dict import NestedDotDict |
|
|
|
|
10
|
|
|
from pocketutils.core.query_utils import QueryExecutor, QueryMixin |
|
|
|
|
11
|
|
|
|
12
|
|
|
from mandos.model import Api, CompoundNotFoundError |
13
|
|
|
from mandos.model.apis.hmdb_support.hmdb_data import HmdbData |
14
|
|
|
from mandos.model.settings import QUERY_EXECUTORS, SETTINGS |
15
|
|
|
from mandos.model.utils import unlink |
16
|
|
|
from mandos.model.utils.setup import logger |
17
|
|
|
|
18
|
|
|
|
19
|
|
|
class HmdbCompoundLookupError(CompoundNotFoundError):
    """Raised when a compound cannot be matched to, or downloaded from, HMDB."""
21
|
|
|
|
22
|
|
|
|
23
|
|
|
@decorateme.auto_repr_str()
class HmdbApi(Api, metaclass=abc.ABCMeta):
    """Base interface for looking up HMDB compound records."""

    def fetch(self, hmdb_id: str) -> HmdbData:
        """
        Fetch the HMDB record for a single compound.

        This base implementation only raises; subclasses provide the lookup.

        Args:
            hmdb_id: Identifier of the compound to look up.

        Returns:
            The HMDB data for the compound.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
27
|
|
|
|
28
|
|
|
|
29
|
|
|
@decorateme.auto_repr_str()
class QueryingHmdbApi(HmdbApi, QueryMixin):
    """HMDB API that downloads metabolite records from hmdb.ca."""

    def __init__(self, executor: QueryExecutor = QUERY_EXECUTORS.hmdb):
        """
        Args:
            executor: Callable that is invoked with the metabolite XML URL
                and returns the downloaded data.
        """
        self._executor = executor

    @property
    def executor(self) -> QueryExecutor:
        """The executor that was passed to the constructor."""
        return self._executor

    def fetch(self, inchikey_or_hmdb_id: str) -> HmdbData:
        """
        Download the HMDB record for a compound.

        An identifier starting with ``HMDB`` is used as the HMDB ID directly;
        anything else is first resolved to an HMDB ID through the hmdb.ca search.

        Args:
            inchikey_or_hmdb_id: An HMDB ID, or another identifier to search for.

        Returns:
            The downloaded record wrapped in ``HmdbData``.

        Raises:
            HmdbCompoundLookupError: If the identifier cannot be resolved or
                the record cannot be downloaded.
        """
        logger.debug(f"Downloading HMDB data for {inchikey_or_hmdb_id}")
        if inchikey_or_hmdb_id.startswith("HMDB"):
            cid = inchikey_or_hmdb_id
        else:
            cid = self._find_hmdb_id(inchikey_or_hmdb_id)
        # e.g. https://hmdb.ca/metabolites/HMDB0001925.xml
        url = f"https://hmdb.ca/metabolites/{cid}.xml"
        try:
            data = self._executor(url)
        except Exception as e:
            raise HmdbCompoundLookupError(
                f"No HMDB match for {inchikey_or_hmdb_id} ({cid})"
            ) from e
        # NOTE(review): _to_json reads .tag/.text, so the executor presumably returns
        # a parsed XML element rather than raw text -- confirm.
        return HmdbData(self._to_json(data))

    def _find_hmdb_id(self, query: str) -> str:
        """Resolve a non-HMDB identifier to an HMDB ID using the hmdb.ca search URL."""
        from urllib.parse import quote

        time.sleep(SETTINGS.hmdb_query_delay_min)  # TODO
        # Percent-encode the term so unusual characters cannot corrupt the query string.
        url = f"https://hmdb.ca/unearth/q?query={quote(query, safe='')}&searcher=metabolites"
        try:
            # Close the response promptly; only the final URL is needed.
            with urllib.request.urlopen(url) as res:
                url_ = res.geturl()
            logger.trace(f"Got UR {url_} from {url}")
            # The last path segment of the final URL is taken to be the HMDB ID.
            cid = url_.split("/")[-1]
            if not cid.startswith("HMDB"):
                raise ValueError(f"Invalid CID {cid} from URL {url_}")
        except Exception as e:
            raise HmdbCompoundLookupError(f"No HMDB match for {query}") from e
        return cid

    def _to_json(self, xml) -> NestedDotDict:
        """
        Recursively convert an XML element into a nested dict keyed by tag.

        Children that have their own children become nested dicts; leaf
        children map to their text, or ``""`` when the text is empty.
        If several siblings share a tag, the last one wins.

        Args:
            xml: Element to convert; assumed to be ElementTree-like
                (iterable over children, with ``len``, ``.tag`` and ``.text``).

        Returns:
            The converted data as a ``NestedDotDict``.
        """
        response = {}
        for child in xml:
            if len(child) > 0:
                response[child.tag] = self._to_json(child)
            else:
                response[child.tag] = child.text or ""
        return NestedDotDict(response)
71
|
|
|
|
72
|
|
|
|
73
|
|
|
@decorateme.auto_repr_str()
class CachingHmdbApi(HmdbApi):
    """HMDB API that stores records as files in a cache directory."""

    def __init__(
        self, query: Optional[QueryingHmdbApi], cache_dir: Path = SETTINGS.hmdb_cache_path
    ):
        """
        Args:
            query: API used to download records that are not cached yet;
                may be ``None``, in which case only cached records can be fetched.
            cache_dir: Directory that holds the cached files.
        """
        self._query = query
        self._cache_dir = cache_dir

    def path(self, inchikey_or_hmdb_id: str) -> Path:
        """Return the cache file path for the given identifier."""
        return self._cache_dir / f"{inchikey_or_hmdb_id}.json.gz"

    def fetch(self, inchikey_or_hmdb_id: str) -> HmdbData:
        """
        Return the record for a compound, downloading and caching it on a miss.

        Args:
            inchikey_or_hmdb_id: Identifier used to find the cache file and,
                on a miss, passed to the querying API.

        Returns:
            The cached or freshly downloaded record.

        Raises:
            HmdbCompoundLookupError: If the record is not cached and no
                querying API was provided.
        """
        path = self.path(inchikey_or_hmdb_id)
        if path.exists():
            return HmdbData(NestedDotDict.read_json(path))
        if self._query is None:
            # Without this guard a cache miss fails with an opaque AttributeError.
            raise HmdbCompoundLookupError(
                f"{inchikey_or_hmdb_id} is not cached, and no querying API was provided"
            )
        data = self._query.fetch(inchikey_or_hmdb_id)
        # The canonical cache file is keyed by the record's own ID, not by the query.
        path = self.path(data.cid)
        data._data.write_json(path, mkdirs=True)
        logger.info(f"Saved HMDB metabolite {data.cid}")
        self._write_links(data)
        return data

    def _write_links(self, data: HmdbData) -> None:
        """Hard-link the record's cache file under each of its alternative identifiers."""
        import os

        path = self.path(data.cid)
        # these all have different prefixes, so it's ok
        optional_ids = (data.cas, data.pubchem_id, data.drugbank_id)
        aliases = [data.inchikey, *(ell for ell in optional_ids if ell is not None)]
        for alias in aliases:
            link = self.path(alias)
            unlink(link, missing_ok=True)
            # os.link(src, dst) is what Path.link_to did; link_to was removed in Python 3.12.
            os.link(path, link)
        logger.debug(f"Added aliases {','.join([str(s) for s in aliases])} ⇌ {data.cid} ({path})")
108
|
|
|
|
109
|
|
|
|
110
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "HmdbApi",
    "QueryingHmdbApi",
    "CachingHmdbApi",
    "HmdbCompoundLookupError",
]
116
|
|
|
|