1
|
|
|
import dataclasses |
|
|
|
|
2
|
|
|
import logging |
3
|
|
|
import os |
4
|
|
|
from dataclasses import dataclass |
5
|
|
|
from typing import Collection, Iterable, List, Mapping, Optional, Union |
6
|
|
|
from urllib import request |
7
|
|
|
|
8
|
|
|
import pandas as pd |
|
|
|
|
9
|
|
|
import regex |
|
|
|
|
10
|
|
|
import uniprot |
|
|
|
|
11
|
|
|
|
12
|
|
|
# uses https://github.com/tanghaibao/goatools |
13
|
|
|
from goatools import obo_parser |
|
|
|
|
14
|
|
|
|
15
|
|
|
# NOT the same as FlatGoTerm, which has no knowledge of hierarchy |
16
|
|
|
from goatools.obo_parser import GOTerm |
|
|
|
|
17
|
|
|
|
18
|
|
|
from pocketutils.core.exceptions import MultipleMatchesError, StringPatternError |
|
|
|
|
19
|
|
|
|
20
|
|
|
# noinspection PyProtectedMember |
21
|
|
|
from pocketutils.core.input_output import silenced |
|
|
|
|
22
|
|
|
|
23
|
|
|
# Matches one GO annotation string such as "GO:0005737; C:cytoplasm; IDA:UniProtKB."
# Capture groups, in order:
#   1. the digits of the GO ID (the "GO:" prefix is not captured)
#   2. the kind letter: C, F, or P
#   3. the description
#   4. the source ID (upper-case letters only)
#   5. the source name
go_pattern = regex.compile(
    r"GO:(\d+); ([CFP]):([\dA-Za-z- ,()]+); ([A-Z]+):([A-Za-z-_]+)\.", flags=regex.V1
)
# Remote location of the Gene Ontology OBO file.
GO_OBO_URL = "http://current.geneontology.org/ontology/go.obo"  # nosec
# Local path (relative to the working directory) where the OBO file is looked up.
GO_OBO_FILENAME = "go.obo"
logger = logging.getLogger("pocketutils")
29
|
|
|
|
30
|
|
|
|
31
|
|
|
@dataclass(frozen=True, repr=True)
class FlatGoTerm:
    """
    A Gene Ontology term.
    Not to be confused with GOTerm in goatools: obo_parser.GOTerm

    Attributes:
        - identifier: (str); ex: GO:0005737
        - kind: (str: 'P'==process, 'C'==component, 'F'==function)
        - description: (str)
        - source_id: (str); ex: IDA
        - source_name: (str); ex: UniProtKB
    """

    identifier: str
    kind: str
    description: str
    source_id: str
    source_name: str

    @classmethod
    def parse(cls, stwing: str) -> "FlatGoTerm":
        """
        Builds a GO term from a string from uniprot_obj['go'].

        Args:
            stwing: Text that is searched for the module-level ``go_pattern``

        Returns:
            A new instance of ``cls`` filled from the five captured groups

        Raises:
            StringPatternError: If the pattern is not found in the string
        """
        match = go_pattern.search(stwing)
        if match is None:
            raise StringPatternError(
                f"String didn't match GO term pattern: {stwing}",
                value=stwing,
                pattern=go_pattern,
            )
        # The pattern captures only the digits of the ID, so "GO:" is put back here.
        # ``cls`` (rather than the hard-coded class) keeps subclasses working.
        return cls(
            "GO:" + match.group(1),
            match.group(2),
            match.group(3),
            match.group(4),
            match.group(5),
        )

    def to_series(self) -> pd.Series:
        """Returns the fields as a Pandas Series indexed by field name."""
        return pd.Series(dataclasses.asdict(self))
75
|
|
|
|
76
|
|
|
|
77
|
|
|
class UniprotGoTerms:
    """Fetches UniProt metadata and the GO terms listed under its ``"go"`` key."""

    def fetch_uniprot_data(self, uniprot_ids: Union[str, List[str]]) -> List[Mapping[str, str]]:
        """
        Fetches a list of dicts of UniProt metadata, one per UniProt ID.

        Args:
            uniprot_ids: A single UniProt ID, or a list of IDs without duplicates

        Returns:
            The values of the mapping returned by ``uniprot.fetch_uniprot_metadata``

        Raises:
            MultipleMatchesError: If the same ID appears more than once
            LookupError: If the number of results differs from the number of IDs
        """
        if isinstance(uniprot_ids, str):  # not a list type
            uniprot_ids = [uniprot_ids]
        # if we don't prevent these here, we'll get a LookupError from below, which is confusing
        # That's because uniprot.fetch_uniprot_metadata will only return one per unique ID
        if len(set(uniprot_ids)) != len(uniprot_ids):
            raise MultipleMatchesError("Set of UniProt IDs cannot contain duplicates")
        with silenced(no_stderr=False):
            uniprot_data = uniprot.fetch_uniprot_metadata(uniprot_ids)
        if not uniprot_data or len(uniprot_data) != len(uniprot_ids):
            raise LookupError(f"At least one UniProt ID not found in {uniprot_ids}")
        return list(uniprot_data.values())

    def go_terms_for_uniprot_id(self, uniprot_id: str) -> List["FlatGoTerm"]:
        """
        Returns a list of FlatGoTerm objects from a UniProt ID.

        Raises:
            StringPatternError: If one of the GO strings cannot be parsed
                (raised by ``FlatGoTerm.parse``)
        """
        term_strings = self.fetch_uniprot_data(uniprot_id)[0]["go"]
        # Each entry is one raw annotation string; the dataclass constructor takes the
        # five separate fields, so the string has to go through the parser.
        return [FlatGoTerm.parse(s) for s in term_strings]

    def go_terms_for_uniprot_id_as_df(self, uniprot_id: str) -> pd.DataFrame:
        """
        Returns a Pandas DataFrame of GO terms from a UniProt ID.

        Returns:
            A DataFrame indexed by ``ID`` with columns ``kind``, ``description``,
            ``sourceId``, and ``sourceName``; one row per GO term
        """
        columns = ["ID", "kind", "description", "sourceId", "sourceName"]
        # The rows are built positionally: the dataclass field names (identifier,
        # source_id, source_name) differ from the column labels, and assigning a
        # Series by label would leave those columns empty.
        rows = [
            (term.identifier, term.kind, term.description, term.source_id, term.source_name)
            for term in self.go_terms_for_uniprot_id(uniprot_id)
        ]
        return pd.DataFrame(rows, columns=columns).set_index("ID")
108
|
|
|
|
109
|
|
|
|
110
|
|
|
class GoTermsAtLevel:
    """
    Gene ontology terms organized by level.

    Example:
        .. code-block::

            GoTermsAtLevel().go_term_ancestors_for_uniprot_id_as_df('P42681', 2)
    """

    def __init__(self) -> None:
        """
        Loads the ontology from ``GO_OBO_FILENAME``, downloading it from
        ``GO_OBO_URL`` first if that file does not exist.
        """
        if not os.path.exists(GO_OBO_FILENAME):
            logger.info("Downloading Gene Ontology OBO...")
            # The target filename is required: without it urlretrieve saves to a
            # temporary file and the parse below would not find GO_OBO_FILENAME.
            request.urlretrieve(GO_OBO_URL, GO_OBO_FILENAME)  # nosec
            logger.info("Done downloading OBO.")
        # This will be used in query_obo_term
        self.obo = obo_parser.GODag(GO_OBO_FILENAME)
        self.substruct = UniprotGoTerms()

    def query_obo_term(self, term_id: str) -> "GOTerm":
        """
        Queries a term through ``self.obo``.

        Args:
            term_id: A GO ID in the form 'GO:0007344'

        Raises:
            LookupError: If ``self.obo.query_term`` returns None for the ID
        """
        term = self.obo.query_term(term_id)
        if term is None:
            raise LookupError(f"Term ID {term_id} not found")
        return term

    def get_ancestors_of_go_term(self, term_id: str, level: int) -> Iterable["GOTerm"]:
        """
        From a GO term in the form 'GO:0007344', returns a set of ancestor GOTerm objects at the specified level.
        The term itself is included if it is at that level.
        The traversal is restricted to is-a relationships.
        Note that the level is the minimum number of steps to the root.

        Args:
            term_id: The term
            level: starting at 0 (root)

        Raises:
            LookupError: If the term ID is not found
        """
        at_level = set()
        visited = set()
        pending = [self.query_obo_term(term_id)]
        # Iterative walk over ``parents``. A term reachable through several paths
        # is expanded only once, and deep ontologies cannot exhaust the recursion limit.
        while pending:
            term = pending.pop()
            if term in visited:
                continue
            visited.add(term)
            if term.level == level:
                at_level.add(term)
            # Iterating an empty ``parents`` adds nothing, so no has-parent check is needed.
            pending.extend(term.parents)
        return at_level

    def go_term_ancestors_for_uniprot_id(
        self, uniprot_id: str, level: int, kinds_allowed: Optional[Collection[str]] = None
    ) -> Iterable["GOTerm"]:
        """
        Gets the GO terms associated with a UniProt ID and returns a set of their ancestors at the specified level.
        The traversal is restricted to is-a relationships.
        Note that the level is the minimum number of steps to the root.

        Args:
            uniprot_id: ID
            level: starting at 0 (root)
            kinds_allowed: a set containing any combination of 'P', 'F', or 'C';
                None means all three, and an empty collection yields an empty result
        """
        if kinds_allowed is None:
            kinds_allowed = ["P", "F", "C"]
        ancestor_terms = set()
        if len(kinds_allowed) == 0:
            # Nothing can match, so skip the UniProt lookup entirely.
            return ancestor_terms
        for term in self.substruct.go_terms_for_uniprot_id(uniprot_id):
            if term.kind in kinds_allowed:
                ancestor_terms.update(self.get_ancestors_of_go_term(term.identifier, level))
        return ancestor_terms

    def go_term_ancestors_for_uniprot_id_as_df(
        self, uniprot_id: str, level: int, kinds_allowed: Optional[Collection[str]] = None
    ) -> pd.DataFrame:
        """
        See go_term_ancestors_for_uniprot_id.

        Args:
            uniprot_id: ID
            level: Level
            kinds_allowed: Can include 'P', 'F', and/or 'C'

        Returns:
            Pandas DataFrame indexed by ID with a single column, name.
            Rows are sorted by ID.
        """
        ancestors = self.go_term_ancestors_for_uniprot_id(uniprot_id, level, kinds_allowed)
        # The ancestors come from a set; sorting makes the row order reproducible.
        rows = [(term.id, term.name) for term in sorted(ancestors, key=lambda t: t.id)]
        return pd.DataFrame(rows, columns=["ID", "name"]).set_index("ID")
211
|
|
|
|
212
|
|
|
|
213
|
|
|
# Public names of this module: the three classes defined above.
__all__ = ["FlatGoTerm", "UniprotGoTerms", "GoTermsAtLevel"]
214
|
|
|
|