Completed
Push — master ( 0f596f...821888 )
by
unknown
01:11
created

InitFields   B

Complexity

Total Complexity 39

Size/Duplication

Total Lines 140
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 140
rs 8.2857
wmc 39

8 Methods

Rating   Name   Duplication   Size   Complexity  
F _get_go2nt_all() 0 39 10
A get_rcntobj() 0 12 3
A get_go2nt_all() 0 7 3
A _init_kwelems() 0 11 3
B __init_prt_flds() 0 19 6
A __init__() 0 8 2
F get_prt_fmt() 0 25 10
A _init_prt_flds() 0 7 2
1
"""Manages a user-specified subset of a GO DAG."""
2
3
from __future__ import print_function
4
5
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
6
__author__ = "DV Klopfenstein"
7
8
import sys
9
import collections as cx
10
import math
11
from goatools.godag.relationship_str import RelationshipStr
12
from goatools.godag.go_tasks import CurNHigher
13
from goatools.gosubdag.godag_rcnt import CountRelatives
14
from goatools.gosubdag.go_tasks import get_leaf_children
15
from goatools.gosubdag.utils import get_kwargs
16
17
18
# pylint: disable=too-few-public-methods
19
class InitGOs(object):
    """Initialize the GO sources and the go2obj subset used by GoSubDag.

    Attributes set by the constructor:
        kws             Keyword args given by the caller (e.g., go2color, children)
        go2obj_orig     The go2obj mapping exactly as passed by the caller
        go_sources      Set of source GO IDs which are present in go2obj_orig
        go2obj          go2obj_orig itself if no GO sources were given; otherwise
                        a new dict filled by CurNHigher for the GO sources
        relationships   Set of relationship types seen in the go2obj subset
    """

    # Add additional GO IDs if used in user tasks
    kws_aux_gos = set(['go2color'])

    def __init__(self, go_sources, go2obj, relationships=False, **kws):
        """Store the arguments, then build go_sources, go2obj and relationships.

        Args:
            go_sources: Iterable of GO IDs; if empty/None, all of go2obj is used.
            go2obj: Mapping of GO ID to GO Term object.
            relationships: False/None (none), True (all found), or an iterable
                of relationship names to keep.
            **kws: Optional 'go2color' and 'children' (the only keys read here).
        """
        # kws: go2color, children
        self.kws = kws
        # Process: rcntobj tcntobj go2nt relationships
        self.go2obj_orig = go2obj
        if relationships:
            # Only the first GO Term is inspected as a representative sample
            assert hasattr(next(iter(go2obj.values())), 'relationship'), "NO DAG RELATIONSHIPS"
        # Init go2obj and go_sources
        self.go2obj = None
        self.go_sources = None
        self._init_gos(go_sources, relationships)
        # Using reduced go2obj, init relationships
        self.relationships = self._init_relationships(relationships)  # set of relationship types

    def _init_relationships(self, relationships_arg):
        """Return a set of relationships found in all subset GO Terms.

        A falsy argument yields an empty set, True yields every relationship
        seen in the subset, and any other value is intersected with the
        relationships seen in the subset.
        """
        if not relationships_arg:
            return set()
        relationships_all = self._get_all_relationships()
        if relationships_arg is True:
            return relationships_all
        return relationships_all.intersection(relationships_arg)

    def _get_all_relationships(self):
        """Return all relationships seen in GO Dag subset."""
        relationships_all = set()
        for goterm in self.go2obj.values():
            # Both the forward and the reverse relationship containers are read
            if goterm.relationship:
                relationships_all.update(goterm.relationship)
            if goterm.relationship_rev:
                relationships_all.update(goterm.relationship_rev)
        return relationships_all

    def _init_gos(self, go_sources_arg, relationships_arg):
        """Initialize GO sources: set self.go_sources and self.go2obj."""
        # No GO sources provided: use every GO ID in the original go2obj
        if not go_sources_arg:
            assert self.go2obj_orig, "go2obj MUST BE PRESENT IF go_sources IS NOT"
            self.go_sources = set(self.go2obj_orig)
            self.go2obj = self.go2obj_orig
            sys.stdout.write("**NOTE: {N:,} SOURCE GO IDS\n".format(N=len(self.go_sources)))
            return
        # GO sources provided
        go_sources = self._init_go_sources(go_sources_arg, self.go2obj_orig)
        # Create new go2obj_user subset matching GO sources
        # Fill with source and parent GO IDs and alternate GO IDs
        go2obj_user = {}
        objrel = CurNHigher(relationships_arg, self.go2obj_orig)
        objrel.get_go2obj_cur_n_high(go2obj_user, go_sources)
        # Add additional GOTerm information, if needed for user task
        kws_gos = {k: v for k, v in self.kws.items() if k in self.kws_aux_gos}
        if kws_gos:
            self._add_goterms_kws(go2obj_user, kws_gos)
        self.go_sources = go_sources
        self.go2obj = go2obj_user

    def _add_goterms_kws(self, go2obj_user, kws_gos):
        """Add more GOTerms to go2obj_user, if requested and relevant."""
        if 'go2color' in kws_gos:
            for goid in kws_gos['go2color']:
                self._add_goterms(go2obj_user, goid)

    def _add_goterms(self, go2obj_user, goid):
        """Add alt GO IDs to go2obj subset, if requested and relevant.

        goid is added only when it is an alternate ID (differs from the term's
        own id), its main term is already in the subset, and goid itself is not.
        GO IDs which are absent from the original go2obj are ignored rather
        than raising KeyError, matching how unknown GO sources are tolerated.
        """
        if goid not in self.go2obj_orig:
            return
        goterm = self.go2obj_orig[goid]
        if goid != goterm.id and goterm.id in go2obj_user and goid not in go2obj_user:
            go2obj_user[goid] = goterm

    def _init_go_sources(self, go_sources_arg, go2obj_arg):
        """Return GO sources which are present in GODag.

        If the 'children' keyword is truthy, the result of get_leaf_children
        is merged into the user GO IDs first. GO IDs not found in go2obj_arg
        are reported on stdout (sorted, so the report is reproducible) and
        dropped from the returned set.
        """
        gos_user = set(go_sources_arg)
        if self.kws.get('children'):
            gos_user |= get_leaf_children(gos_user, go2obj_arg)
        gos_godag = set(go2obj_arg)
        gos_missing = gos_user.difference(gos_godag)
        if gos_missing:
            sys.stdout.write("{N} GO IDs NOT FOUND IN GO DAG: {GOs}\n".format(
                N=len(gos_missing), GOs=" ".join(sorted(str(e) for e in gos_missing))))
        return gos_user.intersection(gos_godag)
107
108
109
class InitFields(object):
    """Initialize print attributes and namedtuple fields.

    Attributes set by the constructor:
        go2obj          The go2obj held by the ini_main argument
        kws             Result of get_kwargs for the keys in exp_keys;
                        'rcntobj' is set to True if the caller did not pass it
        kw_elems        Set of optional field names enabled by kws
        relationships   The relationships held by the ini_main argument
        prt_flds        Field names for the go2nt namedtuples
    """

    exp_keys = set(['rcntobj', 'tcntobj', 'go2nt', 'go2letter'])

    def __init__(self, ini_main, **kws):
        """Read go2obj/relationships from ini_main and derive the print fields."""
        self.go2obj = ini_main.go2obj
        # NOTE(review): get_kwargs presumably keeps only the expected keys; confirm
        self.kws = get_kwargs(kws, self.exp_keys, None)
        # Counting relatives is on by default, unless the caller mentions rcntobj
        if 'rcntobj' not in kws:
            self.kws['rcntobj'] = True
        self.kw_elems = self._init_kwelems()
        self.relationships = ini_main.relationships
        self.prt_flds = self._init_prt_flds()

    def get_rcntobj(self):
        """Return None or a CountRelatives object.

        Returns None if 'rcntobj' is not in kws. Returns the user-provided
        object if it is a CountRelatives instance; otherwise a new
        CountRelatives is created from the go2obj subset.
        """
        # rcntobj value in kws can be: None, False, True, CountRelatives object
        if 'rcntobj' not in self.kws:
            return None
        rcntobj = self.kws['rcntobj']
        if isinstance(rcntobj, CountRelatives):
            return rcntobj
        return CountRelatives(
            self.go2obj,  # Subset go2obj contains only items needed by go_sources
            self.relationships,
            dcnt='dcnt' in self.kw_elems,
            go2letter=self.kws.get('go2letter'))

    def get_go2nt_all(self, rcntobj):
        """For each GO id, put all printable fields in one namedtuple.

        If the caller provided go2nt, its entries are reused for the GO IDs
        in go2obj (KeyError if one is missing); otherwise they are created.
        """
        if 'go2nt' in self.kws:
            go2nt = self.kws['go2nt']
            return {go: go2nt[go] for go in self.go2obj}
        return self._get_go2nt_all(rcntobj)

    def _init_prt_flds(self):
        """Return the print fields in the go2nt namedtuple.

        Without a user go2nt, a list of field names is built. With a user
        go2nt, the _asdict() of its first namedtuple is returned; iterating
        it or testing membership yields the field names.
        """
        # Create namedtuple fields or copy namedtuple fields
        if 'go2nt' not in self.kws:
            return self.__init_prt_flds()
        return next(iter(self.kws['go2nt'].values()))._asdict()

    def __init_prt_flds(self):
        """Return the list of print fields for newly created go2nt namedtuples."""
        prt_flds = ['NS', 'level', 'depth']
        if self.relationships:
            prt_flds.append('reldepth')
        prt_flds.extend(['GO', 'alt', 'GO_name'])
        if 'dcnt' in self.kw_elems:
            prt_flds.append('dcnt')
        if 'D1' in self.kw_elems:
            prt_flds.append('D1')
        if 'tcnt' in self.kw_elems:
            prt_flds.extend(['tcnt', 'tfreq', 'tinfo'])
        if self.relationships:
            prt_flds.extend(['childcnt', 'REL', 'REL_short', 'rel'])
        prt_flds.append('id')
        return prt_flds

    def get_prt_fmt(self, alt=False):
        """Return the format for printing GO named tuples and their related information.

        Every optional column is included only if its field is in prt_flds, so
        the format never references a field the namedtuples do not have.
        Example with the dcnt and D1 fields and no relationships:
            '{GO} # {NS} {dcnt:5} L{level:02} D{depth:02} {D1:5} {GO_name}'

        Args:
            alt: If True, the 'alt' field is printed right after the GO ID.
        """
        flds = self.prt_flds
        prt_fmt = ['{GO}{alt:1}' if alt else '{GO}', '# {NS}']
        # Optional count columns appear before the level/depth columns
        for fld, fmt in (('dcnt', '{dcnt:5}'),
                         ('childcnt', '{childcnt:3}'),
                         ('tcnt', "{tcnt:7,}"),
                         ('tfreq', "{tfreq:8.6f}"),
                         ('tinfo', "{tinfo:5.2f}")):
            if fld in flds:
                prt_fmt.append(fmt)
        prt_fmt.append('L{level:02} D{depth:02}')
        # Keyed on the field, like all other optional columns, so namedtuples
        # supplied by the user without 'reldepth' can still be formatted
        if 'reldepth' in flds:
            prt_fmt.append('R{reldepth:02}')
        if 'D1' in flds:
            prt_fmt.append('{D1:5}')
        if 'REL' in flds:
            prt_fmt.append('{REL}')
            prt_fmt.append('{rel}')
        prt_fmt.append('{GO_name}')
        return " ".join(prt_fmt)

    def _get_go2nt_all(self, rcntobj):
        """For each GO id, put all printable fields in one namedtuple.

        Args:
            rcntobj: Object read for go2dcnt and get_d1str when the 'dcnt'
                and 'D1' fields are enabled.

        Returns:
            dict mapping each GO ID in go2obj to an "NtGo" namedtuple whose
            fields are those in prt_flds.
        """
        go2nt = {}
        ntobj = cx.namedtuple("NtGo", " ".join(self.prt_flds))
        tcntobj = self.kws.get('tcntobj')
        b_tcnt = tcntobj is not None
        objrelstr = RelationshipStr(self.relationships)
        namespace2ns = objrelstr.consts.NAMESPACE2NS
        # Membership tests do not change per GO Term
        has_dcnt = 'dcnt' in self.kw_elems
        has_d1 = 'D1' in self.kw_elems
        for goid, goobj in self.go2obj.items():
            fld2vals = {
                'NS': namespace2ns[goobj.namespace],
                'level': goobj.level,
                'depth': goobj.depth,
                'GO': goid,
                # 'a' marks a key which differs from the GO Term's own id
                'alt': '' if goid == goobj.id else 'a',
                'id': goobj.id,
                'GO_name': goobj.name}
            if has_dcnt:
                fld2vals['dcnt'] = rcntobj.go2dcnt.get(goid)
            if has_d1:
                fld2vals['D1'] = rcntobj.get_d1str(goobj)
            if b_tcnt:
                tcnt = tcntobj.gocnts[goid]
                num_ns = float(tcntobj.aspect_counts[goobj.namespace])
                # Guard against division by zero and log(0)
                tfreq = float(tcnt)/num_ns if num_ns != 0 else 0
                fld2vals['tcnt'] = tcnt
                fld2vals['tfreq'] = tfreq
                fld2vals['tinfo'] = -1.0 * math.log(tfreq) if tfreq else 0
            if self.relationships:
                fld2vals['childcnt'] = len(goobj.children)
                fld2vals['reldepth'] = goobj.reldepth
                fld2vals['REL'] = objrelstr.str_relationships(goobj)
                fld2vals['REL_short'] = objrelstr.str_rel_short(goobj)
                fld2vals['rel'] = objrelstr.str_relationships_rev(goobj)
            go2nt[goid] = ntobj(**fld2vals)
        return go2nt

    def _init_kwelems(self):
        """Return the set of optional field names enabled by the keyword args."""
        ret = set()
        if 'rcntobj' in self.kws:
            ret.update(['dcnt', 'D1'])
        if 'tcntobj' in self.kws:
            ret.update(['tcnt', 'tfreq', 'tinfo'])
        return ret
249
250
251
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
252