GafReader   A
last analyzed

Complexity

Total Complexity 29

Size/Duplication

Total Lines 92
Duplicated Lines 10.87 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 10
loc 92
rs 10
wmc 29

6 Methods

Rating   Name   Duplication   Size   Complexity  
A _prt_ignored_lines() 0 9 3
A prt_summary_anno2ev() 0 12 4
A _prt_read_summary() 0 7 4
D read_gaf() 10 41 13
A __init__() 0 8 4
A prt_ignore_line() 0 5 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""Read a GO Association File (GAF) and store the data in a Python object.
2
3
    Annotations available from the Gene Ontology Consortium:
4
        http://geneontology.org/page/download-annotations
5
6
    GAF format:
7
        http://geneontology.org/page/go-annotation-file-formats
8
"""
9
10
import sys
11
import os
12
import re
13
import collections as cx
14
from goatools.evidence_codes import EvidenceCodes
15
16
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang. All rights reserved."
17
__author__ = "DV Klopfenstein"
18
19
20
# pylint: disable=broad-except,too-few-public-methods,line-too-long
21
class GafReader(object):
22
    """Reads a Gene Annotation File (GAF). Returns a Python object."""
23
24
    exp_kwdct = set(['allow_missing_symbol'])
25
26
    def __init__(self, filename=None, hdr_only=False, prt=sys.stdout, **kws):
27
        # kws: allow_missing_symbol
28
        self.kws = {k:v for k, v in kws.items() if k in self.exp_kwdct}
29
        self.filename = filename
30
        self.evobj = EvidenceCodes()
31
        # Initialize associations and header information
32
        self.hdr = None
33
        self.associations = self.read_gaf(filename, hdr_only, prt) if filename is not None else []
34
35
    def read_gaf(self, fin_gaf, hdr_only, prt):
36
        """Read GAF file. Store annotation data in a list of namedtuples."""
37
        nts = []
38
        ver = None
39
        hdrobj = GafHdr()
40
        datobj = None
41
        lnum = line = -1
42
        ignored = []
43
        try:
44
            with open(fin_gaf) as ifstrm:
45
                for lnum, line in enumerate(ifstrm, 1):
46
                    # Read header
47 View Code Duplication
                    if datobj is None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
48
                        if line[0] == '!':
49
                            if ver is None and line[1:13] == 'gaf-version:':
50
                                ver = line[13:].strip()
51
                            hdrobj.chkaddhdr(line)
52
                        else:
53
                            self.hdr = hdrobj.get_hdr()
54
                            if hdr_only:
55
                                return nts
56
                            datobj = GafData(ver, **self.kws)
57
                    # Read data
58
                    if datobj is not None and line[0] != '!':
59
                        # print(lnum, line)
60
                        ntgaf = datobj.get_ntgaf(line, lnum)
61
                        if ntgaf is not None:
62
                            nts.append(ntgaf)
63
                        else:
64
                            ignored.append((lnum, line))
65
        except Exception as inst:
66
            import traceback
67
            traceback.print_exc()
68
            sys.stderr.write("\n  **FATAL in read_gaf: {MSG}\n\n".format(MSG=str(inst)))
69
            sys.stderr.write("**FATAL: {FIN}[{LNUM}]:\n{L}".format(FIN=fin_gaf, L=line, LNUM=lnum))
70
            if datobj is not None:
71
                datobj.prt_line_detail(prt, line)
72
            sys.exit(1)
73
        # GAF file has been read
74
        self._prt_read_summary(prt, fin_gaf, nts, datobj, ignored)
75
        return self.evobj.sort_nts(nts, 'Evidence_Code')
76
77
    def _prt_read_summary(self, prt, fin_gaf, nts, datobj, ignored):
78
        """Print a summary about the GAF file that was read."""
79
        fout_log = self._prt_ignored_lines(ignored, datobj, fin_gaf) if ignored else None
80
        if prt is not None:
81
            prt.write("  READ    {N:9,} associations: {FIN}\n".format(N=len(nts), FIN=fin_gaf))
82
            if ignored:
83
                prt.write("  IGNORED {N:9,} associations: {FIN}\n".format(N=len(ignored), FIN=fout_log))
84
85
    def _prt_ignored_lines(self, ignored, datobj, fin_gaf):
86
        """Print ignored lines to a log file."""
87
        fout_log = "{}.log".format(fin_gaf)
88
        with open(fout_log, 'w') as prt:
89
            for lnum, line in ignored:
90
                self.prt_ignore_line(prt, fin_gaf, line, lnum)
91
                datobj.prt_line_detail(prt, line)
92
                prt.write("\n")
93
        return fout_log
94
95
    def prt_summary_anno2ev(self, prt=sys.stdout):
96
        """Print annotation/evidence code summary."""
97
        ctr = cx.Counter()
98
        for ntgaf in self.associations:
99
            evidence_code = ntgaf.Evidence_Code
100
            if 'NOT' not in ntgaf.Qualifier:
101
                ctr[evidence_code] += 1
102
            elif 'NOT' in ntgaf.Qualifier:
103
                ctr["NOT {EV}".format(EV=ntgaf.Evidence_Code)] += 1
104
            else:
105
                raise Exception("UNEXPECTED INFO")
106
        self.evobj.prt_ev_cnts(ctr, prt)
107
108
    @staticmethod
109
    def prt_ignore_line(prt, fin_gaf, line, lnum):
110
        """Print a message saying that we are ignoring an association line."""
111
        prt.write("**WARNING: BADLY FORMATTED LINE. IGNORED {FIN}[{LNUM}]:\n{L}\n".format(
112
            FIN=os.path.basename(fin_gaf), L=line, LNUM=lnum))
113
114
class GafData(object):
115
    """Extracts GAF fields from a GAF line."""
116
117
    spec_req1 = [0, 1, 2, 4, 6, 8, 11, 13, 14]
118
119
    req_str = ["REQ", "REQ", "REQ", "", "REQ", "REQ", "REQ", "", "REQ", "", "",
120
               "REQ", "REQ", "REQ", "REQ", "", ""]
121
122
    gafhdr = [ #           Col Req?     Cardinality    Example
123
        #                  --- -------- -------------- -----------------
124
        'DB',             #  0 required 1              UniProtKB
125
        'DB_ID',          #  1 required 1              P12345
126
        'DB_Symbol',      #  2 required 1              PHO3
127
        'Qualifier',      #  3 optional 0 or greater   NOT
128
        'GO_ID',          #  4 required 1              GO:0003993
129
        'DB_Reference',   #  5 required 1 or greater   PMID:2676709
130
        'Evidence_Code',  #  6 required 1              IMP
131
        'With_From',      #  7 optional 0 or greater   GO:0000346
132
        'Aspect',         #  8 required 1              F
133
        'DB_Name',        #  9 optional 0 or 1         Toll-like receptor 4
134
        'DB_Synonym',     # 10 optional 0 or greater   hToll|Tollbooth
135
        'DB_Type',        # 11 required 1              protein
136
        'Taxon',          # 12 required 1 or 2         taxon:9606
137
        'Date',           # 13 required 1              20090118
138
        'Assigned_By',    # 14 required 1              SGD
139
    ]
140
141
    #                            Col Required Cardinality  Example
142
    gafhdr2 = [ #                --- -------- ------------ -------------------
143
        'Annotation_Extension', # 15 optional 0 or greater part_of(CL:0000576)
144
        'Gene_Product_Form_ID', # 16 optional 0 or 1       UniProtKB:P12345-2
145
    ]
146
147
    gaf_columns = {
148
        "2.1" : gafhdr + gafhdr2, # !gaf-version: 2.1
149
        "2.0" : gafhdr + gafhdr2, # !gaf-version: 2.0
150
        "1.0" : gafhdr}           # !gaf-version: 1.0
151
152
    # Expected numbers of columns for various versions
153
    gaf_numcol = {
154
        "2.1" : 17,
155
        "2.0" : 17,
156
        "1.0" : 15}
157
158
    # Expected values for a Qualifier
159
    exp_qualifiers = set(['not', 'contributes_to', 'colocalizes_with'])
160
161
    def __init__(self, ver, allow_missing_symbol=False):
162
        self.ver = ver
163
        self.ntgafobj = cx.namedtuple("ntgafobj", " ".join(self.gaf_columns[ver]))
164
        self.req1 = self.spec_req1 if not allow_missing_symbol else [i for i in self.spec_req1 if i != 2]
165
        self.exp_mincol = 15  # Last required field is at the 15th column
166
167
    def get_ntgaf(self, line, lnum):
168
        """Return namedtuple filled with data."""
169
        flds = self.split_line(line)
170
        num_flds = len(flds)
171
        if num_flds >= self.exp_mincol:
172
            return self._get_ntgaf(flds, num_flds, lnum)
173
174
    @staticmethod
175
    def split_line(line):
176
        """Split line into field values."""
177
        line = re.split('\t', line)
178
        line[-1] = line[-1].rstrip('\r\n')
179
        return line
180
181
    def _get_ntgaf(self, flds, num_flds, lnum):
182
        """Convert fields from string to preferred format for GAF ver 2.1 and 2.0."""
183
        # Cardinality
184
        is_set = False
185
        is_list = True
186
        qualifiers = [t.lower() for t in self._rd_fld_vals("Qualifier", flds[3], is_set)]
187
        db_reference = self._rd_fld_vals("DB_Reference", flds[5], is_set, 1)
188
        with_from = self._rd_fld_vals("With_From", flds[7], is_set)
189
        db_name = self._rd_fld_vals("DB_Name", flds[9], is_set, 0)  # , 1)
190
        db_synonym = self._rd_fld_vals("DB_Synonym", flds[10], is_set)
191
        taxons = self._rd_fld_vals("Taxon", flds[12], is_list, 1, 2)
192
        if not self._chk_qty_eq_1(flds):
193
            return None
194
        # Additional Formatting
195
        taxons = self._do_taxons(taxons)
196
        self._chk_qualifier(qualifiers)
197
        # Create list of values
198
        gafvals = [
199
            flds[0],      # 0  DB
200
            flds[1],      # 1  DB_ID
201
            flds[2],      # 2  DB_Symbol
202
            qualifiers,   # 3  Qualifier
203
            flds[4],      # 4  GO_ID
204
            db_reference, # 5  DB_Reference
205
            flds[6],      # 6  Evidence_Code
206
            with_from,    # 7  With_From
207
            flds[8],      # 8  Aspect
208
            db_name,      # 9  DB_Name
209
            db_synonym,   # 10 DB_Synonym
210
            flds[11],     # 11 DB_Type
211
            taxons,       # 12 Taxon
212
            flds[12],     # 13 Date
213
            flds[13]]     # 14 Assigned_By
214
        # Version 2.x has these additional fields not found in v1.0
215
        if self.ver[0] == '2':
216
            # i=15) Annotation_Extension: optional 0 or greater; Ex: part_of(CL:0000576)
217
            if num_flds > 15:
218
                gafvals.append(self._rd_fld_vals("Annotation_Extension", flds[15], is_set))
219
            else:
220
                gafvals.append(None)
221
            # i=16) Gene_Product_Form_ID: optional 0 or 1;       Ex: UniProtKB:P12345-2
222
            if num_flds > 16:
223
                #self._prt_line_detail(sys.stdout, flds, lnum)
224
                gafvals.append(self._rd_fld_vals("Gene_Product_Form_ID", flds[16], is_set))
225
            else:
226
                gafvals.append(None)
227
        return self.ntgafobj._make(gafvals)
228
229 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
230
    def _rd_fld_vals(name, val, set_list_ft=True, qty_min=0, qty_max=None):
231
        """Further split a GAF value within a single field."""
232
        if not val and qty_min == 0:
233
            return [] if set_list_ft else set()
234
        vals = val.split('|') # Use a pipe to separate entries
235
        num_vals = len(vals)
236
        assert num_vals >= qty_min, \
237
            "FIELD({F}): MIN QUANTITY({Q}) WASN'T MET: {V}".format(F=name, Q=qty_min, V=vals)
238
        if qty_max is not None:
239
            assert num_vals <= qty_max, \
240
                "FIELD({F}): MAX QUANTITY({Q}) EXCEEDED: {V}".format(F=name, Q=qty_max, V=vals)
241
        return vals if set_list_ft else set(vals)
242
243
    def _chk_qualifier(self, qualifiers):
244
        """Check that qualifiers are expected values."""
245
        # http://geneontology.org/page/go-annotation-conventions#qual
246
        for qual in qualifiers:
247
            assert qual in self.exp_qualifiers, "UNEXPECTED QUALIFIER({Q})".format(Q=qual)
248
249
    def prt_line_detail(self, prt, line):
250
        """Print line header and values in a readable format."""
251
        values = self.split_line(line)
252
        self._prt_line_detail(prt, values)
253
254
    def _prt_line_detail(self, prt, values, lnum=""):
255
        """Print header and field values in a readable format."""
256
        data = zip(self.req_str, self.ntgafobj._fields, values)
257
        txt = ["{:2}) {:3} {:13} {}".format(i, req, hdr, val) for i, (req, hdr, val) in enumerate(data)]
258
        prt.write("{LNUM}\n{TXT}\n".format(LNUM=lnum, TXT="\n".join(txt)))
259
260
    def _chk_qty_eq_1(self, flds):
261
        """Check that these fields have only one value: required 1."""
262
        for col in self.req1:
263
            if not flds[col]:
264
                # sys.stderr.write("**ERROR: UNEXPECTED REQUIRED VAL({V}) FOR COL({R}):{H}: ".format(
265
                #     V=flds[col], H=self.gafhdr[col], R=col))
266
                # sys.stderr.write("{H0}({DB}) {H1}({ID})\n".format(
267
                #     H0=self.gafhdr[0], DB=flds[0], H1=self.gafhdr[1], ID=flds[1]))
268
                return False  # Check failed
269
        return True  # Check passed
270
271
    @staticmethod
272
    def _do_taxons(taxons):
273
        """Taxon"""
274
        taxons = [int(v[6:]) for v in taxons] # strip "taxon:"
275
        num_taxons = len(taxons)
276
        assert num_taxons == 1 or num_taxons == 2
277
        return taxons
278
279
280
class GafHdr(object):
281
    """Used to build a GAF header."""
282
283
    cmpline = re.compile(r'^!(\w[\w\s-]+:.*)$')
284
285
    def __init__(self):
286
        self.gafhdr = []
287
288
    def get_hdr(self):
289
        """Return GAF header data as a string paragragh."""
290
        return "\n".join(self.gafhdr)
291
292
    def chkaddhdr(self, line):
293
        """If this line contains desired header info, save it."""
294
        mtch = self.cmpline.search(line)
295
        if mtch:
296
            self.gafhdr.append(mtch.group(1))
297
298
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang. All rights reserved."
299