prt_tsv()   D
last analyzed

Complexity

Conditions 13

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
dl 0
loc 25
rs 4.2
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like prt_tsv() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Contains generic table-writing functions. Data is expected to be a list of namedtuples.
2
3
       kwargs (kws):
4
           'title': First row will contain user-provided title string
5
           'prt_if': Only print a line if the user-specified test returns True.
6
               prt_if is a lambda function with the data item's namedtuple as input.
7
               Example: prt_if = lambda nt: nt.p_uncorrected < 0.05
8
           'sort_by' : User-customizable sort when printing.
9
               sort_by is a lambda function with the data item's namedtuple as input.
10
               It is the 'key' used in the sorted function.
11
               Example: sort_by = lambda nt: [nt.NS, -1*nt.depth]
12
           'hdrs' : A list of column headers to use when printing the table.
13
               default: The fields in the data's namedtuple are used as the column headers.
14
           'sep': Separator used when printing the tab-separated table format.
15
               default: sep = '\t'
16
           'prt_flds' : Used to print a subset of the fields in the namedtuple or
17
               to control the order of the print fields
18
           'fld2col_widths': A dictionary of column widths used when writing xlsx files.
19
           'fld2fmt': Used in tsv files and xlsx files for formatting specific fields
20
21
           For adding color or other formatting to a row based on value in a row:
22
               'ntfld_wbfmt': namedtuple field containing a value used as a key for a xlsx format
23
               'ntval2wbfmtdict': namedtuple value and corresponding xlsx format dict. Examples:
24
"""
25
26
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
27
__author__ = "DV Klopfenstein"
28
29
import re
30
import sys
31
from goatools.wr_tbl_class import get_hdrs
32
33
def prt_txt(prt, data_nts, prtfmt=None, nt_fields=None, **kws):
    """Write one formatted text line per namedtuple to the stream ``prt``.

    The lines are produced by get_lines(), which receives ``data_nts``,
    ``prtfmt``, ``nt_fields`` and the keyword args unchanged. If get_lines()
    returns nothing, a "0 items" notice is written to sys.stdout instead and
    nothing is written to ``prt``.
    """
    formatted = get_lines(data_nts, prtfmt, nt_fields, **kws)
    # Nothing to print: report it on stdout rather than on the caller's stream.
    if not formatted:
        sys.stdout.write("      0 items. NOT WRITING\n")
        return
    for text in formatted:
        prt.write(text)
41
42
def get_lines(data_nts, prtfmt=None, nt_fields=None, **kws):
    """Return a list of formatted text lines, one per selected namedtuple.

    Args:
        data_nts: Sequence of namedtuples to format.
        prtfmt: Format string with named replacement fields, e.g. "{GO} {name}\\n".
            If None, a default format is built from the first namedtuple with
            mk_fmtfld() using the 'joinchr' and 'eol' keyword args.
        nt_fields: If not None, the fieldnames used by prtfmt are checked
            against this collection with _chk_flds_fmt() before formatting.
        kws: Optional keyword args:
            'sort_by': key function passed to sorted() to order the rows.
            'prt_if': predicate; only rows for which it returns True are kept.
            'joinchr', 'eol': used only when prtfmt is None.

    Returns:
        A list of strings. It is empty when there is no data or when every row
        is rejected by 'prt_if'.
    """
    if prtfmt is None:
        # A default format is derived from the first row, so there is nothing
        # to derive it from (and nothing to print) when the data is empty.
        if not data_nts:
            return []
        prtfmt = mk_fmtfld(data_nts[0], kws.get('joinchr', ' '), kws.get('eol', '\n'))
    # The field check only runs when the caller supplies the expected fields.
    if nt_fields is not None:
        _chk_flds_fmt(nt_fields, prtfmt)
    if 'sort_by' in kws:
        data_nts = sorted(data_nts, key=kws['sort_by'])
    prt_if = kws.get('prt_if')
    return [
        prtfmt.format(**data_nt._asdict())
        for data_nt in data_nts
        if prt_if is None or prt_if(data_nt)
    ]
58
59
def prt_nts(data_nts, prtfmt=None, prt=sys.stdout, nt_fields=None, **kws):
    """Print namedtuples as text lines; same as prt_txt() with the data given first.

    ``prt`` defaults to the sys.stdout object that exists when this module is
    loaded. All arguments are handed to prt_txt() unchanged.
    """
    prt_txt(prt, data_nts, prtfmt=prtfmt, nt_fields=nt_fields, **kws)
62
63
def wr_xlsx(fout_xlsx, data_xlsx, **kws):
    """Write a list of namedtuples into a single-worksheet xlsx file.

    Args:
        fout_xlsx: Name of the xlsx file to write.
        data_xlsx: Sequence of namedtuples; the fields of the first one are
            passed to WrXlsx together with the keyword args.
        kws: Passed through to WrXlsx. 'items' (default "items") is also used
            here as the noun in the status message.

    A status line is written to sys.stdout in both the "wrote" and the
    "nothing to write" cases. No file is opened when data_xlsx is empty.
    """
    from goatools.wr_tbl_class import WrXlsx
    # optional keyword args: fld2col_widths hdrs prt_if sort_by fld2fmt prt_flds
    items_str = kws.get("items", "items")
    if not data_xlsx:
        sys.stdout.write("      0 {ITEMS}. NOT WRITING {FOUT}\n".format(
            ITEMS=items_str, FOUT=fout_xlsx))
        return
    # Open the xlsx file
    xlsxobj = WrXlsx(fout_xlsx, data_xlsx[0]._fields, **kws)
    worksheet = xlsxobj.add_worksheet()
    # Title (optional) first, then headers; each call returns the row index
    # handed to the next call.
    row_after_title = xlsxobj.wr_title(worksheet)
    row_first_data = xlsxobj.wr_hdrs(worksheet, row_after_title)
    # Data rows
    row_end = xlsxobj.wr_data(data_xlsx, row_first_data, worksheet)
    # Close the xlsx file
    xlsxobj.workbook.close()
    # The reported count is the difference of the row index after and before wr_data.
    sys.stdout.write("  {N:>5} {ITEMS} WROTE: {FOUT}\n".format(
        N=row_end - row_first_data, ITEMS=items_str, FOUT=fout_xlsx))
85
86
def wr_xlsx_sections(fout_xlsx, xlsx_data, **kws):
    """Write an xlsx file of section names, each followed by its namedtuple rows.

    Args:
        fout_xlsx: Name of the xlsx file to write.
        xlsx_data: List of (section_text, namedtuples) pairs. The first section
            must contain data because its first namedtuple supplies the fields
            given to WrXlsx.
        kws: Passed through to WrXlsx. 'items' (default "items") is also used
            here as the noun in the status message.

    Headers are written under the first section and again under every section
    holding more than 10 rows. A status line is written to sys.stdout.
    """
    from goatools.wr_tbl_class import WrXlsx
    items_str = kws.get("items", "items")
    if not xlsx_data:
        sys.stdout.write("      0 {ITEMS}. NOT WRITING {FOUT}\n".format(
            ITEMS=items_str, FOUT=fout_xlsx))
        return
    # Sections with more rows than this get their own copy of the header row.
    prt_hdr_min = 10
    # Basic data checks (first section only)
    first_section = xlsx_data[0]
    assert len(first_section) == 2, "wr_xlsx_sections EXPECTED: [(section, nts), ..."
    assert first_section[1], \
        "wr_xlsx_sections EXPECTED SECTION({S}) LIST TO HAVE DATA".format(S=first_section[0])
    # Open xlsx file and write the (optional) title
    xlsxobj = WrXlsx(fout_xlsx, first_section[1][0]._fields, **kws)
    worksheet = xlsxobj.add_worksheet()
    row_idx = xlsxobj.wr_title(worksheet)
    num_items = 0
    hdrs_pending = True
    # Write each section: merged section row, optional headers, then data
    for section_text, data_nts in xlsx_data:
        num_items += len(data_nts)
        fmt = xlsxobj.wbfmtobj.get_fmt_section()
        row_idx = xlsxobj.wr_row_mergeall(worksheet, section_text, fmt, row_idx)
        if hdrs_pending or len(data_nts) > prt_hdr_min:
            row_idx = xlsxobj.wr_hdrs(worksheet, row_idx)
            hdrs_pending = False
        row_idx = xlsxobj.wr_data(data_nts, row_idx, worksheet)
    # Close xlsx file
    xlsxobj.workbook.close()
    sys.stdout.write("  {N:>5} {ITEMS} WROTE: {FOUT} ({S} sections)\n".format(
        N=num_items, ITEMS=items_str, FOUT=fout_xlsx, S=len(xlsx_data)))
118
119
def wr_tsv(fout_tsv, tsv_data, **kws):
    """Write a file of tab-separated table data.

    Args:
        fout_tsv: Name of the output file, or None to print the table to
            sys.stdout.
        tsv_data: Sequence of namedtuples. When empty, nothing is opened or
            written except a "NOT WRITING" notice on sys.stdout.
        kws: Passed through to prt_tsv(). 'items' (default "items") is also
            used here as the noun in the status message.
    """
    items_str = kws.get("items", "items")
    if not tsv_data:
        sys.stdout.write("      0 {ITEMS}. NOT WRITING {FOUT}\n".format(
            ITEMS=items_str, FOUT=fout_tsv))
        return
    if fout_tsv is None:
        # Table goes straight to stdout; no status line is added to it.
        prt_tsv(sys.stdout, tsv_data, **kws)
        return
    # The with-block closes the file even if prt_tsv raises part-way through.
    with open(fout_tsv, 'w') as ostrm:
        num_items = prt_tsv(ostrm, tsv_data, **kws)
    # Report only after the file has been closed successfully.
    sys.stdout.write("  {N:>5} {ITEMS} WROTE: {FOUT}\n".format(
        N=num_items, ITEMS=items_str, FOUT=fout_tsv))
132
133
def prt_tsv(prt, data_nts, **kws):
    """Print tab-separated table data: one '#'-prefixed header line, then the rows.

    Args:
        prt: Output stream; only its write() method is used.
        data_nts: Sequence of namedtuples.
        kws: User-controlled printing options:
            'sep': column separator (default: tab).
            'sort_by': key function passed to sorted() to order the rows.
            'prt_if': predicate; only rows for which it returns True are printed.
            'prt_flds': fields to print, in order (default: all namedtuple fields).
            'fld2fmt': dict of field name to format string, applied by
                _fmt_fields() to the fields it names.
            All keyword args are also forwarded to get_hdrs() to build the header.

    Returns:
        The number of data rows written. 0 is returned, and nothing at all is
        written, when data_nts is empty.
    """
    # Header and default fields come from the first row; without one there is
    # nothing to print.
    if not data_nts:
        return 0
    sep = kws.get('sep', "\t")
    flds_all = data_nts[0]._fields
    hdrs = get_hdrs(flds_all, **kws)
    fld2fmt = kws.get('fld2fmt')
    if 'sort_by' in kws:
        data_nts = sorted(data_nts, key=kws['sort_by'])
    prt_if = kws.get('prt_if')
    prt_flds = kws.get('prt_flds', flds_all)
    # Write header
    prt.write("# {}\n".format(sep.join(hdrs)))
    # Write data
    items = 0
    for nt_data_row in data_nts:
        if prt_if is not None and not prt_if(nt_data_row):
            continue
        if fld2fmt is not None:
            row_fld_vals = [(fld, getattr(nt_data_row, fld)) for fld in prt_flds]
            row_vals = _fmt_fields(row_fld_vals, fld2fmt)
        else:
            row_vals = [getattr(nt_data_row, fld) for fld in prt_flds]
        prt.write("{}\n".format(sep.join(str(d) for d in row_vals)))
        items += 1
    return items
158
159
def _fmt_fields(fld_vals, fld2fmt):
160
    """Optional user-formatting of specific fields, eg, pval: '{:8.2e}'."""
161
    vals = []
162
    for fld, val in fld_vals:
163
        if fld in fld2fmt:
164
            val = fld2fmt[fld].format(val)
165
        vals.append(val)
166
    return vals
167
168
def _chk_flds_fmt(nt_fields, prtfmt):
    """Raise if prtfmt names a field that is not present in nt_fields.

    Returns None when every fieldname found in prtfmt by get_fmtflds() is in
    nt_fields. Otherwise raises Exception with a multi-line message listing
    every format field, the missing ones marked with "ERROR-->".
    """
    fmtflds = get_fmtflds(prtfmt)
    # All data needed for print is present: nothing to report.
    if set(fmtflds) <= set(nt_fields):
        return
    report = ['CANNOT PRINT USING: "{PF}"'.format(PF=prtfmt.rstrip())]
    report.extend(
        "  {ERR:8} {FLD}".format(ERR="" if fld in nt_fields else "ERROR-->", FLD=fld)
        for fld in fmtflds
    )
    raise Exception('\n'.join(report))
181
182
def get_fmtflds(prtfmt):
    """Return the fieldnames used in the formatter text, in order of appearance.

    Each replacement field "{name}", "{name:spec}" or "{name!conv}" contributes
    "name". A field is matched only between one pair of braces, so fields that
    touch without whitespace between them (eg, "{GO},{name}") are found
    separately. Fields containing whitespace are not matched.
    """
    # Example prtfmt: "{NS} {study_cnt:2} {fdr_bh:5.3e} L{level:02} D{depth:02} {GO} {name}\n"
    # [^{}\s]+ cannot cross a brace, unlike a greedy \S+ which would swallow
    # everything up to the last '}' of a whitespace-free run.
    fields = re.findall(r'{([^{}\s]+)}', prtfmt)
    # Drop the optional "!conversion" and ":format_spec" that follow the name.
    return [re.split(r'[!:]', fld, maxsplit=1)[0] for fld in fields]
186
187
def get_fmtfldsdict(prtfmt):
    """Return a dict mapping each fieldname in the formatter text to itself."""
    # Example prtfmt: "{NS} {study_cnt:2} {fdr_bh:5.3e} L{level:02} D{depth:02} {GO} {name}\n"
    names = get_fmtflds(prtfmt)
    return dict(zip(names, names))
191
192
def _prt_txt_hdr(prt, prtfmt):
    """Print a header line for a text report, using the fieldnames as column text.

    The header is prtfmt formatted with every field set to its own name (see
    get_fmtfldsdict), prefixed with "#", and written to ``prt``.
    """
    tblhdrs = get_fmtfldsdict(prtfmt)
    # The header values are strings, not numbers, so numeric parts of the
    # format specs are reduced to a plain width:
    #   ":5.3e}" -> ":5}"   (drop precision and type; the match stays inside
    #                        one field because it cannot cross a brace)
    hdrfmt = re.sub(r':(\d+)\.[^\s{}]+}', r':\1}', prtfmt)
    #   ":02}"   -> ":2}"   (drop leading zeros of the width)
    hdrfmt = re.sub(r':(0+)(\d+)}', r':\2}', hdrfmt)
    prt.write("#{}".format(hdrfmt.format(**tblhdrs)))
199
200
def mk_fmtfld(nt_item, joinchr=" ", eol="\n"):
    """Given a namedtuple, return a format string with one field per namedtuple field.

    The replacement fields are joined with ``joinchr`` and followed by ``eol``.
    Fields named hdrgo, dcnt, level and depth get a default format spec (level
    and depth also get an "L"/"D" prefix); every other field is a bare "{name}".
    """
    # Default formats based on fieldname. Each entry is itself a template:
    # doubled braces become the literal braces of the resulting field.
    fld2template = {
        'hdrgo': "{{{FLD}:1,}}",
        'dcnt': "{{{FLD}:6,}}",
        'level': "L{{{FLD}:02,}}",
        'depth': "D{{{FLD}:02,}}",
    }
    plain = "{{{FLD}}}"
    fldstrs = [fld2template.get(fld, plain).format(FLD=fld) for fld in nt_item._fields]
    return "{LINE}{EOL}".format(LINE=joinchr.join(fldstrs), EOL=eol)
217
218
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang. All rights reserved.
219