1
|
|
|
"""Manages a user-specified subset of a GO DAG.""" |
2
|
|
|
|
3
|
|
|
from __future__ import print_function |
4
|
|
|
|
5
|
|
|
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved." |
6
|
|
|
__author__ = "DV Klopfenstein" |
7
|
|
|
|
8
|
|
|
import sys |
9
|
|
|
import collections as cx |
10
|
|
|
# import timeit |
11
|
|
|
# from goatools.test_data.godag_timed import prt_hms |
12
|
|
|
from goatools.gosubdag.gosubdag_init import InitGOs |
13
|
|
|
from goatools.gosubdag.gosubdag_init import InitFields |
14
|
|
|
from goatools.gosubdag.go_tasks import chk_goids |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
class GoSubDag(object):
    """Manages a user-specified subset of a GO DAG.

    Attributes set by ``__init__``:
        go_sources: source GO IDs, as prepared by ``InitGOs``.
        go2obj: dict of GO ID -> GO term object, as prepared by ``InitGOs``.
        relationships: relationships setting, as prepared by ``InitGOs``.
        rcntobj: None or a CountRelatives object, from ``InitFields``.
        prt_attr: dict with keys 'flds' (namedtuple field names used in
            go2nt), 'fmt' and 'fmta' (print format strings without/with the
            alternate-GO-ID indicator).
        go2nt: dict of GO ID -> namedtuple holding the 'flds' values.
    """

    def __init__(self, go_sources, go2obj, relationships=None, **kws):
        """Build the sub-DAG data from source GO IDs and a GO ID -> object dict.

        Args:
            go_sources: user GO IDs; handed to ``InitGOs``.
            go2obj: GO ID -> GO term object mapping; handed to ``InitGOs``.
            relationships: optional relationships setting; handed to ``InitGOs``.
            **kws: forwarded to ``InitGOs`` and ``InitFields``. If ``prt`` is
                given and is not None, a description of this object is
                written to that stream once initialization completes.
        """
        # kws _Init: rcntobj relationships
        _ini = InitGOs(go_sources, go2obj, relationships, **kws)
        self.go_sources = _ini.go_sources
        # Initialized with GO objects corresponding to go_sources
        self.go2obj = _ini.go2obj
        self.relationships = _ini.relationships
        # GO IDs to total count of all descendants: None or CountRelatives object
        _fld = InitFields(_ini, **kws)
        self.rcntobj = _fld.get_rcntobj()
        self.prt_attr = {
            'flds': _fld.prt_flds,            # namedtuple fields in go2nt
            'fmt': _fld.get_prt_fmt(False),   # GO:NNNNNNN  No indication if an alternate GO ID
            'fmta': _fld.get_prt_fmt(True)}   # GO:NNNNNNNa 'a' indicates if an alternate GO ID
        self.go2nt = _fld.get_go2nt_all(self.rcntobj)
        prt = kws.get('prt', None)
        if prt is not None:
            self.prt_objdesc(prt)

    def prt_goids(self, goids=None, prtfmt=None, sortby=True, prt=sys.stdout):
        """Given GO IDs, print descriptive info about each GO Term.

        Args:
            goids: GO IDs to print; defaults to ``self.go_sources``.
            prtfmt: format string applied to each namedtuple's fields;
                defaults to ``self.prt_attr['fmta']``.
            sortby: passed to ``get_nts`` (True, a key function, or falsy).
            prt: stream with a ``write`` method.

        Returns:
            The list of namedtuples that were printed.
        """
        if goids is None:
            goids = self.go_sources
        nts = self.get_nts(goids, sortby)
        if prtfmt is None:
            prtfmt = self.prt_attr['fmta']
        for ntgo in nts:
            prt.write("{GO}\n".format(GO=prtfmt.format(**ntgo._asdict())))
        return nts

    def get_nts(self, goids=None, sortby=None):
        """Given GO IDs, get a list of namedtuples.

        Args:
            goids: GO IDs; defaults to ``self.go_sources``. User-supplied
                IDs are first passed through ``chk_goids``.
            sortby: True for the default sort, a key function taking a
                namedtuple, or a falsy value for no sorting.

        Returns:
            List of namedtuples. When a requested GO ID differs from the
            ``GO`` field of its stored namedtuple, a copy is returned whose
            ``GO`` field is the requested ID.
        """
        if goids is None:
            goids = self.go_sources
        else:
            chk_goids(goids, "GoSubDag::get_nts")
        nts = []
        if not goids:
            return nts
        ntobj = cx.namedtuple("NtGo", " ".join(self.prt_attr['flds']))
        go2nt = self.get_go2nt(goids)
        for goid, ntgo in self._get_sorted_go2nt(go2nt, sortby):
            assert ntgo is not None, "{GO} NOT IN go2nt".format(GO=goid)
            if goid == ntgo.GO:
                nts.append(ntgo)
            else:
                # Requested ID differs from the stored GO field:
                # report the ID the caller asked for.
                fld2vals = ntgo._asdict()
                fld2vals['GO'] = goid
                nts.append(ntobj(**fld2vals))
        return nts

    def _get_sorted_go2nt(self, go2nt, sortby):
        """Return (GO ID, namedtuple) pairs, sorted as requested.

        ``sortby`` is True (use the key from ``get_fncsortnt``), a key
        function applied to each namedtuple, or falsy (items are returned
        unsorted).
        """
        if sortby is True:
            _fnc = self.get_fncsortnt()
            return sorted(go2nt.items(), key=lambda t: _fnc(t[1]))
        if sortby:
            return sorted(go2nt.items(), key=lambda t: sortby(t[1]))
        return go2nt.items()

    def get_fncsortnt(self):
        """Return the default sort-key function for GO namedtuples.

        The key depends on which fields are in ``self.prt_attr['flds']``:
        with 'dcnt', sort by NS, depth, descending dcnt, then D1 (only when
        present) and alt; without 'dcnt', sort by NS, descending depth, alt.
        """
        flds = self.prt_attr['flds']
        if 'dcnt' not in flds:
            return lambda ntgo: [ntgo.NS, -1*ntgo.depth, ntgo.alt]
        if 'D1' in flds:
            return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.D1, ntgo.alt]
        return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.alt]

    def get_go2nt(self, goids):
        """Return dict of GO ID as key and GO object information in namedtuple.

        GO IDs that are not keys of ``self.go2obj`` are left out of the
        result and reported with one printed line. Duplicate IDs in
        ``goids`` are collapsed and are not reported as missing.
        """
        # Compare sets, not lengths: duplicates in a list must not trigger
        # a false "not found" report, and one-shot iterables have no len().
        goids_req = set(goids)
        goids_present = goids_req.intersection(self.go2obj)
        if goids_present != goids_req:
            # Sorted so the message is the same from run to run
            print("GO IDs NOT FOUND IN DAG: {GOs}".format(
                GOs=" ".join(sorted(goids_req.difference(goids_present)))))
        go2nt = self.go2nt
        return {g: go2nt[g] for g in goids_present}

    def get_go2obj(self, goids):
        """Return a go2obj dict for just the user goids.

        Raises:
            KeyError: if a GO ID is not a key of ``self.go2obj``.
        """
        go2obj = self.go2obj
        return {go: go2obj[go] for go in goids}

    def get_vals(self, field, goids=None):
        """Return a list of one namedtuple field's values, one per GO ID.

        Args:
            field: name of a namedtuple field in ``self.go2nt`` values.
            goids: GO IDs to look up; defaults to every key of ``self.go2nt``.
        """
        go2nt = self.go2nt
        if goids is None:
            goids = set(go2nt)
        return [getattr(go2nt[go], field) for go in goids]

    def get_key_goids(self, goids):
        """Given GO IDs, return the set of ``id`` values of their GO objects."""
        go2obj = self.go2obj
        return {go2obj[go].id for go in goids}

    def get_ns2goids(self, goids):
        """Group GO IDs by namespace: return a dict of NS -> set of GO IDs."""
        ns2goids = cx.defaultdict(set)
        go2nt = self.go2nt
        for goid in goids:
            ns2goids[go2nt[goid].NS].add(goid)
        # Plain dict so a later lookup of an unknown NS raises, not inserts
        return dict(ns2goids)

    def prt_objdesc(self, prt):
        """Write a description of this GoSubDag object to the stream ``prt``."""
        txt = "INITIALIZING GoSubDag: {N:3} sources in {M:3} GOs rcnt({R}). {A} alt GO IDs\n"
        # Keys of go2obj which differ from the 'id' of the object they map to
        alt2obj = {go: o for go, o in self.go2obj.items() if go != o.id}
        prt.write(txt.format(
            N=len(self.go_sources),
            M=len(self.go2obj),
            R=self.rcntobj is not None,
            A=len(alt2obj)))
        prt.write("             GoSubDag: namedtuple fields: {FLDS}\n".format(
            FLDS=" ".join(self.prt_attr['flds'])))
        prt.write("             GoSubDag: relationships: {RELS}\n".format(RELS=self.relationships))
141
|
|
|
|
142
|
|
|
|
143
|
|
|
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved. |
144
|
|
|
|