|
1
|
|
|
"""Manages a user-specified subset of a GO DAG.""" |
|
2
|
|
|
|
|
3
|
|
|
from __future__ import print_function |
|
4
|
|
|
|
|
5
|
|
|
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved." |
|
6
|
|
|
__author__ = "DV Klopfenstein" |
|
7
|
|
|
|
|
8
|
|
|
import sys |
|
9
|
|
|
import collections as cx |
|
10
|
|
|
# import timeit |
|
11
|
|
|
# from goatools.test_data.godag_timed import prt_hms |
|
12
|
|
|
from goatools.gosubdag.gosubdag_init import InitGOs |
|
13
|
|
|
from goatools.gosubdag.gosubdag_init import InitFields |
|
14
|
|
|
from goatools.gosubdag.go_tasks import chk_goids |
|
15
|
|
|
|
|
16
|
|
|
|
|
17
|
|
|
class GoSubDag(object):
    """Manages a user-specified subset of a GO DAG.

    Attributes (all populated in __init__ from the InitGOs/InitFields helpers):
        go_sources    -- GO IDs provided by the user
        go2obj        -- dict: GO ID -> GO term object (object exposes ``.id``);
                         a key that differs from ``obj.id`` is counted as an
                         alternate GO ID
        relationships -- relationships value as resolved by InitGOs
        rcntobj       -- None or a CountRelatives object
        prt_attr      -- dict with 'flds' (namedtuple field names) and the
                         print formats 'fmt' and 'fmta'
        go2nt         -- dict: GO ID -> namedtuple holding the 'flds' values
    """

    def __init__(self, go_sources, go2obj, relationships=None, **kws):
        """Collect the GO terms and per-term namedtuples for ``go_sources``.

        Args:
            go_sources: GO IDs chosen by the user.
            go2obj: mapping of GO ID to GO term object.
            relationships: forwarded unchanged to InitGOs.
            **kws: forwarded to both InitGOs and InitFields. If the key
                'prt' is present and not None, a description of this object
                is written to that stream once initialization is done.
        """
        # kws _Init: rcntobj relationships
        _ini = InitGOs(go_sources, go2obj, relationships, **kws)
        self.go_sources = _ini.go_sources
        # Initialized with goobjs corresponding to go_sources
        self.go2obj = _ini.go2obj
        self.relationships = _ini.relationships
        # GO IDs to total count of all descendants: Init to None or CountRelatives object
        _fld = InitFields(_ini, **kws)
        self.rcntobj = _fld.get_rcntobj()  # None or CountRelatives object
        self.prt_attr = {
            'flds': _fld.prt_flds,             # namedtuple fields in go2nt
            'fmt': _fld.get_prt_fmt(False),    # GO:NNNNNNN  No indication if an alternate GO ID
            'fmta': _fld.get_prt_fmt(True),    # GO:NNNNNNNa 'a' indicates if an alternate GO ID
        }
        self.go2nt = _fld.get_go2nt_all(self.rcntobj)
        prt = kws.get('prt')
        if prt is not None:
            self.prt_objdesc(prt)

    def prt_goids(self, goids=None, prtfmt=None, sortby=True, prt=sys.stdout):
        """Given GO IDs, print descriptive info about each GO Term.

        Args:
            goids: GO IDs to print; defaults to ``self.go_sources``.
            prtfmt: format string filled with the namedtuple fields;
                defaults to ``self.prt_attr['fmta']``.
            sortby: True for the default sort, a key function taking a
                namedtuple, or a false value for no sorting.
            prt: stream with a ``write`` method.

        Returns:
            The list of namedtuples that were printed.
        """
        if goids is None:
            goids = self.go_sources
        nts = self.get_nts(goids, sortby)
        if prtfmt is None:
            prtfmt = self.prt_attr['fmta']
        for ntgo in nts:
            prt.write("{GO}\n".format(GO=prtfmt.format(**ntgo._asdict())))
        return nts

    def get_nts(self, goids=None, sortby=None):
        """Given GO IDs, get a list of namedtuples.

        Args:
            goids: GO IDs; defaults to ``self.go_sources``. User-supplied
                IDs are first passed to ``chk_goids``.
            sortby: see ``_get_sorted_go2nt``.

        Returns:
            One namedtuple per GO ID found in this sub-DAG. When the
            requested ID differs from the namedtuple's ``GO`` field, the
            returned namedtuple carries the requested ID in ``GO`` instead.
        """
        if goids is None:
            goids = self.go_sources
        else:
            chk_goids(goids, "GoSubDag::get_nts")
        nts = []
        if not goids:
            return nts
        go2nt = self.get_go2nt(goids)
        for goid, ntgo in self._get_sorted_go2nt(go2nt, sortby):
            assert ntgo is not None, "{GO} NOT IN go2nt".format(GO=goid)
            if goid != ntgo.GO:
                # Report the ID the caller asked for, not the stored one
                ntgo = ntgo._replace(GO=goid)
            nts.append(ntgo)
        return nts

    def _get_sorted_go2nt(self, go2nt, sortby):
        """Return a list of (GO ID, namedtuple) pairs, sorted if requested.

        Args:
            go2nt: dict of GO ID -> namedtuple.
            sortby: True selects the key from ``get_fncsortnt``; any other
                true value is used as a key function on the namedtuple;
                a false value leaves the pairs in dict order.
        """
        if sortby is True:
            sortby = self.get_fncsortnt()
        if sortby:
            return sorted(go2nt.items(), key=lambda item: sortby(item[1]))
        # Materialize so every branch returns a list on Python 2 and 3
        return list(go2nt.items())

    def get_fncsortnt(self):
        """Return the default sort-key function for GO namedtuples.

        The key depends on which fields are listed in ``prt_attr['flds']``:
        with 'dcnt' the key is NS, depth, descending dcnt, (D1 if present),
        alt; without 'dcnt' it is NS, descending depth, alt.
        """
        flds = self.prt_attr['flds']
        if 'dcnt' not in flds:
            return lambda ntgo: [ntgo.NS, -1*ntgo.depth, ntgo.alt]
        if 'D1' in flds:
            return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.D1, ntgo.alt]
        return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.alt]

    def get_go2nt(self, goids):
        """Return dict of GO ID as key and GO object information in namedtuple.

        GO IDs that are not keys of ``self.go2obj`` are left out of the
        result and reported once on stdout. Repeated IDs in ``goids`` are
        not treated as missing.
        """
        go2nt = self.go2nt
        goids_all = set(goids)
        goids_present = goids_all.intersection(self.go2obj)
        goids_missing = goids_all.difference(goids_present)
        if goids_missing:
            # Sorted so the message is reproducible from run to run
            print("GO IDs NOT FOUND IN DAG: {GOs}".format(
                GOs=" ".join(sorted(goids_missing))))
        return {g: go2nt[g] for g in goids_present}

    def get_go2obj(self, goids):
        """Return a go2obj dict for just the user goids.

        Raises KeyError if a GO ID is not in ``self.go2obj``.
        """
        go2obj = self.go2obj
        return {go: go2obj[go] for go in goids}

    def get_vals(self, field, goids=None):
        """Return a list with the value of one namedtuple field per GO ID.

        Args:
            field: name of a namedtuple field in ``self.go2nt`` values.
            goids: GO IDs to look up; defaults to every key of ``self.go2nt``.
        """
        go2nt = self.go2nt
        if goids is None:
            goids = set(go2nt)
        return [getattr(go2nt[go], field) for go in goids]

    def get_key_goids(self, goids):
        """Given GO IDs, return key GO IDs.

        The key GO ID is the ``id`` attribute of the GO object stored
        under each given ID in ``self.go2obj``.
        """
        go2obj = self.go2obj
        return {go2obj[go].id for go in goids}

    def get_ns2goids(self, goids):
        """Group GO IDs by namespace.

        Returns a plain dict mapping each ``NS`` value to a set of GO IDs.
        """
        ns2goids = cx.defaultdict(set)
        go2nt = self.go2nt
        for goid in goids:
            ns2goids[go2nt[goid].NS].add(goid)
        # Hand back a regular dict so lookups of unknown keys do not add entries
        return dict(ns2goids)

    def prt_objdesc(self, prt):
        """Write a description of this GoSubDag object to the stream ``prt``."""
        txt = "INITIALIZING GoSubDag: {N:3} sources in {M:3} GOs rcnt({R}). {A} alt GO IDs\n"
        # An alternate GO ID is a key that differs from its object's own id
        num_alt = sum(1 for go, obj in self.go2obj.items() if go != obj.id)
        prt.write(txt.format(
            N=len(self.go_sources),
            M=len(self.go2obj),
            R=self.rcntobj is not None,
            A=num_alt))
        prt.write(" GoSubDag: namedtuple fields: {FLDS}\n".format(
            FLDS=" ".join(self.prt_attr['flds'])))
        prt.write(" GoSubDag: relationships: {RELS}\n".format(RELS=self.relationships))
|
141
|
|
|
|
|
142
|
|
|
|
|
143
|
|
|
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved. |
|
144
|
|
|
|