InitFields._init_kwelems()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
dl 0
loc 11
rs 9.85
c 1
b 0
f 0
1
"""Manages a user-specified subset of a GO DAG."""
2
3
from __future__ import print_function
4
5
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
6
__author__ = "DV Klopfenstein"
7
8
import sys
9
import collections as cx
10
import math
11
from goatools.godag.relationship_str import RelationshipStr
12
from goatools.godag.go_tasks import CurNHigher
13
from goatools.gosubdag.godag_rcnt import CountRelatives
14
from goatools.gosubdag.go_tasks import get_leaf_children
15
from goatools.gosubdag.utils import get_kwargs
16
17
18
# pylint: disable=too-few-public-methods
19
class InitGOs(object):
    """Initialize GoSubDag: pick the source GO IDs and the go2obj subset to use.

    Attributes set by the constructor:
      * kws           -- keyword arguments as passed in (e.g. go2color, children)
      * go2obj_orig   -- the go2obj mapping given by the caller
      * go_sources    -- set of source GO IDs
      * go2obj        -- go2obj_orig itself (no sources given) or a new subset dict
      * relationships -- set of relationship types (empty if none were requested)
    """

    # Keyword args which can cause additional GO IDs to be added to the subset
    kws_aux_gos = {'go2color'}

    def __init__(self, go_sources, go2obj, relationships=False, **kws):
        """Store the arguments, then initialize go_sources, go2obj and relationships.

        Args:
            go_sources: iterable of GO IDs; if falsy, every key of go2obj is a source.
            go2obj: mapping of GO ID to GO term object.
            relationships: falsy (no relationships), True (all relationships seen
                in the subset), or an iterable of relationship names to keep.
            **kws: go2color, children
        """
        self.kws = kws
        # Process: rcntobj tcntobj go2nt relationships
        self.go2obj_orig = go2obj
        if relationships:
            # Only the first GO term is inspected as a sanity check
            assert hasattr(next(iter(go2obj.values())), 'relationship'), "NO DAG RELATIONSHIPS"
        # Init go2obj and go_sources
        self.go2obj = None
        self.go_sources = None
        self._init_gos(go_sources, relationships)
        # Using reduced go2obj, init relationships
        self.relationships = self._init_relationships(relationships)  # set of relationship types

    def _init_relationships(self, relationships_arg):
        """Return a set of relationships found in all subset GO Terms.

        A falsy argument yields an empty set, True yields every relationship
        seen in self.go2obj, and any other value is intersected with that set.
        """
        if not relationships_arg:
            return set()
        relationships_all = self._get_all_relationships()
        if relationships_arg is True:
            return relationships_all
        return relationships_all.intersection(relationships_arg)

    def _get_all_relationships(self):
        """Return all relationships seen in GO Dag subset."""
        relationships_all = set()
        for goterm in self.go2obj.values():
            # Both the forward and the reverse relationship containers contribute
            if goterm.relationship:
                relationships_all.update(goterm.relationship)
            if goterm.relationship_rev:
                relationships_all.update(goterm.relationship_rev)
        return relationships_all

    def _init_gos(self, go_sources_arg, relationships_arg):
        """Initialize self.go_sources and self.go2obj."""
        # No GO sources provided: use every GO ID in the original go2obj
        if not go_sources_arg:
            assert self.go2obj_orig, "go2obj MUST BE PRESENT IF go_sources IS NOT"
            self.go_sources = set(self.go2obj_orig)
            self.go2obj = self.go2obj_orig
            sys.stdout.write("**NOTE: {N:,} SOURCE GO IDS\n".format(N=len(self.go_sources)))
            return
        # GO sources provided
        go_sources = self._init_go_sources(go_sources_arg, self.go2obj_orig)
        # Create new go2obj_user subset matching GO sources
        # Fill with source and parent GO IDs and alternate GO IDs
        go2obj_user = {}
        objrel = CurNHigher(relationships_arg, self.go2obj_orig)
        objrel.get_go2obj_cur_n_high(go2obj_user, go_sources)
        # Add additional GOTerm information, if needed for user task
        kws_gos = {k: v for k, v in self.kws.items() if k in self.kws_aux_gos}
        if kws_gos:
            self._add_goterms_kws(go2obj_user, kws_gos)
        self.go_sources = go_sources
        self.go2obj = go2obj_user

    def _add_goterms_kws(self, go2obj_user, kws_gos):
        """Add more GOTerms to go2obj_user, if requested and relevant."""
        if 'go2color' in kws_gos:
            for goid in kws_gos['go2color']:
                self._add_goterms(go2obj_user, goid)

    def _add_goterms(self, go2obj_user, goid):
        """Add alt GO IDs to go2obj subset, if requested and relevant.

        goid is added only when it differs from its GO term's main id, that
        main id is already in go2obj_user, and goid itself is not yet present.
        GO IDs which are not in the original go2obj are ignored.
        """
        # Unknown GO IDs are skipped rather than raising KeyError, matching the
        # tolerance for missing source GO IDs in _init_go_sources
        goterm = self.go2obj_orig.get(goid)
        if goterm is None:
            return
        if goid != goterm.id and goterm.id in go2obj_user and goid not in go2obj_user:
            go2obj_user[goid] = goterm

    def _init_go_sources(self, go_sources_arg, go2obj_arg):
        """Return GO sources which are present in GODag.

        GO IDs not found in go2obj_arg are reported on stdout and dropped.
        """
        gos_user = set(go_sources_arg)
        if self.kws.get('children'):
            gos_user |= get_leaf_children(gos_user, go2obj_arg)
        gos_godag = set(go2obj_arg)
        gos_source = gos_user.intersection(gos_godag)
        gos_missing = gos_user.difference(gos_godag)
        if gos_missing:
            sys.stdout.write("{N} GO IDs NOT FOUND IN GO DAG: {GOs}\n".format(
                N=len(gos_missing), GOs=" ".join([str(e) for e in gos_missing])))
        return gos_source
107
108
109
class InitFields(object):
    """Initialize print attributes and namedtuple fields.

    Attributes set by the constructor:
      * go2obj        -- go2obj taken from the ini_main argument
      * kws           -- keyword args filtered through get_kwargs using exp_keys
      * kw_elems      -- set of optional field names enabled by kws
      * relationships -- relationships taken from the ini_main argument
      * prt_flds      -- list of field names of the go2nt namedtuples
    """

    # Keyword args recognized by this class
    exp_keys = {'rcntobj', 'tcntobj', 'go2nt', 'go2letter'}

    def __init__(self, ini_main, **kws):
        """Collect keyword args and derive the optional and printable fields.

        Args:
            ini_main: object providing go2obj and relationships attributes.
            **kws: rcntobj, tcntobj, go2nt, go2letter
        """
        self.go2obj = ini_main.go2obj
        self.kws = get_kwargs(kws, self.exp_keys, None)
        # Descendant counting is on unless the caller passed an rcntobj keyword
        if 'rcntobj' not in kws:
            self.kws['rcntobj'] = True
        self.kw_elems = self._init_kwelems()
        self.relationships = ini_main.relationships
        self.prt_flds = self._init_prt_flds()

    def get_rcntobj(self):
        """Return a CountRelatives object, or None if 'rcntobj' is not in kws.

        A CountRelatives instance stored in kws is returned as-is; for any
        other stored value a new CountRelatives is built from the go2obj subset.
        """
        # rcntobj value in kws can be: None, False, True, CountRelatives object
        if 'rcntobj' not in self.kws:
            return None
        rcntobj = self.kws['rcntobj']
        if isinstance(rcntobj, CountRelatives):
            return rcntobj
        return CountRelatives(
            self.go2obj,  # Subset go2obj contains only items needed by go_sources
            self.relationships,
            dcnt='dcnt' in self.kw_elems,
            go2letter=self.kws.get('go2letter'))

    def get_go2nt_all(self, rcntobj):
        """For each GO id, put all printable fields in one namedtuple.

        If the caller supplied go2nt, its namedtuples are reused for the GO IDs
        in self.go2obj; otherwise new namedtuples are created.
        """
        if 'go2nt' in self.kws:
            go2nt = self.kws['go2nt']
            return {go: go2nt[go] for go in self.go2obj}
        return self._get_go2nt_all(rcntobj)

    def _init_prt_flds(self):
        """Return the list of print fields in the go2nt namedtuple."""
        # Create namedtuple fields or copy namedtuple fields
        if 'go2nt' not in self.kws:
            return self.__init_prt_flds()
        # Copy the field names of the first user-provided namedtuple. A list of
        # names is returned in both branches, so prt_flds always has one type.
        return list(next(iter(self.kws['go2nt'].values()))._fields)

    def __init_prt_flds(self):
        """Return the print fields for namedtuples created by this class."""
        prt_flds = ['NS', 'level', 'depth']
        if self.relationships:
            prt_flds.append('reldepth')
        prt_flds.extend(['GO', 'alt', 'GO_name'])
        if 'dcnt' in self.kw_elems:
            prt_flds.append('dcnt')
        if 'D1' in self.kw_elems:
            prt_flds.append('D1')
        if 'tcnt' in self.kw_elems:
            prt_flds.extend(['tcnt', 'tfreq', 'tinfo'])
        if self.relationships:
            prt_flds.extend(['childcnt', 'REL', 'REL_short', 'rel'])
        prt_flds.append('id')
        return prt_flds

    def get_prt_fmt(self, alt=False):
        """Return the format for printing GO named tuples and their related information.

        Args:
            alt: if True, the 'alt' field is printed directly after the GO ID.

        Optional fields appear in the format only if they are in self.prt_flds.
        """
        # Example results:
        #     '{GO} # {NS} L{level:02} D{depth:02} {GO_name}',
        #     '{GO} # {NS} {dcnt:5} L{level:02} D{depth:02} {D1:5} {GO_name}'
        prt_fmt = ['{GO}{alt:1}' if alt else '{GO}', '# {NS}']
        if 'dcnt' in self.prt_flds:
            prt_fmt.append('{dcnt:5}')
        if 'childcnt' in self.prt_flds:
            prt_fmt.append('{childcnt:3}')
        if 'tcnt' in self.prt_flds:
            prt_fmt.append("{tcnt:7,}")
        if 'tfreq' in self.prt_flds:
            prt_fmt.append("{tfreq:8.6f}")
        if 'tinfo' in self.prt_flds:
            prt_fmt.append("{tinfo:5.2f}")
        prt_fmt.append('L{level:02} D{depth:02}')
        # Guarded by prt_flds, like every other optional field, so the format
        # always matches the namedtuple fields (also for user-provided go2nt)
        if 'reldepth' in self.prt_flds:
            prt_fmt.append('R{reldepth:02}')
        if 'D1' in self.prt_flds:
            prt_fmt.append('{D1:5}')
        if 'REL' in self.prt_flds:
            prt_fmt.append('{REL}')
            prt_fmt.append('{rel}')
        prt_fmt.append('{GO_name}')
        return " ".join(prt_fmt)

    def _get_go2nt_all(self, rcntobj):
        """For each GO id, put all printable fields in one namedtuple.

        Args:
            rcntobj: object providing go2dcnt and get_d1str; used only when the
                'dcnt' or 'D1' fields are enabled.

        Returns:
            dict mapping each GO ID in self.go2obj to an NtGo namedtuple.
        """
        go2nt = {}
        ntobj = cx.namedtuple("NtGo", " ".join(self.prt_flds))
        tcntobj = self.kws.get('tcntobj')
        b_tcnt = tcntobj is not None
        objrelstr = RelationshipStr(self.relationships)
        namespace2ns = objrelstr.consts.NAMESPACE2NS
        for goid, goobj in self.go2obj.items():
            fld2vals = {
                'NS': namespace2ns[goobj.namespace],
                'level': goobj.level,
                'depth': goobj.depth,
                'GO': goid,
                # 'a' marks a key which differs from the GO term's main id
                'alt': '' if goid == goobj.id else 'a',
                'id': goobj.id,
                'GO_name': goobj.name}
            if 'dcnt' in self.kw_elems:
                fld2vals['dcnt'] = rcntobj.go2dcnt.get(goid)
            if 'D1' in self.kw_elems:
                fld2vals['D1'] = rcntobj.get_d1str(goobj)
            if b_tcnt:
                tcnt = tcntobj.gocnts[goid]
                num_ns = float(tcntobj.aspect_counts[goobj.namespace])
                # Frequency within the namespace; 0 if the namespace count is 0
                tfreq = float(tcnt)/num_ns if num_ns != 0 else 0
                fld2vals['tcnt'] = tcnt
                fld2vals['tfreq'] = tfreq
                # -ln(tfreq); 0 is stored when tfreq is 0 to avoid log(0)
                fld2vals['tinfo'] = -1.0 * math.log(tfreq) if tfreq else 0
            if self.relationships:
                fld2vals['childcnt'] = len(goobj.children)
                fld2vals['reldepth'] = goobj.reldepth
                fld2vals['REL'] = objrelstr.str_relationships(goobj)
                fld2vals['REL_short'] = objrelstr.str_rel_short(goobj)
                fld2vals['rel'] = objrelstr.str_relationships_rev(goobj)
            go2nt[goid] = ntobj(**fld2vals)
        return go2nt

    def _init_kwelems(self):
        """Return the set of optional field names enabled by self.kws."""
        ret = set()
        if 'rcntobj' in self.kws:
            ret.update(['dcnt', 'D1'])
        if 'tcntobj' in self.kws:
            ret.update(['tcnt', 'tfreq', 'tinfo'])
        return ret
259
260
261
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
262