Grouper   F
last analyzed

Complexity

Total Complexity 70

Size/Duplication

Total Lines 258
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 258
rs 2.8
wmc 70

22 Methods

Rating   Name   Duplication   Size   Complexity  
A _get_depthsr() 0 5 2
A get_usrgos_g_hdrgos() 0 12 5
A get_section2items() 0 8 4
A get_hdrgos_g_usrgos() 0 13 4
A get_go2sectiontxt() 0 10 4
B get_usrgos_g_section() 0 17 6
B get_sections_2d() 0 25 8
A get_hdrgos_u0() 0 3 1
A get_usrgos_w_parents() 0 12 4
A get_usrgo2sections() 0 14 3
A get_section2usrnts() 0 7 3
A get_hdrgos() 0 3 1
A _str_replace() 0 10 1
A get_fout_base() 0 14 2
A prt_summary() 0 9 1
A get_sections_2d_nts() 0 7 2
B get_usrgo2hdrgo() 0 16 6
A get_hdrgos_u1() 0 3 1
B get_section_hdrgos_nts() 0 19 7
A __init__() 0 16 2
A get_hdrgo2usrgos() 0 5 2
A get_hdrgos_unplaced() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like Grouper often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Given user GO ids and parent terms, group user GO ids under one parent term.
2
3
   Given a group of GO ids with one or more higher-level grouping terms, group
4
   each user GO id under the most descriptive parent GO term.
5
6
   Each GO id may have more than one parent.  One of the parent(s) is chosen
7
   to best represent the user GO id's function. The choice of parent is made by
8
   regarding how close the parent GO id is to the bottom of its hierarchy.
9
10
   How close a GO term is to "the bottom" of its GO hierarchy
11
   is estimated using the total number of GO term descendants below
12
   that term.
13
"""
14
15
import sys
16
import collections as cx
17
from goatools.godag.consts import Consts
18
from goatools.grouper.grprobj_init import GrouperInit
19
20
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
21
__author__ = "DV Klopfenstein"
22
23
24
class Grouper(object):
25
    """Groups the user GO ids under other GO IDs acting as headers for the GO groups."""
26
27
    fmtsum = ("{GO_DESC} GOs({GOs:6,} in {SECs:2} sections, "
28
              "{UNGRP:>3} {undesc}) {ACTION} {FILE}\n")
29
30
    def __init__(self, grpname, goids, hdrobj, gosubdag, **kws):
31
        # print("INITIALIZING Grouper")
32
        # Data members read
33
        self.grpname = grpname
34
        self.hdrobj = hdrobj  # Contains all possible hdrgos, not just ones used
35
        self.gosubdag = gosubdag
36
        assert self.gosubdag.rcntobj is not None
37
        # _ini = GrouperInit(grpname, goids, hdrobj, gosubdag, kws.get('fnc_most_specific', 'dcnt'))
38
        _ini = GrouperInit(goids, self, kws.get('fnc_most_specific', 'dcnt'))
39
        self.usrgos = _ini.usrgos
40
        # Initialize: hdrgo2usrgos hdrgo_is_usrgo
41
        #   * hdrgo2usrgos: User GO IDs, grouped under high GO IDs (grouped, but not sorted)
42
        self.hdrgo2usrgos = _ini.hdrgo2usrgos
43
        self.hdrgo_is_usrgo = _ini.hdrgo_is_usrgo  # set of GO IDs -> both headers/user GO IDs
44
        # User GO IDs and their corresponding high GO IDs (not grouped or sorted)
45
        self.go2nt = _ini.get_go2nt(kws.get('go2nt', None))
46
47
    def get_usrgos_w_parents(self, hdrgos, usrgos_all=None):
48
        """Get usrgos w/parents in hdrgos, even if usrgos did not get grouped under hdrgos."""
49
        usrgos = set()
50
        _go2parents = self.gosubdag.rcntobj.go2parents
51
        if usrgos_all is None:
52
            usrgos_all = self.usrgos
53
        for usrgo in usrgos_all:
54
            all_usrgo_parents = _go2parents.get(usrgo)
55
            sel_usrgo_parents = all_usrgo_parents.intersection(hdrgos)
56
            if sel_usrgo_parents:
57
                usrgos.add(usrgo)
58
        return usrgos
59
60
    def get_sections_2d(self):
61
        """Get 2-D list of sections and hdrgos sets actually used in grouping."""
62
        sections_hdrgos_act = []
63
        hdrgos_act_all = self.get_hdrgos()
64
        hdrgos_act_secs = set()
65
        if self.hdrobj.sections:
66
            for section_name, hdrgos_all_lst in self.hdrobj.sections:
67
                hdrgos_all_set = set(hdrgos_all_lst)
68
                hdrgos_act_set = hdrgos_all_set.intersection(hdrgos_act_all)
69
                if hdrgos_act_set:
70
                    hdrgos_act_secs |= hdrgos_act_set
71
                    # Use original order of header GOs found in sections
72
                    hdrgos_act_lst = []
73
                    hdrgos_act_ctr = cx.Counter()
74
                    for hdrgo_p in hdrgos_all_lst: # Header GO that may or may not be used.
75
                        if hdrgo_p in hdrgos_act_set and hdrgos_act_ctr[hdrgo_p] == 0:
76
                            hdrgos_act_lst.append(hdrgo_p)
77
                        hdrgos_act_ctr[hdrgo_p] += 1
78
                    sections_hdrgos_act.append((section_name, hdrgos_act_lst))
79
            hdrgos_act_rem = hdrgos_act_all.difference(hdrgos_act_secs)
80
            if hdrgos_act_rem:
81
                sections_hdrgos_act.append((self.hdrobj.secdflt, hdrgos_act_rem))
82
        else:
83
            sections_hdrgos_act.append((self.hdrobj.secdflt, hdrgos_act_all))
84
        return sections_hdrgos_act
85
86
    def get_usrgos_g_section(self, section=None):
87
        """Get usrgos in a requested section."""
88
        if section is None:
89
            section = self.hdrobj.secdflt
90
        if section is True:
91
            return self.usrgos
92
        # Get dict of sections and hdrgos actually used in grouping
93
        section2hdrgos = cx.OrderedDict(self.get_sections_2d())
94
        hdrgos_lst = section2hdrgos.get(section, None)
95
        if hdrgos_lst is not None:
96
            hdrgos_set = set(hdrgos_lst)
97
            hdrgos_u = hdrgos_set.intersection(self.hdrgo_is_usrgo)
98
            hdrgos_h = hdrgos_set.intersection(self.hdrgo2usrgos.keys())
99
            usrgos = set([u for h in hdrgos_h for u in self.hdrgo2usrgos.get(h)])
100
            usrgos |= hdrgos_u
101
            return usrgos
102
        return set()
103
104
    def get_section2usrnts(self):
105
        """Get dict section2usrnts."""
106
        sec_nts = []
107
        for section_name, _ in self.get_sections_2d():
108
            usrgos = self.get_usrgos_g_section(section_name)
109
            sec_nts.append((section_name, [self.go2nt.get(u) for u in usrgos]))
110
        return cx.OrderedDict(sec_nts)
111
112
    def get_section2items(self, itemkey):
113
        """Collect all items into a single set per section."""
114
        sec_items = []
115
        section2usrnts = self.get_section2usrnts()
116
        for section, usrnts in section2usrnts.items():
117
            items = set([e for nt in usrnts for e in getattr(nt, itemkey, set())])
118
            sec_items.append((section, items))
119
        return cx.OrderedDict(sec_items)
120
121
    def get_hdrgos_g_usrgos(self, usrgos):
122
        """Return hdrgos which contain the usrgos."""
123
        hdrgos_for_usrgos = set()
124
        hdrgos_all = self.get_hdrgos()
125
        usrgo2hdrgo = self.get_usrgo2hdrgo()
126
        for usrgo in usrgos:
127
            if usrgo in hdrgos_all:
128
                hdrgos_for_usrgos.add(usrgo)
129
                continue
130
            hdrgo_cur = usrgo2hdrgo.get(usrgo, None)
131
            if hdrgo_cur is not None:
132
                hdrgos_for_usrgos.add(hdrgo_cur)
133
        return hdrgos_for_usrgos
134
135
    def get_section_hdrgos_nts(self, sortby=None):
136
        """Get a flat list of sections and hdrgos actually used in grouping."""
137
        nts_all = []
138
        section_hdrgos_actual = self.get_sections_2d()
139
        flds_all = ['Section'] + self.gosubdag.prt_attr['flds']
140
        ntobj = cx.namedtuple("NtGoSec", " ".join(flds_all))
141
        flds_go = None
142
        if sortby is None:
143
            sortby = lambda nt: -1*nt.dcnt
144
        for section_name, hdrgos_actual in section_hdrgos_actual:
145
            nts_sec = []
146
            for hdrgo_nt in self.gosubdag.get_go2nt(hdrgos_actual).values():
147
                if flds_go is None:
148
                    flds_go = hdrgo_nt._fields
149
                key2val = {key:val for key, val in zip(flds_go, list(hdrgo_nt))}
150
                key2val['Section'] = section_name
151
                nts_sec.append(ntobj(**key2val))
152
            nts_all.extend(sorted(nts_sec, key=sortby))
153
        return nts_all
154
155
    def get_sections_2d_nts(self, sortby=None):
156
        """Get high GO IDs that are actually used to group current set of GO IDs."""
157
        sections_2d_nts = []
158
        for section_name, hdrgos_actual in self.get_sections_2d():
159
            hdrgo_nts = self.gosubdag.get_nts(hdrgos_actual, sortby=sortby)
160
            sections_2d_nts.append((section_name, hdrgo_nts))
161
        return sections_2d_nts
162
163
    def get_hdrgos(self):
164
        """Return high GO IDs that are actually used to group current set of GO IDs."""
165
        return set(self.hdrgo2usrgos.keys()).union(self.hdrgo_is_usrgo)
166
167
    def get_usrgos_g_hdrgos(self, hdrgos):
168
        """Return usrgos under provided hdrgos."""
169
        usrgos_all = set()
170
        if isinstance(hdrgos, str):
171
            hdrgos = [hdrgos]
172
        for hdrgo in hdrgos:
173
            usrgos_cur = self.hdrgo2usrgos.get(hdrgo, None)
174
            if usrgos_cur is not None:
175
                usrgos_all |= usrgos_cur
176
            if hdrgo in self.hdrgo_is_usrgo:
177
                usrgos_all.add(hdrgo)
178
        return usrgos_all
179
180
    def get_hdrgos_unplaced(self):
181
        """Get hdrgos which are not headers in sections."""
182
        return self.get_hdrgos().difference(self.hdrobj.get_section_hdrgos())
183
184
    def get_hdrgos_u0(self):
185
        """Return header GO IDs which ARE NOT user GO IDs."""
186
        return set(self.hdrgo2usrgos.keys()).difference(self.usrgos)
187
188
    def get_hdrgos_u1(self):
189
        """Return header GO IDs which ARE user GO IDs."""
190
        return self.hdrgo_is_usrgo
191
192
    def get_hdrgo2usrgos(self, hdrgos):
193
        """Return a subset of hdrgo2usrgos."""
194
        get_usrgos = self.hdrgo2usrgos.get
195
        hdrgos_actual = self.get_hdrgos().intersection(hdrgos)
196
        return {h:get_usrgos(h) for h in hdrgos_actual}
197
198
    def get_usrgo2hdrgo(self):
199
        """Return a dict with all user GO IDs as keys and their respective header GOs as values."""
200
        usrgo2hdrgo = {}
201
        for hdrgo, usrgos in self.hdrgo2usrgos.items():
202
            for usrgo in usrgos:
203
                assert usrgo not in usrgo2hdrgo
204
                usrgo2hdrgo[usrgo] = hdrgo
205
        # Add usrgos which are also a hdrgo and the GO group contains no other GO IDs
206
        for goid in self.hdrgo_is_usrgo:
207
            usrgo2hdrgo[goid] = goid
208
        assert len(self.usrgos) <= len(usrgo2hdrgo), \
209
            "USRGOS({U}) != USRGO2HDRGO({H}): {GOs}".format(
210
                U=len(self.usrgos),
211
                H=len(usrgo2hdrgo),
212
                GOs=self.usrgos.symmetric_difference(set(usrgo2hdrgo.keys())))
213
        return usrgo2hdrgo
214
215
    def get_go2sectiontxt(self):
216
        """Return a dict with actual header and user GO IDs as keys and their sections as values."""
217
        go2txt = {}
218
        _get_secs = self.hdrobj.get_sections
219
        hdrgo2sectxt = {h:" ".join(_get_secs(h)) for h in self.get_hdrgos()}
220
        usrgo2hdrgo = self.get_usrgo2hdrgo()
221
        for goid, ntgo in self.go2nt.items():
222
            hdrgo = ntgo.GO if ntgo.is_hdrgo else usrgo2hdrgo[ntgo.GO]
223
            go2txt[goid] = hdrgo2sectxt[hdrgo]
224
        return go2txt
225
226
    def get_usrgo2sections(self):
227
        """Return a dict with all user GO IDs as keys and their sections as values."""
228
        usrgo2sections = cx.defaultdict(set)
229
        usrgo2hdrgo = self.get_usrgo2hdrgo()
230
        get_sections = self.hdrobj.get_sections
231
        for usrgo, hdrgo in usrgo2hdrgo.items():
232
            sections = set(get_sections(hdrgo))
233
            usrgo2sections[usrgo] |= sections
234
        assert len(usrgo2sections) >= len(self.usrgos), \
235
            "uGOS({U}) != uGO2sections({H}): {GOs}".format(
236
                U=len(self.usrgos),
237
                H=len(usrgo2sections),
238
                GOs=self.usrgos.symmetric_difference(set(usrgo2sections.keys())))
239
        return usrgo2sections
240
241
    def get_fout_base(self, goid, name=None, pre="gogrp"):
        """Get filename base for a group of GO IDs under a single header GO ID.

        Args:
            goid: Header GO ID; must be a key of gosubdag.go2obj.
            name: Name placed in the filename. Defaults to the group name
                with spaces replaced by underscores.
            pre: Filename prefix.

        Returns:
            Text of the form PRE_NS_NAME_SECTIONS_DEPTH_D1_GO.
        """
        goobj = self.gosubdag.go2obj[goid]
        if name is None:
            name = self.grpname.replace(" ", "_")
        sections = "_".join(self.hdrobj.get_sections(goid))
        namespace = Consts.NAMESPACE2NS[goobj.namespace]
        name_txt = self._str_replace(name)
        # _str_replace leaves none of the characters it replaces, so a
        # single pass gives the same text as applying it twice.
        section_txt = self._str_replace(sections)
        go_txt = goid.replace(":", "")
        depth_txt = self._get_depthsr(goobj)
        d1_txt = self.gosubdag.go2nt[goobj.id].D1
        return "{PRE}_{BP}_{NAME}_{SEC}_{DSTR}_{D1s}_{GO}".format(
            PRE=pre,
            BP=namespace,
            NAME=name_txt,
            SEC=section_txt,
            GO=go_txt,
            DSTR=depth_txt,
            D1s=d1_txt)
255
256
    def _get_depthsr(self, goobj):
257
        """Return DNN or RNN depending on if relationships are loaded."""
258
        if 'reldepth' in self.gosubdag.prt_attr['flds']:
259
            return "R{R:02}".format(R=goobj.reldepth)
260
        return "D{D:02}".format(D=goobj.depth)
261
262
    @staticmethod
263
    def _str_replace(txt):
264
        """Makes a small text amenable to being used in a filename."""
265
        txt = txt.replace(",", "")
266
        txt = txt.replace(" ", "_")
267
        txt = txt.replace(":", "")
268
        txt = txt.replace(".", "")
269
        txt = txt.replace("/", "")
270
        txt = txt.replace("", "")
271
        return txt
272
273
    def prt_summary(self, prt=sys.stdout):
274
        """Print summary of grouping/sorting run."""
275
        # Grouping summary
276
        fmtstr = "Grouped: {U:3,} User GOs, using {h:2,} of {H:,} Grouping GOs, for run: {NAME}\n"
277
        prt.write(fmtstr.format(
278
            NAME=self.grpname,
279
            U=len(self.usrgos),
280
            h=len(self.hdrobj.hdrgos.intersection(self.hdrgo2usrgos.keys())),
281
            H=self.hdrobj.num_hdrgos()))
282
283
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
284