OptionalAttrs._init_go2dct()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 17
dl 0
loc 56
rs 1.8
c 3
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

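Commonly applied refactorings include Extract Method: move a cohesive group of statements into a new, well-named method and call it from the original one.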

Complexity

Complex methods like OptionalAttrs._init_go2dct() often indicate that the enclosing class does a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Test the loading of the optional GO term fields."""
2
# https://owlcollab.github.io/oboformat/doc/GO.format.obo-1_4.html
3
4
__copyright__ = "Copyright (C) 2010-2018, DV Klopfenstein, H Tang, All rights reserved."
5
__author__ = "DV Klopfenstein"
6
7
8
import os
9
import sys
10
import re
11
import timeit
12
import collections as cx
13
from goatools.test_data.godag_timed import GoDagTimed
14
from goatools.test_data.godag_timed import prt_hms
15
16
17
class OptionalAttrs(object):
    """Holds data for GO relationship test.

    Expected values are parsed straight from the text of the obo file by
    ``_init_go2dct``.  Actual values are the GO term objects loaded by
    ``GoDagTimed``.  The ``chk_*`` methods assert that the two agree.
    """

    repo = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../..")
    # Field line pattern: "<field>: <value>"
    # NOTE(review): the value group requires at least two characters, so a
    # line whose value is a single character does not match and is ignored.
    cmpfld = re.compile(r'^(\S+)\s*:\s*(\S.*\S)\s*$')
    exp_scopes = {'EXACT', 'BROAD', 'NARROW', 'RELATED'}
    exp_xrefpat = re.compile(r'^\S+:\S+$')
    # Required attributes are always loaded
    exp_req = {'name', 'id', 'is_obsolete', 'namespace', 'alt_id', 'is_a'}
    # Generated attributes
    exp_gen = {'level', 'depth', 'parents', 'children', '_parents'}
    exp_relationships = {'part_of',
                         'regulates', 'negatively_regulates', 'positively_regulates'}

    attrs_scalar = {'id', 'namespace', 'name', 'def', 'comment'}
    attrs_set = {'xref', 'subset', 'alt_id'}

    def __init__(self, fin_obo, opt_field=None, keep_alt_ids=False):
        """Load actual GO terms and expected field dicts, then run the basic checks.

        fin_obo      -- obo file name, relative to the repository root (``repo``)
        opt_field    -- optional attribute(s) passed through to ``GoDagTimed``
        keep_alt_ids -- passed through to ``GoDagTimed``
        """
        self.opt = opt_field  # None causes all fields to read to exp dict
        self.obo = os.path.join(self.repo, fin_obo)
        self.go2obj = GoDagTimed(self.obo, opt_field, keep_alt_ids).go2obj
        self.dcts = self._init_go2dct()  # go2dct typdefdct flds
        # Keep expected data only for the GO IDs which were actually loaded
        self.go2dct = {go:d for go, d in self.dcts['go2dct'].items() if go in self.go2obj}
        self.num_tot = len(self.go2obj)
        self._chk_required()
        self._chk_parents()
        self._set_exp_children()
        self._chk_children()

    def chk_get_goterms_upper(self):
        """Check that GOTerm's 'get_goterms_upper' returns parents and relationships."""
        tic = timeit.default_timer()
        for goterm in self.go2obj.values():
            goids_act = set(o.id for o in goterm.get_goterms_upper())
            goids_exp = self._get_goterms_upper(goterm.id)
            assert goids_act == goids_exp, "{GO} EXP({E}) ACT({A})".format(
                GO=goterm.id, E=goids_exp, A=goids_act)
        prt_hms(tic, "get_goterms_upper")

    def chk_get_goterms_lower(self):
        """Check that GOTerm's 'get_goterms_lower' returns children and reverse relationships."""
        tic = timeit.default_timer()
        for goterm in self.go2obj.values():
            goids_act = set(o.id for o in goterm.get_goterms_lower())
            goids_exp = self._get_goterms_lower(goterm.id)
            assert goids_act == goids_exp, "{GO} EXP({E}) ACT({A})".format(
                GO=goterm.id, E=goids_exp, A=goids_act)
        prt_hms(tic, "get_goterms_lower")

    def _get_goterms_upper(self, goid):
        """Get expected GO IDs returned by GOTerm's 'get_goterms_upper'."""
        dct = self.go2dct[goid]
        goids_exp = set(dct.get('is_a', ()))
        # A relationship string is "<reltype> <GO ID>"; keep the GO ID
        for rel_go in dct.get('relationship', ()):
            goids_exp.add(rel_go.split()[1])
        return goids_exp

    def _get_goterms_lower(self, goid):
        """Get expected GO IDs returned by GOTerm's 'get_goterms_lower'."""
        dct = self.go2dct[goid]
        goids_exp = set(dct.get('is_a_rev', ()))
        # 'relationship_rev' maps a relationship type to a set of GO IDs
        for rel_gos in dct.get('relationship_rev', {}).values():
            goids_exp.update(rel_gos)
        return goids_exp

    def chk_relationships_rev(self, reltype='part_of', prt=None):
        """Check reciprocal relationships. Print all GO pairs in one type of relationship.

        reltype -- relationship type to check
        prt     -- optional stream; when given, every GO pair is written to it
        """
        spc = " "*len(reltype)
        # Build the expected reverse relationships from the forward relationships
        rec2revs = cx.defaultdict(set)
        for rec in sorted(self.go2obj.values(), key=lambda o: o.namespace):
            reldct = rec.relationship
            if reltype not in reldct:
                continue
            if prt is not None:
                prt.write("{SPC} {GO}\n".format(SPC=spc, GO=str(rec)))
            for related_to in reldct[reltype]:
                rec2revs[related_to].add(rec)
                if prt is not None:
                    prt.write("{RELTYPE} {GO}\n".format(RELTYPE=reltype, GO=str(related_to)))
            if prt is not None:
                prt.write("\n")
        # Compare the expected reverse relationships with the actual ones
        for rec, exp_revs in sorted(rec2revs.items(), key=lambda t: t[0].namespace):
            if prt is not None:
                prt.write("    {SPC} {GO}\n".format(SPC=spc, GO=str(rec)))
            assert rec.relationship_rev[reltype] == exp_revs
            if prt is not None:
                for related_from in rec.relationship_rev[reltype]:
                    prt.write("rev {RELTYPE} {GO}\n".format(RELTYPE=reltype, GO=str(related_from)))
                prt.write("\n")

    def chk_str(self, attr):
        """Check that expected scalar value matches actual string value."""
        for goid, rec in self.go2obj.items():
            # A string data member must always be present, even if the value is ""
            act_str = getattr(rec, attr)
            exp_dct = self.go2dct[goid]
            # Expected string equals actual string?
            if attr in exp_dct:
                exp_str = next(iter(exp_dct[attr]))
                assert exp_str == act_str, "{} EXP({}) ACT({})".format(
                    goid, exp_str, act_str)
            # If there is no expected string, is actual string ""?
            else:
                assert act_str == ""

    def prt_summary(self, prt=sys.stdout):
        """Print percentage of GO IDs that have a specific relationship."""
        sep = "\n-----------------------------------------------------------\n"
        flds_seen = self.dcts['flds']
        fld_cnts_go = self._get_cnts_gte1(self.go2dct.values())
        prt.write("{SEP}GO TERM REQUIRED FIELDS:\n".format(SEP=sep))
        self._prt_summary(prt, fld_cnts_go, self.exp_req, self.go2dct.values())
        flds_seen = flds_seen.difference(self.exp_req)
        prt.write("{SEP}GO TERM OPTIONAL FIELDS:\n".format(SEP=sep))
        self._prt_summary(prt, fld_cnts_go, flds_seen, self.go2dct.values())
        flds_seen = flds_seen.difference(fld_cnts_go.keys())
        prt.write("{SEP}Typedef FIELDS:\n".format(SEP=sep))
        fld_cnts_typedef = self._get_cnts_gte1(self.dcts['typedefdct'].values())
        # NOTE(review): the dict itself (not its values) is passed here, unlike
        # the two calls above, so the "Maximum number of fields" section walks
        # the Typedef ids rather than the Typedef records. Passing the values
        # would send Typedef-only fields to _get_fldmrk, which raises on them.
        self._prt_summary(prt, fld_cnts_typedef, flds_seen, self.dcts['typedefdct'])
        flds_seen = flds_seen.difference(fld_cnts_typedef.keys())
        assert flds_seen == set(['consider', 'replaced_by']), "UNEXPECTED FIELDS({})".format(
            flds_seen)

    def _prt_summary(self, prt, fld_cnts, prt_flds, dcts):
        """Print how often each field in prt_flds appears and its maximum count.

        prt      -- stream to write to
        fld_cnts -- Counter: field name -> number of records containing the field
        prt_flds -- fields to print; None prints all fields
        dcts     -- the records which were counted
        """
        prt.write("\n    These fields appear at least once\n")
        # Ex: 28,951 of 44,948 (64%) GO IDs has field(synonym)
        for relname, cnt in fld_cnts.most_common():
            if prt_flds is None or relname in prt_flds:
                self._prt_perc(cnt, relname, len(dcts), prt)
        prt.write("\n    Maximum number of fields:\n")
        for fld, maxqty in sorted(self._get_cnts_max(dcts).items(), key=lambda t: t[1]):
            if prt_flds is None or fld in prt_flds:
                prt.write("        {MAX:3} {MRK} {FLD}\n".format(
                    MAX=maxqty, MRK=self._get_fldmrk(fld), FLD=fld))

    def _chk_parents(self):
        """Check parents."""
        for goobj in self.go2obj.values():
            exp_dct = self.go2dct[goobj.id]
            if 'is_a' in exp_dct:
                # pylint: disable=protected-access
                exp_parents = exp_dct['is_a']
                act_parents = goobj._parents
                assert exp_parents == act_parents
            else:
                assert not goobj.parents

    def _chk_children(self):
        """Check children."""
        for goobj in self.go2obj.values():
            exp_dct = self.go2dct[goobj.id]
            if '_children' in exp_dct:
                exp_children = exp_dct['_children']
                act_children = set(o.id for o in goobj.children)
                assert exp_children == act_children
            else:
                assert not goobj.children

    def _set_exp_children(self):
        """Fill expected child GO IDs."""
        # Initialize empty sets for child GO IDs
        for exp_dct in self.go2dct.values():
            exp_dct['_children'] = set()
        # Add each GO ID to the set of children of all of its parents
        for goid_child, exp_dct in self.go2dct.items():
            for goid_parent in exp_dct.get('is_a', ()):
                self.go2dct[goid_parent]['_children'].add(goid_child)

    def _chk_required(self):
        """Check the required attributes."""
        for goid, goobj in self.go2obj.items():
            godct = self.go2dct[goid]
            assert goobj.id == godct['GO']
            assert goobj.namespace == next(iter(godct['namespace'])), godct
            assert goobj.name == next(iter(godct['name']))
            self._chk_is_obsolete(goobj, godct)
            self._chk_alt_ids(goobj, godct)

    @staticmethod
    def _chk_alt_ids(goobj, godct):
        """Check 'alt_ids' required attribute."""
        if 'alt_id' in godct:
            assert godct['alt_id'] == goobj.alt_ids
        else:
            assert not goobj.alt_ids

    @staticmethod
    def _chk_is_obsolete(goobj, godct):
        """Check 'is_obsolete' required attribute."""
        act_obso = getattr(goobj, 'is_obsolete', None)
        msg = "EXP({})\nACT({})".format(godct, act_obso)
        if act_obso:
            assert 'is_obsolete' in godct, msg
        else:
            assert 'is_obsolete' not in godct, msg

    def chk_no_optattrs(self):
        """Check that only the optional attributes requested are the attributes implemented."""
        # name is_obsolete namespace id alt_ids
        # level namespace depth parents children _parents
        exp_flds = self.exp_req.union(self.exp_gen)
        for goobj in self.go2obj.values():
            assert set(vars(goobj).keys()).difference(exp_flds) == set(['alt_ids'])

    def chk_xref(self, prt=None):
        """Check xrefs."""
        # Get GO IDs which are expected to have xrefs
        goids = set(go for go, d in self.go2dct.items() if 'xref' in d)
        for goid in goids:
            goobj = self.go2obj[goid]
            xrefs = getattr(goobj, 'xref', None)
            assert xrefs is not None, "{GO} MISSING XREF".format(GO=goid)
            # Every xref must look like "<db>:<id>"
            for dbxref in xrefs:
                if prt is not None:
                    prt.write("{GO} {DBXREF}\n".format(GO=goid, DBXREF=dbxref))
                assert self.exp_xrefpat.match(dbxref), "INVALID XREF FORMAT"

    def chk_synonyms(self, prt=None):
        """Check synonyms."""
        for goid, dct_exp in self.go2dct.items():
            goobj = self.go2obj[goid]
            if 'synonym' in dct_exp:
                ntsyns = getattr(goobj, 'synonym', None)
                assert ntsyns is not None, "{GO} MISSING SYNONYM".format(GO=goid)
                # Iterate through list of synonym data stored in named tuples
                for ntsyn in ntsyns:
                    if prt is not None:
                        prt.write("{GO} {NT}\n".format(GO=goid, NT=ntsyn))
                    assert ntsyn.text, "SYNONYM CANNOT BE EMPTY"
                    assert ntsyn.scope in self.exp_scopes, "INVALID SYNONYM SCOPE"
                    for dbxref in ntsyn.dbxrefs:
                        assert self.exp_xrefpat.match(dbxref), "INVALID SYNONYM DBXREF"
            else:
                assert goobj.synonym == []

    def _get_fldmrk(self, fld):
        """Get a mark for each field indicating if it is required or optional"""
        if fld in self.exp_req:
            return 'REQ'
        # 'def' is a member of attrs_scalar and 'xref' is a member of attrs_set,
        # so neither needs a branch of its own.
        if fld in self.attrs_scalar:
            return 'str'
        if fld in self.attrs_set:
            return 'set'
        if fld == 'relationship':
            return 'rel'
        if fld == 'synonym':
            return 'syn'
        raise RuntimeError("UNEXPECTED FIELD({})".format(fld))

    @staticmethod
    def _prt_perc(num_rel, name, num_tot, prt=sys.stdout):
        """Print percentage of GO IDs that have a specific relationship."""
        prt.write("        {N:6,} of {M:,} ({P:3.0f}%) GO IDs has field({A})\n".format(
            N=num_rel, M=num_tot, P=float(num_rel)/num_tot*100, A=name))

    def _get_cnts_max(self, dcts):
        """Get the maximum count of times a specific relationship was seen on a GO.

        Returns a dict: field name -> largest number of values seen in one record.
        """
        fld2max = {}
        flds = self.dcts['flds']
        for recdct in dcts:
            for opt in flds:
                if opt in recdct:
                    val = recdct[opt]
                    # Scalar fields (e.g. 'id') hold one string: count it as a
                    # single value rather than using the string's length.
                    qty = 1 if isinstance(val, str) else len(val)
                    fld2max[opt] = max(qty, fld2max.get(opt, qty))
        return fld2max

    def _get_cnts_gte1(self, record_dicts):
        """Get counts of if a specific relationship was seen on a GO."""
        ctr = cx.Counter()
        flds = self.dcts['flds']
        for recdct in record_dicts:
            for opt in flds:
                if opt in recdct:
                    ctr[opt] += 1
        return ctr

    def chk_set(self, opt):
        """Check that actual set contents match expected set contents."""
        errpat = "SET EXP({EXP}) ACT({ACT}) {GO}\n{DESC}:\nEXP:\n{Es}\n\nACT:\n{As}"
        for goid, dct in self.go2dct.items():
            act_set = getattr(self.go2obj[goid], opt, None)
            if opt in dct:
                exp_set = dct[opt]
                # Fail with a clear message instead of a TypeError from len(None)
                assert act_set is not None, "{GO} MISSING {O}".format(GO=goid, O=opt)
                assert exp_set == act_set, errpat.format(
                    EXP=len(exp_set), ACT=len(act_set), GO=goid,
                    DESC=str(self.go2obj[goid].name),
                    Es="\n".join(sorted(exp_set)),
                    As="\n".join(sorted(act_set)))
            else:
                assert act_set == set(), "EXPECTED EMPTY SET FOR {O}: ACT({A})\n".format(
                    O=opt, A=act_set)

    def chk_relationships(self):
        """Expected relationship GO IDs should match actual relationship GO IDs."""
        for goid, dct in self.go2dct.items():
            act_rel2recs = getattr(self.go2obj[goid], 'relationship', None)
            if 'relationship' in dct:
                rel2gos = self._mk_exp_relatinship_sets(dct['relationship'])
                # Check if expected relationships and actual relationships are the same
                exp_rels = set(rel2gos.keys())
                act_rels = set(act_rel2recs.keys())
                assert act_rels == exp_rels, "EXP({}) != ACT({})".format(exp_rels, act_rels)
                for rel, exp_goids in rel2gos.items():
                    # Expected relationships store GO IDs.
                    # Actual relationships store GO Terms.
                    act_goids = set(o.id for o in act_rel2recs[rel])
                    assert exp_goids == act_goids, "EXP({}) ACT({}) {}:\nEXP({})\nACT({})".format(
                        len(exp_goids), len(act_goids), goid, exp_goids, act_goids)
            else:
                assert act_rel2recs == {}, act_rel2recs

    def _mk_exp_relatinship_sets(self, relationship_str_set):
        """Transform a set of relationship strings into a dict of sets containing GO IDs."""
        rel2gos = cx.defaultdict(set)
        for rel_str in relationship_str_set:
            rel, goid = rel_str.split()
            assert rel in self.exp_relationships
            assert goid[:3] == "GO:" and goid[3:].isdigit()
            rel2gos[rel].add(goid)
        return rel2gos

    @staticmethod
    def add_is_a_rev(go2dct):
        """If there 'is_a' exists, add 'is_a_rev'."""
        for go_src, dct in go2dct.items():
            for go_parent in dct.get('is_a', ()):
                go2dct[go_parent].setdefault('is_a_rev', set()).add(go_src)

    @staticmethod
    def add_relationship_rev(go2dct):
        """If there is a relationship, add 'relationship_rev'."""
        for go_src, dct in go2dct.items():
            for rel in dct.get('relationship', ()):
                reltype, go_dst = rel.split()
                rel2srcs = go2dct[go_dst].setdefault('relationship_rev', {})
                rel2srcs.setdefault(reltype, set()).add(go_src)

    def _init_go2dct(self):
        """Create EXPECTED RESULTS stored in a dict of GO fields.

        Reads the obo file at ``self.obo`` and returns a dict with three keys:
          'go2dct'     -- stanza id -> {field: set of values}; 'GO' and 'id' hold
                          the id string. Typedef stanzas are stored here too.
          'typedefdct' -- Typedef id -> {field: last value seen}
          'flds'       -- set of all field names seen inside a stanza
        """
        go2dct = {}
        typedefdct = {}
        flds = set()
        rec = {}
        rec_typedef = None
        with open(self.obo) as ifstrm:
            for line in ifstrm:
                line = line.rstrip()
                # A blank line ends the current stanza
                if not line:
                    self._store_stanza(rec, rec_typedef, go2dct, typedefdct)
                    rec = {}
                    rec_typedef = None
                elif line[:9] == "[Typedef]":
                    rec_typedef = {}
                else:
                    rec = self._add_field(line, rec, rec_typedef, flds)
        # Keep the last stanza when the file does not end with a blank line
        if rec or rec_typedef:
            self._store_stanza(rec, rec_typedef, go2dct, typedefdct)
        # 'Definition' is specified in obo as 'def' and in Python by 'defn'
        for dct in go2dct.values():
            if 'def' in dct:
                dct['defn'] = dct['def']
        self.add_relationship_rev(go2dct)
        self.add_is_a_rev(go2dct)
        return {'go2dct':go2dct, 'typedefdct':typedefdct, 'flds':flds}

    @staticmethod
    def _store_stanza(rec, rec_typedef, go2dct, typedefdct):
        """Save a finished stanza: a non-empty rec by 'GO', a Typedef rec by 'id'."""
        if rec:
            go2dct[rec['GO']] = rec
        if rec_typedef is not None:
            typedefdct[rec_typedef['id']] = rec_typedef

    def _add_field(self, line, rec, rec_typedef, flds):
        """Add one "field: value" line to the current stanza and return the record.

        An 'id' line starts a new record. Any other field is added to the
        record only if one has been started. Lines which do not match
        ``cmpfld`` are ignored.
        """
        mtch = self.cmpfld.match(line)
        if not mtch:
            return rec
        fld = mtch.group(1)
        val = mtch.group(2)
        # Beginning of GO record
        if fld == "id":
            assert not rec, "NOW({}) WAS({})".format(line, rec)
            rec = {'GO':val, 'id':val}
            flds.add(fld)
        # Middle of GO record
        elif rec:
            flds.add(fld)
            # Strip comment if it exists
            loc = val.find(' ! ')
            if loc != -1:
                val = val[:loc]
            rec.setdefault(fld, set()).add(val)
        if rec_typedef is not None:
            rec_typedef[fld] = val
        return rec
438
439
440
# Copyright (C) 2010-2018, DV Klopfenstein, H Tang, All rights reserved.
441