Completed
Push — master ( 074cc7...987304 )
by Gonzalo
59s
created

get_files()   F

Complexity

Conditions 17

Size

Total Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
dl 0
loc 38
rs 2.7204
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like get_files() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright (c) 2016 Continuum Analytics, Inc.
4
#
5
# Licensed under the terms of the MIT License
6
# (see LICENSE.txt for details)
7
# -----------------------------------------------------------------------------
8
"""Utilities module."""
9
10
from __future__ import absolute_import, print_function
11
12
# Standard library imports
13
from collections import OrderedDict
14
from copy import deepcopy
15
import codecs
16
import cProfile
17
import difflib
18
import errno
19
import os
20
import pstats
21
import subprocess
22
import sys
23
import uuid
24
25
# Third party imports
26
from six.moves import cStringIO as StringIO
27
28
# Local imports
29
from ciocheck.config import DEFAULT_IGNORE_EXTENSIONS, DEFAULT_IGNORE_FOLDERS
30
31
32
class Profiler(object):
33
    """Context manager profiler."""
34
35
    def __init__(self):
36
        """Context manager profiler."""
37
        self._profiler = cProfile.Profile()
38
39
    def __enter__(self):
40
        """Enable profiler."""
41
        self._profiler.enable()
42
43
    def __exit__(self, type_, value, traceback):
44
        """Disable profiler and print stats."""
45
        self._profiler.disable()
46
        profile_stat = pstats.Stats(
47
            self._profiler, stream=sys.stdout).sort_stats('cumulative')
48
        profile_stat.print_stats()
49
50
51
class ShortOutput(object):
52
    """Context manager for capturing and formating stdout and stderr."""
53
54
    def __init__(self, root):
55
        """Context manager for capturing and formating stdout and stderr.
56
57
        Parameter
58
        ---------
59
        root : str
60
            Path where ciocheck script was called (root directory).
61
        """
62
        self._root = root
63
        self._stdout = None
64
        self._stderr = None
65
        self._stringio_output = None
66
        self._stringio_error = None
67
68
    def __enter__(self):
69
        """Capture stdout and stderr in a StringIO."""
70
        self._stdout = sys.stdout
71
        self._stderr = sys.stderr
72
        sys.stdout = self._stringio_output = StringIO()
73
        sys.stderr = self._stringio_error = StringIO()
74
        return self
75
76
    def __exit__(self, *args):
77
        """Restore stdout and stderr and format found values."""
78
        out = self._stringio_output.getvalue().splitlines()
79
        err = self._stringio_error.getvalue().splitlines()
80
        sys.stdout = self._stdout
81
        sys.stderr = self._stderr
82
83
        self.output = out
84
        self.error = err
85
        for output in [out, err]:
86
            for line in output:
87
                print(line)
88
89
90
def run_command(args, cwd=None):
91
    """Run command."""
92
    process = subprocess.Popen(
93
        args,
94
        stdout=subprocess.PIPE,
95
        stderr=subprocess.PIPE,
96
        cwd=cwd, )
97
    output, error = process.communicate()
98
99
    if isinstance(output, bytes):
100
        output = output.decode()
101
    if isinstance(error, bytes):
102
        error = error.decode()
103
104
    return output, error
105
106
107
def get_files(paths,
108
              exts=(),
109
              ignore_exts=DEFAULT_IGNORE_EXTENSIONS,
110
              ignore_folders=DEFAULT_IGNORE_FOLDERS):
111
    """Return all files matching the defined conditions."""
112
    all_files = []
113
    for path in paths:
114
        if os.path.isfile(path):
115
            all_files.append(path)
116
        else:
117
            for root, folders, files in os.walk(path):
118
                # Chop out hidden directories
119
                new_folders = []
120
                for folder in folders:
121
                    if folder[0] != '.':
122
                        tests = [
123
                            folder != ignore_folder
124
                            for ignore_folder in ignore_folders
125
                        ]
126
                        if all(tests):
127
                            new_folders.append(folder)
128
                folders[:] = new_folders
129
130
                # Chop out hidden files and walk files
131
                files = [f for f in files if f[0] != '.']
132
                for file in files:
133
                    tests, pass_tests = [True], [True]
134
                    if ignore_exts:
135
                        tests = [
136
                            not file.endswith('.' + ext) for ext in ignore_exts
137
                        ]
138
                    if exts:
139
                        pass_tests = [file.endswith('.' + ext) for ext in exts]
140
141
                    if all(tests) and any(pass_tests):
142
                        all_files.append(os.path.join(root, file))
143
144
    return list(sorted(all_files))
145
146
147
def filter_files(files, extensions):
148
    """Filter files based on a list of extensions."""
149
    copy_of_files = deepcopy(files)
150
    for file in files:
151
        if extensions:
152
            tests = [file.endswith('.' + ext) for ext in extensions]
153
        else:
154
            tests = [True]
155
156
        if not any(tests):
157
            if isinstance(files, dict):
158
                copy_of_files.pop(file)
159
            else:
160
                copy_of_files.remove(file)
161
    return copy_of_files
162
163
164
def _rename_over_existing(src, dest):
165
    try:
166
        # On Windows, this will throw EEXIST, on Linux it won't.
167
        os.rename(src, dest)
168
    except IOError as err:
169
        if err.errno == errno.EEXIST:
170
            # Clearly this song-and-dance is not in fact atomic,
171
            # but if something goes wrong putting the new file in
172
            # place at least the backup file might still be
173
            # around.
174
            backup = "{0}.bak-{1}".format(dest, str(uuid.uuid4()))
175
            os.rename(dest, backup)
176
            try:
177
                os.rename(src, dest)
178
            except Exception as err:
179
                os.rename(backup, dest)
180
                raise err
181
            finally:
182
                try:
183
                    os.remove(backup)
184
                except Exception as err:
185
                    pass
186
187
188
def atomic_replace(path, contents, encoding):
189
    """Try to do an atomic replace."""
190
    tmp = "{0}tmp-{1}".format(path, str(uuid.uuid4()))
191
    try:
192
        with codecs.open(tmp, 'w', encoding) as file_obj:
193
            file_obj.write(contents)
194
            file_obj.flush()
195
            file_obj.close()
196
        _rename_over_existing(tmp, path)
197
    finally:
198
        try:
199
            os.remove(tmp)
200
        except (IOError, OSError):
201
            pass
202
203
204
def diff(string_a, string_b):
205
    """Return unified diff of strings."""
206
    string_a = string_a.splitlines(1)
207
    string_b = string_b.splitlines(1)
208
    result = difflib.unified_diff(string_a, string_b)
209
    return ''.join(result)
210
211
212
def cpu_count():
213
    """Return the cpu count."""
214
    try:
215
        import multiprocessing
216
        count = multiprocessing.cpu_count()
217
    except Exception:
218
        print("Using fallback CPU count", file=sys.stderr)
219
        count = 4
220
    return count
221
222
223
def make_sorted_dict(dic):
224
    """Turn a dict into an ordered dict by sorting the keys."""
225
    ordered_results = OrderedDict()
226
    for key in sorted(dic.keys()):
227
        ordered_results[key] = dic[key]
228
    return ordered_results
229
230
231
def test():
232
    """Main local test."""
233
    paths = [os.path.dirname(os.path.realpath(__file__))]
234
    files = get_files(paths, exts=('py', ))
235
    for file in files:
236
        print(file)
237
238
239
if __name__ == '__main__':
240
    test()
241