GpadReader   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 207
Duplicated Lines 12.08 %

Importance

Changes 2
Bugs 0 Features 0
Metric   Value
dl       25
loc      207
rs       6.4799
c        2
b        0
f        0
wmc      54

12 Methods

Rating   Name                    Duplication   Size   Complexity
A        _split_line()           0             7      2
A        _chk_qualifier()        0             5      3
A        _get_taxon()            0             9      4
B        read_gpad()             10            28     8
B        _get_ntgpad()           0             31     5
B        _rd_fld_vals()          15            15     8
B        _get_properties()       0             17     6
A        prt_summary_anno2ev()   0             11     4
A        __init__()              0             6      2
A        get_relation_cnt()      0             7      3
A        _chk_qty_eq_1()         0             6      3
B        _get_extensions()       0             20     6


Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule is to restructure code once it is duplicated in three or more places.
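As a minimal sketch of removing this kind of duplication, suppose (hypothetically) that two reader classes each carry their own copy of the pipe-splitting logic flagged in _rd_fld_vals(). The report does not say where the duplicate lives, so split_pipe_field, GafLikeReader, and GpadLikeReader below are illustrative names and are not part of goatools. The sketch also raises ValueError where the listing uses assert; that swap is an assumption made here for clarity, not the author's choice.

```python
def split_pipe_field(name, val, as_list=True, qty_min=0, qty_max=None):
    """Split a pipe-separated field value and check the number of entries."""
    if not val and qty_min == 0:
        return [] if as_list else set()
    vals = val.split('|')  # A pipe separates entries within one field
    if len(vals) < qty_min:
        raise ValueError("FLD({F}): MIN QUANTITY({Q}) WASN'T MET: {V}".format(
            F=name, Q=qty_min, V=vals))
    if qty_max is not None and len(vals) > qty_max:
        raise ValueError("FLD({F}): MAX QUANTITY({Q}) EXCEEDED: {V}".format(
            F=name, Q=qty_max, V=vals))
    return vals if as_list else set(vals)


class GafLikeReader(object):
    """Hypothetical reader #1: calls the shared helper instead of a private copy."""

    @staticmethod
    def get_references(fld):
        """Return the set of references; at least one is required."""
        return split_pipe_field("DB_Reference", fld, as_list=False, qty_min=1)


class GpadLikeReader(object):
    """Hypothetical reader #2: calls the same helper."""

    @staticmethod
    def get_with_from(fld):
        """Return the (possibly empty) set of With_From values."""
        return split_pipe_field("With_From", fld, as_list=False)
```

With the logic in one place, a change to the separator or to the quantity checks only has to be made once.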

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like GpadReader often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
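A rough sketch of Extract Class applied to this kind of reader follows. It assumes the field-conversion helpers (the methods sharing the _get_ and _chk_ prefixes) form the cohesive component. GpadFieldParser and SlimGpadReader are hypothetical names introduced only for illustration, and ValueError replaces the assert statements of the listing as a choice made for this sketch.

```python
class GpadFieldParser(object):
    """Hypothetical extracted class: converts and checks raw GPAD field strings."""

    exp_qualifiers = frozenset([
        'NOT', 'contributes_to', 'colocalizes_with', 'enables', 'involved_in',
        'part_of',
    ])

    @staticmethod
    def get_taxon(taxon):
        """Return the taxon ID as an int, or None if the field is empty."""
        if not taxon:
            return None
        if not taxon.startswith('taxon:') or not taxon[6:].isdigit():
            raise ValueError('UNRECOGNIZED Taxon({T})'.format(T=taxon))
        return int(taxon[6:])

    def chk_qualifier(self, qualifiers):
        """Raise if any qualifier is not an expected value."""
        unexpected = set(qualifiers) - self.exp_qualifiers
        if unexpected:
            raise ValueError('UNEXPECTED QUALIFIER({Q})'.format(Q=sorted(unexpected)))


class SlimGpadReader(object):
    """Hypothetical reader that delegates field conversion to the parser."""

    def __init__(self, parser=None):
        self.parser = parser if parser is not None else GpadFieldParser()

    def convert(self, flds):
        """Convert two of the twelve fields; the rest are omitted for brevity."""
        qualifiers = set(flds[2].split('|'))
        self.parser.chk_qualifier(qualifiers)
        return {'Qualifier': qualifiers, 'Taxon': self.parser.get_taxon(flds[7])}
```

The reader keeps the file-level responsibilities, while the parser can be tested on single field strings without opening a file.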

"""Read a Gene Product Association Data (GPAD) file and store the data in a Python object.

    Annotations available from the Gene Ontology Consortium:


    GPAD format:
        http://geneontology.org/page/gene-product-association-data-gpad-format
"""

import sys
import re
import collections as cx
from goatools.base import nopen
from goatools.anno.extensions.extensions import AnnotationExtensions
from goatools.anno.extensions.extension import AnnotationExtension

__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang. All rights reserved."
__author__ = "DV Klopfenstein"


class GpadReader(object):
    """Read a Gene Product Association Data (GPAD) file and store the data in a Python object."""

    # http://geneontology.org/page/gene-product-association-data-gpad-format
    gpadhdr = [ #              Col Req?     Cardinality    Example
        #                      --- -------- -------------- -----------------
        'DB',                 #  0 required 1              UniProtKB
        'DB_ID',              #  1 required 1              P12345
        'Qualifier',          #  2 required 1 or greater   NOT
        'GO_ID',              #  3 required 1              GO:0003993
        # DB_Ref: set(['DOI:10.1002/sita.200600112', 'GO_REF:0000037', 'Reactome:R-HSA-6814682'])
        'DB_Reference',       #  4 required 1 or greater   set(['PMID:2676709',
        'ECO_Evidence_Code',  #  5 required 1              ECO:NNNNNNN
        'With_From',          #  6 optional 0 or greater   GO:0000346
        'Taxon',              #  7 optional 0 or 1         taxon:9606
        'Date',               #  8 required 1              20090118
        # Assigned_By: Ensembl FlyBase GO_Central GOC MGI Reactome UniProt WormBase
        'Assigned_By',        #  9 required 1              SGD
        # Annotations (Optional)
        'Extension',          # 10 optional 0 or greater
        'Properties',         # 11 optional 0 or greater
    ]

    gpad_columns = {"1.1" : gpadhdr}            # !gpa-version: 1.1

    # Expected numbers of columns for various versions
    exp_numcol = 12

    # Expected values for a Qualifier
    exp_qualifiers = set([
        'NOT', 'contributes_to', 'colocalizes_with', 'enables', 'involved_in',
        'part_of',
    ])

    def __init__(self, filename=None, hdr_only=False):
        self.filename = filename
        # Initialize associations and header information
        self.hdr = None
        self.associations = self.read_gpad(filename, hdr_only) if filename is not None else []
        self.qty = len(self.associations)

    def prt_summary_anno2ev(self):
        """Print annotation/evidence code summary."""
        ctr = cx.Counter()
        for ntgpad in self.associations:
            # The namedtuple field is named 'ECO_Evidence_Code' (see gpadhdr)
            evidence_code = ntgpad.ECO_Evidence_Code
            if 'NOT' not in ntgpad.Qualifier:
                ctr[evidence_code] += 1
            else:
                ctr["NOT {EV}".format(EV=evidence_code)] += 1
        # Write the tally; this output format is an assumption, not from the original
        for evidence_code, cnt in sorted(ctr.items()):
            sys.stdout.write("{CNT:7,} {EV}\n".format(CNT=cnt, EV=evidence_code))

    def _get_ntgpad(self, ntgpadobj, flds):
        """Convert fields from string to preferred format for GPAD ver 1.1."""
        is_set = False  # Passed as set_list_ft: False returns a set, True returns a list
        qualifiers = self._rd_fld_vals("Qualifier", flds[2], is_set)
        assert flds[3][:3] == 'GO:', 'UNRECOGNIZED GO({GO})'.format(GO=flds[3])
        db_reference = self._rd_fld_vals("DB_Reference", flds[4], is_set, 1)
        assert flds[5][:4] == 'ECO:', 'UNRECOGNIZED ECO({ECO})'.format(ECO=flds[5])
        with_from = self._rd_fld_vals("With_From", flds[6], is_set)
        taxons = self._get_taxon(flds[7])
        assert flds[8].isdigit(), 'UNRECOGNIZED DATE({D})'.format(D=flds[8])
        assert flds[9], '"Assigned By" VALUE WAS NOT FOUND'
        exten = self._get_extensions(flds[10])
        props = self._get_properties(flds[11])
        self._chk_qty_eq_1(flds, [0, 1, 3, 5, 8, 9])
        # Additional Formatting
        self._chk_qualifier(qualifiers)
        # Create list of values
        gpadvals = [
            flds[0],      #  0 DB
            flds[1],      #  1 DB_ID
            qualifiers,   #  2 Qualifier
            flds[3],      #  3 GO_ID
            db_reference, #  4 DB_Reference
            flds[5],      #  5 ECO_Evidence_Code
            with_from,    #  6 With_From
            taxons,       #  7 Taxon
            flds[8],      #  8 Date
            flds[9],      #  9 Assigned_By
            exten,        # 10 Extension
            props]        # 11 Properties
        return ntgpadobj._make(gpadvals)

    @staticmethod
    def _rd_fld_vals(name, val, set_list_ft=True, qty_min=0, qty_max=None):
        """Further split a GPAD value within a single field."""
        if not val and qty_min == 0:
            return [] if set_list_ft else set()
        vals = val.split('|') # Use a pipe to separate entries
        num_vals = len(vals)
        assert num_vals >= qty_min, \
            "FLD({F}): MIN QUANTITY({Q}) WASN'T MET: {V}".format(
                F=name, Q=qty_min, V=vals)
        if qty_max is not None:
            assert num_vals <= qty_max, \
                "FLD({F}): MAX QUANTITY({Q}) EXCEEDED: {V}".format(
                    F=name, Q=qty_max, V=vals)
        return vals if set_list_ft else set(vals)

    @staticmethod
    def _get_taxon(taxon):
        """Return Interacting taxon ID | optional | 0 or 1 | GPAD column 7."""
        if not taxon:
            return None
        assert taxon[:6] == 'taxon:', 'UNRECOGNIZED Taxon({Taxon})'.format(Taxon=taxon)
        taxid = taxon[6:]
        assert taxid.isdigit(), "UNEXPECTED TAXON({T})".format(T=taxid)
        return int(taxid)

    def _get_properties(self, fldstr):
        """Return optional Annotation Properties (0 or greater)."""
        prop2val = {}
        props = self._rd_fld_vals("Properties", fldstr, False)  # Get set
        go_evidence = None
        for prop in props:
            # There can be more properties than 'go_evidence',
            # but currently we see only 'go_evidence'.
            # Upon encountering updates, evaluate and update code to support ...
            if prop[:12] == 'go_evidence=':
                assert go_evidence is None, "MORE THAN ONE EVIDENCE CODE FOUND"
                go_evidence = prop[12:]
            else:
                assert False, "UNPROGRAMMED PROPERTY({P})".format(P=prop)
        assert go_evidence is not None, "go_evidence == None"
        prop2val['go_evidence'] = go_evidence
        return prop2val

    def _get_extensions(self, extline):
        """Return zero or greater Annotation Extensions, given a line of text."""
        # Extension examples:
        #   has_direct_input(UniProtKB:P37840),occurs_in(GO:0005576)
        #   part_of(UBERON:0006618),part_of(UBERON:0002302)
        #   occurs_in(CL:0000988)|occurs_in(CL:0001021)
        if not extline:
            return None
        exts = []
        for ext_lst in extline.split('|'):
            grp = []
            for ext in ext_lst.split(','):
                idx = ext.find('(')
                if idx != -1 and ext[-1] == ')':
                    grp.append(AnnotationExtension(ext[:idx], ext[idx+1:-1]))
                else:
                    # Ignore improperly formatted Extensions
                    sys.stdout.write('{F}: BAD Extension({E})\n'.format(F=self.filename, E=ext))
            exts.append(grp)
        return AnnotationExtensions(exts)

    def read_gpad(self, fin_gpad, hdr_only=False):
        """Read GPAD file. HTTP address okay. GZIPPED/BZIPPED file okay."""
        ga_lst = []
        ver = None
        ntgpadobj = None
        hdrobj = GpadHdr()
        ifstrm = nopen(fin_gpad)
        for line in ifstrm:
            # Read header
            if ntgpadobj is None:
                if line[0] == '!':
                    if ver is None and line[1:13] == 'gpa-version:':
                        ver = line[13:].strip()
                    hdrobj.chkaddhdr(line)
                else:
                    self.hdr = hdrobj.get_hdr()
                    if hdr_only:
                        return ga_lst
                    ntgpadobj = cx.namedtuple("ntgpadobj", " ".join(self.gpad_columns[ver]))
            # Read data
            if ntgpadobj is not None:
                flds = self._split_line(line)
                ntgpad = self._get_ntgpad(ntgpadobj, flds)
                ga_lst.append(ntgpad)
        # GPAD file has been read
        readmsg = "  READ {N:7,} associations: {FIN}\n"
        sys.stdout.write(readmsg.format(N=len(ga_lst), FIN=fin_gpad))
        return ga_lst

    def _split_line(self, line):
        """Split line into field values."""
        line = line.rstrip('\r\n')
        flds = re.split('\t', line)
        assert len(flds) == self.exp_numcol, "EXPECTED({E}) COLUMNS, ACTUAL({A}): {L}".format(
            E=self.exp_numcol, A=len(flds), L=line)
        return flds

    def _chk_qualifier(self, qualifiers):
        """Check that qualifiers are expected values."""
        # http://geneontology.org/page/go-annotation-conventions#qual
        for qual in qualifiers:
            assert qual in self.exp_qualifiers, "UNEXPECTED QUALIFIER({Q})".format(Q=qual)

    @staticmethod
    def _chk_qty_eq_1(flds, col_lst):
        """Check that these fields have only one value: required 1."""
        for col in col_lst:
            assert flds[col], "UNEXPECTED REQUIRED VALUE({V}) AT INDEX({R})".format(
                V=flds[col], R=col)

    def get_relation_cnt(self):
        """Return a Counter containing all relations contained in the Annotation Extensions."""
        ctr = cx.Counter()
        for ntgpad in self.associations:
            if ntgpad.Extension is not None:
                ctr += ntgpad.Extension.get_relations_cnt()
        return ctr


class GpadHdr(object):
    """Used to build a GPAD header."""

    cmpline = re.compile(r'^!(\w[\w\s-]+:.*)$')

    def __init__(self):
        self.gpadhdr = []

    def get_hdr(self):
        """Return GPAD header data as a string paragraph."""
        return "\n".join(self.gpadhdr)

    def chkaddhdr(self, line):
        """If this line contains desired header info, save it."""
        mtch = self.cmpline.search(line)
        if mtch:
            self.gpadhdr.append(mtch.group(1))


# Copyright (C) 2016-2018, DV Klopfenstein, H Tang. All rights reserved.
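To illustrate the record structure that read_gpad() builds, here is a standalone sketch that does not depend on goatools. It splits one tab-separated line into the 12 GPAD 1.1 columns and stores it in a namedtuple, leaving every field as a raw string (the listing above further converts several fields). NtGpad, parse_gpad_line, and SAMPLE_LINE are hypothetical names, and the sample line is made up from the example values in the gpadhdr comments rather than taken from a real annotation file.

```python
import collections as cx

# Column names as listed in GpadReader.gpadhdr
GPAD_COLUMNS = [
    'DB', 'DB_ID', 'Qualifier', 'GO_ID', 'DB_Reference', 'ECO_Evidence_Code',
    'With_From', 'Taxon', 'Date', 'Assigned_By', 'Extension', 'Properties',
]

NtGpad = cx.namedtuple('NtGpad', GPAD_COLUMNS)


def parse_gpad_line(line):
    """Split one GPAD data line into a namedtuple of raw string fields."""
    flds = line.rstrip('\r\n').split('\t')
    if len(flds) != len(GPAD_COLUMNS):
        raise ValueError("EXPECTED({E}) COLUMNS, ACTUAL({A}): {L}".format(
            E=len(GPAD_COLUMNS), A=len(flds), L=line))
    return NtGpad._make(flds)


# Made-up sample line for illustration only
SAMPLE_LINE = "\t".join([
    'UniProtKB', 'P12345', 'enables', 'GO:0003993', 'PMID:2676709',
    'ECO:0000314', '', 'taxon:9606', '20090118', 'SGD', '', 'go_evidence=IDA',
]) + "\n"

nt = parse_gpad_line(SAMPLE_LINE)
```

Fields can then be read by name, for example nt.GO_ID or nt.Taxon, which is the access pattern get_relation_cnt() relies on.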