nopen()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 69

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
dl 0
loc 69
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive fragment of the method body into its own, well-named method.

Complexity

Complex classes like nopen() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utilities used in Gene Ontology Enrichment Analyses."""
2
# Stolen from brentp:
3
# <https://github.com/brentp/toolshed/blob/master/toolshed/files.py>
4
5
import os
6
import os.path as op
7
import sys
8
import bz2
9
import gzip
10
import urllib
11
import wget
12
13
14
# Python 2/3 compatibility shims used by nopen():
#   int_types  - tuple of integer types accepted as an index into sys.argv
#   basestring - text type used to recognise path-like arguments
#   urlopen    - function used to open http/https/ftp URLs
if sys.version_info[0] >= 3:
    # Python 3: `long` and `basestring` are gone and urlopen lives in urllib.request.
    from urllib.request import urlopen
    basestring = str
    int_types = (int,)
else:
    # Python 2: `basestring` is a builtin; only the other two names need binding.
    urlopen = urllib.urlopen
    int_types = (int, long)
21
22
23
def _restore_sigpipe():
    """Reset SIGPIPE to its default action; run in the child before the shell starts."""
    import signal
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def _as_text(fhandle):
    """Return a text-mode wrapper around a binary stream on Python 3 (no-op on Python 2)."""
    if sys.version_info[0] < 3:
        return fhandle
    import io
    return io.TextIOWrapper(fhandle)


def _iter_process_stdout(proc, cmd=""):
    """Yield the lines of proc.stdout, then raise if the command exited with an error."""
    import signal
    from subprocess import CalledProcessError
    for line in proc.stdout:
        yield line
    returncode = proc.wait()
    # A reader that stops early makes the child die from SIGPIPE; that is not a failure.
    sigpipe = getattr(signal, "SIGPIPE", None)
    accepted = {0}
    if sigpipe is not None:
        accepted.update((-sigpipe, 128 + sigpipe))
    if returncode not in accepted:
        sys.stderr.write("cmd was:%s\n" % cmd)
        sys.stderr.write("return code was:%s\n" % returncode)
        raise CalledProcessError(returncode, cmd)


def _open_pipe(cmd, mode):
    """Run cmd through the user's shell; return a line iterator (read mode) or the Popen."""
    import signal
    from subprocess import Popen, PIPE
    # using shell explicitly makes things like process substitution work:
    # http://stackoverflow.com/questions/7407667/python-subprocess-subshells-and-redirection
    # use sys.stderr so we dont have to worry about checking it...
    proc = Popen(cmd, stdout=PIPE, stdin=PIPE,
                 stderr=sys.stderr if mode == "r" else PIPE,
                 shell=True, bufsize=-1,  # use system default for buffering
                 # preexec_fn is POSIX-only; skip it where SIGPIPE does not exist.
                 preexec_fn=_restore_sigpipe if hasattr(signal, "SIGPIPE") else None,
                 close_fds=False, executable=os.environ.get('SHELL'))
    if sys.version_info[0] > 2:
        proc.stdout = _as_text(proc.stdout)
        proc.stdin = _as_text(proc.stdin)
        if mode != "r":
            proc.stderr = _as_text(proc.stderr)
    if mode and mode[0] == "r":
        return _iter_process_stdout(proc, cmd)
    return proc


def _open_url(url):
    """Open a remote resource; '.gz' URLs are streamed through ungzipper()."""
    fhandle = urlopen(url)
    if url.endswith(".gz"):
        return ungzipper(fhandle)
    return _as_text(fhandle)


def nopen(f, mode="r"):
    r"""
    open a file that's gzipped or return stdin for '-'
    if f is a number, the result of nopen(sys.argv[f]) is returned.

    Dispatch on ``f``:
      * integer            -> nopen(sys.argv[f], mode)
      * not a string       -> returned unchanged (assumed to be an open file)
      * "|cmd"             -> cmd is run in a shell; read modes return an
                              iterator over its stdout lines, other modes
                              return the Popen object
      * http/https/ftp URL -> opened with urlopen ('.gz' is decompressed
                              on the fly)
      * .gz/.Z/.z, .bz/.bz2/.bzip2 -> opened with gzip / bz2
      * "-"                -> sys.stdin for "r" modes, sys.stdout for "w" modes
      * anything else      -> open(f, mode) after expanding ~ and $VARS

    >>> nopen('-') == sys.stdin, nopen('-', 'w') == sys.stdout
    (True, True)
    >>> nopen(sys.argv[0])
    <...file...>
    # expands user and vars ($HOME)
    >>> nopen("~/.bashrc").name == nopen("$HOME/.bashrc").name
    True
    # an already open file.
    >>> nopen(open(sys.argv[0]))
    <...file...>
    >>> nopen(0)
    <...file...>
    Or provide nicer access to Popen.stdout
    >>> files = list(nopen("|ls"))
    >>> assert 'setup.py\n' in files or b'setup.py\n' in files, files
    """
    if isinstance(f, int_types):
        return nopen(sys.argv[f], mode)

    if not isinstance(f, basestring):
        return f

    if f.startswith("|"):
        return _open_pipe(f[1:], mode)

    if f.startswith(("http://", "https://", "ftp://")):
        return _open_url(f)

    f = op.expanduser(op.expandvars(f))
    if f.endswith((".gz", ".Z", ".z")):
        return _as_text(gzip.open(f, mode))
    if f.endswith((".bz", ".bz2", ".bzip2")):
        return _as_text(bz2.BZ2File(f, mode))

    if f == "-":
        return {"r": sys.stdin, "w": sys.stdout}[mode[0]]
    return open(f, mode)
92
93
94
def ungzipper(fh, blocksize=16384):
    """
    work-around to get streaming download of http://.../some.gz

    Reads gzip-compressed bytes from ``fh`` in chunks of ``blocksize`` and
    yields the decompressed content one line at a time, without the trailing
    newline. On Python 3 each line is decoded as UTF-8 and yielded as ``str``;
    on Python 2 the raw byte strings are yielded.

    Blank lines are preserved, and a final line that lacks a terminating
    newline is still yielded.
    """
    import zlib
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    uzip = zlib.decompressobj(16 + zlib.MAX_WBITS)
    decode = sys.version_info[0] >= 3
    pending = b""  # partial line carried over from the previous chunk

    while True:
        chunk = fh.read(blocksize)
        if not chunk:
            break
        # Split on the newline *byte*: it never occurs inside a multi-byte
        # UTF-8 sequence, so complete lines can be decoded independently.
        lines = (pending + uzip.decompress(chunk)).split(b"\n")
        # last chunk might not be a full line.
        pending = lines.pop()
        for line in lines:
            yield line.decode("utf-8") if decode else line

    # Emit whatever is left once the input is exhausted.
    tail = pending + uzip.flush()
    if tail:
        lines = tail.split(b"\n")
        if not lines[-1]:
            lines.pop()  # data ended with a newline: no extra empty line
        for line in lines:
            yield line.decode("utf-8") if decode else line
110
111
112
def download_go_basic_obo(obo="go-basic.obo", prt=sys.stdout, loading_bar=True):
    """Make sure the ontology file ``obo`` exists locally, downloading it if needed.

    An existing file is reported on ``prt`` (unless ``prt`` is None) and left
    untouched. Otherwise the file's basename is fetched with dnld_file() from
    the GO-slim subsets URL when "slim" appears in ``obo``, or from the main
    OBO URL in all other cases. Returns ``obo``.
    """
    if os.path.isfile(obo):
        if prt is not None:
            prt.write("  EXISTS: {FILE}\n".format(FILE=obo))
        return obo

    if "slim" in obo:
        base_url = "http://www.geneontology.org/ontology/subsets"
    else:
        base_url = "http://purl.obolibrary.org/obo/go"
    remote_obo = "{HTTP}/{OBO}".format(HTTP=base_url, OBO=os.path.basename(obo))
    dnld_file(remote_obo, obo, prt, loading_bar)
    return obo
124
125
def download_ncbi_associations(gene2go="gene2go", prt=sys.stdout, loading_bar=True):
    """Make sure the NCBI gene2go association file exists locally.

    An existing file is reported on ``prt`` (unless ``prt`` is None) and left
    untouched. Otherwise "<basename of gene2go>.gz" is fetched from the NCBI
    FTP site via dnld_file(), with ``gene2go`` as the destination.
    Returns ``gene2go``.
    """
    if os.path.isfile(gene2go):
        if prt is not None:
            prt.write("  EXISTS: {FILE}\n".format(FILE=gene2go))
        return gene2go

    # Download: ftp://ftp.ncbi.nlm.nih.gov/gene/DATA/gene2go.gz
    remote_name = os.path.basename("{GENE2GO}.gz".format(GENE2GO=gene2go))
    file_remote = "ftp://ftp.ncbi.nlm.nih.gov/gene/DATA/{GZ}".format(GZ=remote_name)
    dnld_file(file_remote, gene2go, prt, loading_bar)
    return gene2go
137
138
def gunzip(gzip_file, file_gunzip=None):
    """Unzip .gz file. Return filename of unzipped file.

    gzip_file:   path of the compressed input; gzip_open_to() removes it
                 after a successful unzip.
    file_gunzip: destination path. Defaults to ``gzip_file`` with its last
                 extension stripped.
    """
    if file_gunzip is None:
        file_gunzip = os.path.splitext(gzip_file)[0]
    # Unzip and return for BOTH a derived and a caller-supplied destination;
    # previously an explicit destination was silently ignored.
    gzip_open_to(gzip_file, file_gunzip)
    return file_gunzip
144
145
def get_godag(fin_obo="go-basic.obo", prt=sys.stdout, loading_bar=True, optional_attrs=None):
    """Return a GODag built from ``fin_obo``, downloading the file first if it is missing.

    The GODag is created with ``load_obsolete=False``; ``optional_attrs`` and
    ``prt`` are passed through to it.
    """
    # Imported here rather than at module level, as in the original code.
    from goatools.obo_parser import GODag
    download_go_basic_obo(fin_obo, prt, loading_bar)
    godag = GODag(fin_obo, optional_attrs, load_obsolete=False, prt=prt)
    return godag
150
151
def get_gaf_name(species):
    """Return the GAF filename for a species label (e.g. goa_human, mgi, fb).

    Labels starting with "goa_" map to "<species>.gaf"; every other label
    maps to "gene_association.<species>".
    """
    # Examples: goa_human -> goa_human.gaf, mgi -> gene_association.mgi
    if species.startswith("goa_"):
        return "{S}.gaf".format(S=species)
    return "gene_association.{S}".format(S=species)
160
161
def dnld_gaf(species_txt, prt=sys.stdout, loading_bar=True):
    """Download the GAF file for one species if necessary; return its local path."""
    gaf_files = dnld_gafs([species_txt], prt, loading_bar)
    return gaf_files[0]
164
165
def dnld_gafs(species_list, prt=sys.stdout, loading_bar=True):
    """Download the GAF file of every listed species into the current directory.

    Each species label (e.g. goa_human, mgi, fb) is turned into a filename by
    get_gaf_name(), and "<name>.gz" is fetched from geneontology.org through
    dnld_file(). Returns the list of local GAF paths, in input order.
    """
    # Remote files look like:
    #   http://geneontology.org/gene-associations/gene_association.mgi.gz
    #   http://geneontology.org/gene-associations/gene_association.fb.gz
    #   http://geneontology.org/gene-associations/goa_human.gaf.gz
    #   NA: http://geneontology.org/gene-associations/gene_association.goa_human.gz
    base_url = "http://geneontology.org/gene-associations"
    workdir = os.getcwd()
    local_gafs = []
    for species in species_list:
        gaf_name = get_gaf_name(species)            # goa_human.gaf
        local_gaf = os.path.join(workdir, gaf_name)  # {CWD}/goa_human.gaf
        remote_gz = "{HTTP}/{GAF}.gz".format(HTTP=base_url, GAF=gaf_name)
        dnld_file(remote_gz, local_gaf, prt, loading_bar)
        local_gafs.append(local_gaf)
    return local_gafs
183
184
def dnld_file(src_ftp, dst_file, prt=sys.stdout, loading_bar=True):
    """Download ``src_ftp`` to ``dst_file`` unless ``dst_file`` already exists.

    When the source ends in '.gz' and the destination does not, the download
    goes to "<dst_file>.gz" and is then unzipped into ``dst_file`` by
    gzip_open_to(). An IOError during download or unzip prints a traceback
    and exits the process with status 1.
    """
    if os.path.isfile(dst_file):
        return

    needs_gunzip = src_ftp[-3:] == '.gz' and dst_file[-3:] != '.gz'
    dst_wget = dst_file
    if needs_gunzip:
        dst_wget = "{DST}.gz".format(DST=dst_file)

    # Write to stderr, not stdout so this message will be seen when running nosetests
    wget_msg = "wget.download({SRC} out={DST})\n".format(SRC=src_ftp, DST=dst_wget)
    sys.stderr.write("  {WGET}".format(WGET=wget_msg))

    # A truthy loading_bar selects wget's adaptive bar; a falsy value is passed as is.
    progress = wget.bar_adaptive if loading_bar else loading_bar
    try:
        wget.download(src_ftp, out=dst_wget, bar=progress)
        if needs_gunzip:
            if prt is not None:
                prt.write("  gunzip {FILE}\n".format(FILE=dst_wget))
            gzip_open_to(dst_wget, dst_file)
    except IOError as errmsg:
        import traceback
        traceback.print_exc()
        sys.stderr.write("**FATAL cmd: {WGET}".format(WGET=wget_msg))
        sys.stderr.write("**FATAL msg: {ERR}".format(ERR=str(errmsg)))
        sys.exit(1)
207
208
def gzip_open_to(fin_gz, fout):
    """Unzip a file.gz file.

    Decompresses ``fin_gz`` into ``fout`` and then deletes ``fin_gz``.
    The data is streamed in chunks, so large archives are never held
    in memory as a whole.
    """
    import shutil
    with gzip.open(fin_gz, 'rb') as zstrm:
        with open(fout, 'wb') as ostrm:
            shutil.copyfileobj(zstrm, ostrm)
    assert os.path.isfile(fout), "COULD NOT GUNZIP({G}) TO FILE({F})".format(G=fin_gz, F=fout)
    # Remove the compressed source only after the output has been written.
    os.remove(fin_gz)
215
216
# Copyright (C) 2013-2018, B Pedersen, et al. All rights reserved.
217