1
|
|
|
import abc |
|
|
|
|
2
|
|
|
import time |
3
|
|
|
import urllib |
4
|
|
|
from pathlib import Path |
5
|
|
|
from typing import Optional |
6
|
|
|
from urllib import request |
|
|
|
|
7
|
|
|
|
8
|
|
|
import decorateme |
|
|
|
|
9
|
|
|
from pocketutils.core.dot_dict import NestedDotDict |
|
|
|
|
10
|
|
|
from pocketutils.core.query_utils import QueryExecutor |
|
|
|
|
11
|
|
|
|
12
|
|
|
from mandos.model import Api, CompoundNotFoundError |
13
|
|
|
from mandos.model.apis.hmdb_support.hmdb_data import HmdbData |
14
|
|
|
from mandos.model.settings import QUERY_EXECUTORS, SETTINGS |
15
|
|
|
from mandos.model.utils.setup import logger |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
class HmdbCompoundLookupError(CompoundNotFoundError):
    """Raised when a compound cannot be matched to, or downloaded from, HMDB."""
20
|
|
|
|
21
|
|
|
|
22
|
|
|
@decorateme.auto_repr_str()
class HmdbApi(Api, metaclass=abc.ABCMeta):
    """
    Base interface for fetching HMDB records.

    This base implementation of :meth:`fetch` always raises; subclasses supply the real lookup.
    """

    def fetch(self, hmdb_id: str) -> HmdbData:
        """
        Fetch the HMDB record for a compound.

        Args:
            hmdb_id: Identifier of the compound to look up

        Returns:
            The record, wrapped as ``HmdbData``

        Raises:
            NotImplementedError: Always, in this base class
        """
        raise NotImplementedError
26
|
|
|
|
27
|
|
|
|
28
|
|
|
@decorateme.auto_repr_str()
class QueryingHmdbApi(HmdbApi):
    """
    HMDB API that downloads metabolite records from https://hmdb.ca.
    """

    def __init__(self, executor: QueryExecutor = QUERY_EXECUTORS.hmdb):
        """
        Args:
            executor: Callable used to download a URL; invoked as ``executor(url)``
        """
        self._executor = executor

    def fetch(self, inchikey_or_hmdb_id: str) -> HmdbData:
        """
        Download the HMDB record for a compound.

        Args:
            inchikey_or_hmdb_id: Either an HMDB ID (anything starting with ``HMDB``),
                which is used directly, or another search term (e.g. an InChIKey),
                which is first resolved to an HMDB ID through the site's search redirect

        Returns:
            The downloaded record, wrapped as ``HmdbData``

        Raises:
            HmdbCompoundLookupError: If the search did not lead to an HMDB ID
                or the record could not be downloaded
        """
        logger.debug(f"Downloading HMDB data for {inchikey_or_hmdb_id}")
        if inchikey_or_hmdb_id.startswith("HMDB"):
            cid = inchikey_or_hmdb_id
        else:
            cid = self._resolve_cid(inchikey_or_hmdb_id)
        # e.g. https://hmdb.ca/metabolites/HMDB0001925.xml
        url = f"https://hmdb.ca/metabolites/{cid}.xml"
        try:
            data = self._executor(url)
        except Exception as e:
            raise HmdbCompoundLookupError(
                f"No HMDB match for {inchikey_or_hmdb_id} ({cid})"
            ) from e
        return HmdbData(self._to_json(data))

    def _resolve_cid(self, query: str) -> str:
        """
        Resolve a search term to an HMDB ID by following the HMDB search redirect.

        Raises:
            HmdbCompoundLookupError: If the request fails or the final URL
                does not end in an HMDB ID
        """
        from urllib.parse import quote

        time.sleep(SETTINGS.hmdb_query_delay_min)  # TODO
        # quote the term so characters such as spaces or '&' cannot corrupt the query string
        url = f"https://hmdb.ca/unearth/q?query={quote(query, safe='')}&searcher=metabolites"
        try:
            # the context manager closes the HTTP response; only the final URL is needed
            with urllib.request.urlopen(url) as res:
                url_ = res.geturl()
            logger.trace(f"Got URL {url_} from {url}")
            cid = url_.split("/")[-1]
            if not cid.startswith("HMDB"):
                raise ValueError(f"Invalid CID {cid} from URL {url_}")
        except Exception as e:
            raise HmdbCompoundLookupError(f"No HMDB match for {query}") from e
        return cid

    def _to_json(self, xml) -> NestedDotDict:
        """
        Convert XML into a nested dict keyed by child tag.

        Args:
            xml: Either raw XML text (``str`` or ``bytes``), which is parsed first,
                or an already-parsed element exposing ``tag``, ``text``, and iteration
                over its children

        Returns:
            A ``NestedDotDict`` mapping each child's tag to its text (``""`` if empty),
            or to a nested ``NestedDotDict`` when the child has children of its own

        Note:
            Tags are used verbatim as keys, and when sibling elements share a tag
            only the last one is kept.
        """
        if isinstance(xml, (str, bytes)):
            # Raw text must be parsed: iterating a str yields 1-char strings forever.
            # ``from ... import`` is used because the parameter shadows the ``xml`` package.
            from xml.etree import ElementTree

            xml = ElementTree.fromstring(xml)
        response = {}
        for child in xml:
            if len(child) > 0:
                response[child.tag] = self._to_json(child)
            else:
                response[child.tag] = child.text or ""
        return NestedDotDict(response)

    def _query(self, url: str) -> str:
        """
        Download ``url`` with the executor and trace-log the size and timing.

        Returns:
            Whatever the executor returned for ``url``
        """
        data = self._executor(url)
        tt = self._executor.last_time_taken
        wt, qt = tt.wait.total_seconds(), tt.query.total_seconds()
        bts = int(len(data) * 8 / 1024)
        # ``.1f`` gives one decimal place; a bare ``.1`` would render 12.3 as ``1e+01``
        logger.trace(f"Queried {bts} kb from {url} in {qt:.1f} s with {wt:.1f} s of wait")
        return data
74
|
|
|
|
75
|
|
|
|
76
|
|
|
@decorateme.auto_repr_str()
class CachingHmdbApi(HmdbApi):
    """
    HMDB API that serves records from an on-disk cache, delegating to a querying API on a miss.
    """

    def __init__(
        self, query: Optional[QueryingHmdbApi], cache_dir: Path = SETTINGS.hmdb_cache_path
    ):
        """
        Args:
            query: API used to download records that are not cached;
                if ``None``, only cached records can be fetched
            cache_dir: Directory that holds the cached ``.json.gz`` files
        """
        self._query = query
        self._cache_dir = cache_dir

    def path(self, inchikey_or_hmdb_id: str) -> Path:
        """
        Return the cache file path for an identifier (``<cache_dir>/<identifier>.json.gz``).
        """
        return self._cache_dir / f"{inchikey_or_hmdb_id}.json.gz"

    def fetch(self, inchikey_or_hmdb_id: str) -> HmdbData:
        """
        Return the record for an identifier, reading the cache or downloading and caching it.

        Args:
            inchikey_or_hmdb_id: Identifier to look up; also the cache file stem that is checked

        Returns:
            The record, wrapped as ``HmdbData``

        Raises:
            HmdbCompoundLookupError: If the record is not cached and no querying API
                was provided, or if the querying API raises it
        """
        path = self.path(inchikey_or_hmdb_id)
        if path.exists():
            return HmdbData(NestedDotDict.read_json(path))
        if self._query is None:
            # cache-only mode: report a lookup failure rather than an AttributeError
            raise HmdbCompoundLookupError(
                f"{inchikey_or_hmdb_id} is not cached and no querying API was provided"
            )
        data = self._query.fetch(inchikey_or_hmdb_id)
        # the record is stored under its HMDB ID, regardless of what was searched for
        path = self.path(data.cid)
        data._data.write_json(path, mkdirs=True)
        logger.info(f"Saved HMDB metabolite {data.cid}")
        self._write_links(data)
        return data

    def _write_links(self, data: HmdbData) -> None:
        """
        Hard-link the cached file for ``data.cid`` under each alternative identifier.

        Existing files at the alias paths are replaced.
        """
        import os

        path = self.path(data.cid)
        # these all have different prefixes, so it's ok
        aliases = [
            ell
            for ell in (data.inchikey, data.cas, data.pubchem_id, data.drugbank_id)
            if ell is not None
        ]
        for alias in aliases:
            link = self.path(alias)
            if link == path:
                # never unlink the cache file itself
                continue
            link.unlink(missing_ok=True)
            # os.link(src, dst) is what Path.link_to did; link_to was removed in Python 3.12
            os.link(path, link)
        logger.debug(f"Added aliases {','.join([str(s) for s in aliases])} ⇌ {data.cid} ({path})")
111
|
|
|
|
112
|
|
|
|
113
|
|
|
# Names exported by a star-import of this module.
__all__ = ["HmdbApi", "QueryingHmdbApi", "CachingHmdbApi", "HmdbCompoundLookupError"]
119
|
|
|
|