|
1
|
|
|
import dataclasses |
|
|
|
|
|
|
2
|
|
|
import logging |
|
3
|
|
|
import os |
|
4
|
|
|
from dataclasses import dataclass |
|
5
|
|
|
from typing import Collection, Iterable, List, Mapping, Optional, Union |
|
6
|
|
|
from urllib import request |
|
7
|
|
|
|
|
8
|
|
|
import pandas as pd |
|
|
|
|
|
|
9
|
|
|
import regex |
|
|
|
|
|
|
10
|
|
|
import uniprot |
|
|
|
|
|
|
11
|
|
|
|
|
12
|
|
|
# uses https://github.com/tanghaibao/goatools |
|
13
|
|
|
from goatools import obo_parser |
|
|
|
|
|
|
14
|
|
|
|
|
15
|
|
|
# NOT the same as FlatGoTerm, which has no knowledge of hierarchy |
|
16
|
|
|
from goatools.obo_parser import GOTerm |
|
|
|
|
|
|
17
|
|
|
|
|
18
|
|
|
from pocketutils.core.exceptions import MultipleMatchesError, StringPatternError |
|
|
|
|
|
|
19
|
|
|
|
|
20
|
|
|
# noinspection PyProtectedMember |
|
21
|
|
|
from pocketutils.core.input_output import silenced |
|
|
|
|
|
|
22
|
|
|
|
|
23
|
|
|
# Matches one GO annotation string (the form parsed by FlatGoTerm.parse).
# Capture groups, in order: numeric GO ID, kind letter (C, F, or P),
# description, source ID, and source name.
go_pattern = regex.compile(
    r"GO:(\d+); ([CFP]):([\dA-Za-z- ,()]+); ([A-Z]+):([A-Za-z-_]+)\.",
    flags=regex.V1,
)

# Remote location of the Gene Ontology OBO file, and the local filename it is read from.
GO_OBO_URL = "http://current.geneontology.org/ontology/go.obo"  # nosec
GO_OBO_FILENAME = "go.obo"

logger = logging.getLogger("pocketutils")
|
29
|
|
|
|
|
30
|
|
|
|
|
31
|
|
|
@dataclass(frozen=True, repr=True)
class FlatGoTerm:
    """
    A Gene Ontology term.
    Not to be confused with GOTerm in goatools: obo_parser.GOTerm

    Attributes:
        - identifier: (str); ex: GO:0005737
        - kind: (str: 'P'==process, 'C'==component, 'F'==function)
        - description: (str)
        - source_id: (str); ex: IDA
        - source_name: (str); ex: UniProtKB
    """

    identifier: str
    kind: str
    description: str
    source_id: str
    source_name: str

    @classmethod
    def parse(cls, stwing: str) -> "FlatGoTerm":
        """
        Builds a GO term from a string from uniprot_obj['go'].

        Args:
            stwing: The text to search for the GO term pattern.

        Returns:
            A new instance of ``cls`` populated from the five captured groups.

        Raises:
            StringPatternError: if the syntax is wrong.
        """
        match = go_pattern.search(stwing)
        if match is None:
            raise StringPatternError(
                f"String didn't match GO term pattern: {stwing}",
                value=stwing,
                pattern=go_pattern,
            )
        number, kind, description, source_id, source_name = match.groups()
        # Build through cls so that a subclass gets an instance of its own type.
        return cls(
            identifier="GO:" + number,
            kind=kind,
            description=description,
            source_id=source_id,
            source_name=source_name,
        )

    def to_series(self) -> pd.Series:
        """Returns the fields as a Pandas Series indexed by field name."""
        return pd.Series(dataclasses.asdict(self))
|
75
|
|
|
|
|
76
|
|
|
|
|
77
|
|
|
class UniprotGoTerms:
    """Fetches UniProt metadata and extracts the GO terms listed in it."""

    def fetch_uniprot_data(self, uniprot_ids: Union[str, List[str]]) -> List[Mapping[str, str]]:
        """
        Fetches a list of dicts of UniProt metadata, one per UniProt ID.

        Args:
            uniprot_ids: A single UniProt ID, or a list of distinct UniProt IDs.

        Returns:
            The values of the mapping returned by ``uniprot.fetch_uniprot_metadata``.

        Raises:
            MultipleMatchesError: If ``uniprot_ids`` contains duplicates.
            LookupError: If a UniProt ID wasn't found.
        """
        if isinstance(uniprot_ids, str):  # not a list type
            uniprot_ids = [uniprot_ids]
        # Reject duplicates up front: uniprot.fetch_uniprot_metadata only returns one
        # entry per unique ID, so duplicates would otherwise fail the length check
        # below with a confusing "not found" error.
        if len(set(uniprot_ids)) != len(uniprot_ids):
            raise MultipleMatchesError("Set of UniProt IDs cannot contain duplicates")
        with silenced(no_stderr=False):
            uniprot_data = uniprot.fetch_uniprot_metadata(uniprot_ids)
        if not uniprot_data or len(uniprot_data) != len(uniprot_ids):
            raise LookupError(f"At least one UniProt ID not found in {uniprot_ids}")
        return list(uniprot_data.values())

    def go_terms_for_uniprot_id(self, uniprot_id: str) -> List[FlatGoTerm]:
        """
        Returns a list of FlatGoTerm objects from a UniProt ID.

        Each string under the ``"go"`` key of the fetched metadata is parsed with
        ``FlatGoTerm.parse``, which raises if a string does not match the pattern.
        """
        term_strings = self.fetch_uniprot_data(uniprot_id)[0]["go"]
        # The dataclass constructor takes five fields; raw strings must go through parse().
        return [FlatGoTerm.parse(s) for s in term_strings]

    def go_terms_for_uniprot_id_as_df(self, uniprot_id: str) -> pd.DataFrame:
        """
        Returns a Pandas DataFrame of GO terms from a UniProt ID.

        The frame is indexed by ``ID`` and has the columns ``kind``, ``description``,
        ``sourceId``, and ``sourceName``.
        """
        columns = ["ID", "kind", "description", "sourceId", "sourceName"]
        # Map the dataclass field names onto the column names explicitly; assigning
        # to_series() directly aligns on labels and leaves the renamed columns as NaN.
        rows = [
            {
                "ID": term.identifier,
                "kind": term.kind,
                "description": term.description,
                "sourceId": term.source_id,
                "sourceName": term.source_name,
            }
            for term in self.go_terms_for_uniprot_id(uniprot_id)
        ]
        return pd.DataFrame(rows, columns=columns).set_index("ID")
|
108
|
|
|
|
|
109
|
|
|
|
|
110
|
|
|
class GoTermsAtLevel:
    """
    Gene ontology terms organized by level.

    Example:
        .. code-block::

            GoTermsAtLevel().go_term_ancestors_for_uniprot_id_as_df('P42681', 2)
    """

    def __init__(self) -> None:
        """
        Loads the Gene Ontology DAG from ``GO_OBO_FILENAME``.

        If that file does not exist, it is first downloaded from ``GO_OBO_URL``.
        """
        if not os.path.exists(GO_OBO_FILENAME):
            logger.info("Downloading Gene Ontology OBO...")
            # Pass the target path explicitly: without it, urlretrieve saves to a
            # temporary file and GO_OBO_FILENAME would still be missing below.
            request.urlretrieve(GO_OBO_URL, GO_OBO_FILENAME)  # nosec
            logger.info("Done downloading OBO.")
        # This will be used in query_obo_term
        self.obo = obo_parser.GODag(GO_OBO_FILENAME)
        self.substruct = UniprotGoTerms()

    def query_obo_term(self, term_id: str) -> GOTerm:
        """
        Queries a term through the loaded obo.

        Args:
            term_id: The term to look up, in the form 'GO:0007344'

        Returns:
            The result of ``self.obo.query_term(term_id)``.

        Raises:
            LookupError: If the query returned None.
        """
        x = self.obo.query_term(term_id)
        if x is None:
            # Report the requested ID; x itself is always None on this path.
            raise LookupError(f"Term ID {term_id} not found")
        return x

    def get_ancestors_of_go_term(self, term_id: str, level: int) -> Iterable[GOTerm]:
        """
        From a GO term in the form 'GO:0007344', returns a set of ancestor GOTerm objects
        at the specified level. The starting term itself is included if it is at that level.
        The traversal is restricted to is-a relationships.
        Note that the level is the minimum number of steps to the root.

        Args:
            term_id: The term
            level: starting at 0 (root)

        Raises:
            LookupError: If ``term_id`` is not found.
        """
        matches = set()
        visited = set()
        pending = [self.query_obo_term(term_id)]
        # Iterative walk towards the root. The visited set keeps ancestors that are
        # reachable through several paths from being expanded more than once.
        while pending:
            term = pending.pop()
            if term in visited:
                continue
            visited.add(term)
            if term.level == level:
                matches.add(term)
            pending.extend(term.parents)
        return matches

    def go_term_ancestors_for_uniprot_id(
        self, uniprot_id: str, level: int, kinds_allowed: Optional[Collection[str]] = None
    ) -> Iterable[GOTerm]:
        """
        Gets the GO terms associated with a UniProt ID and returns a set of their
        ancestors at the specified level.
        The traversal is restricted to is-a relationships.
        Note that the level is the minimum number of steps to the root.

        Args:
            uniprot_id: ID
            level: starting at 0 (root)
            kinds_allowed: a set containing any combination of 'P', 'F', or 'C';
                           None allows all three, and an empty collection yields
                           an empty set without fetching anything
        """
        if kinds_allowed is None:
            kinds_allowed = ["P", "F", "C"]
        if len(kinds_allowed) == 0:
            # Same container type as the normal path.
            return set()
        ancestor_terms = set()
        for term in self.substruct.go_terms_for_uniprot_id(uniprot_id):
            if term.kind in kinds_allowed:
                ancestor_terms.update(self.get_ancestors_of_go_term(term.identifier, level))
        return ancestor_terms

    def go_term_ancestors_for_uniprot_id_as_df(
        self, uniprot_id: str, level: int, kinds_allowed: Optional[Collection[str]] = None
    ) -> pd.DataFrame:
        """
        See go_term_ancestors_for_uniprot_id.

        Args:
            uniprot_id: ID
            level: Level
            kinds_allowed: Can include 'P', 'F', and/or 'C'

        Returns:
            Pandas DataFrame indexed by ID with a single column, name.
        """
        # Only ID and name are emitted: the frame declares just those two columns,
        # so any extra per-row key would be dropped on assignment anyway.
        rows = [
            {"ID": term.id, "name": term.name}
            for term in self.go_term_ancestors_for_uniprot_id(uniprot_id, level, kinds_allowed)
        ]
        return pd.DataFrame(rows, columns=["ID", "name"]).set_index("ID")
|
211
|
|
|
|
|
212
|
|
|
|
|
213
|
|
|
# Names exported by a star-import of this module.
__all__ = [
    "FlatGoTerm",
    "UniprotGoTerms",
    "GoTermsAtLevel",
]
|
214
|
|
|
|