Passed
Push — develop ( c0df73...79f31e )
by Jace
01:48 queued 22s
created

doorstop.common   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 260
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 44
eloc 125
dl 0
loc 260
rs 8.8798
c 0
b 0
f 0

12 Functions

Rating   Name   Duplication   Size   Complexity  
A read_lines() 0 13 3
A read_text() 0 16 3
A create_dirname() 0 6 3
A _trace() 0 3 2
A load_yaml() 0 21 3
A write_csv() 0 18 3
A write_lines() 0 19 5
A delete_contents() 0 13 4
A write_text() 0 16 3
A delete() 0 13 4
A copy_dir_contents() 0 19 5
A touch() 0 5 2

3 Methods

Rating   Name   Duplication   Size   Complexity  
A HelpFormatter.__init__() 0 3 1
A WarningFormatter.format() 0 7 2
A WarningFormatter.__init__() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like doorstop.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Common exceptions, classes, and functions for Doorstop."""
4
5
import argparse
6
import csv
7
import glob
8
import logging
9
import os
10
import shutil
11
12
import yaml
13
14
# Global verbosity setting; controls how strings are formatted.
verbosity = 0
# Minimum verbosity at which `print` is used.
PRINT_VERBOSITY = 0
# Minimum verbosity at which the verbose `__str__` is used.
STR_VERBOSITY = 3
# Maximum verbosity level implemented.
MAX_VERBOSITY = 4
18
19
20
def _trace(self, message, *args, **kws):
    """Log `message` one level below DEBUG if that level is enabled.

    Written as a logger method: `self` is the logger instance whose
    `isEnabledFor` and `_log` are used.

    :param message: log message, optionally with %-style placeholders
    :param args: values for the placeholders in `message`
    :param kws: extra keyword arguments forwarded to the logger

    """
    trace_level = logging.DEBUG - 1
    if not self.isEnabledFor(trace_level):
        return
    self._log(trace_level, message, args, **kws)  # pylint: disable=W0212
23
24
25
# Name the level just below DEBUG "TRACE" and expose it on every logger
# as a `trace()` method.
logging.addLevelName(logging.DEBUG - 1, "TRACE")
setattr(logging.Logger, "trace", _trace)

# `logger` is an alias for the logger factory; `log` is this module's logger.
logger = logging.getLogger
log = logging.getLogger(__name__)
30
31
# exception classes ##########################################################
32
33
34
class DoorstopError(Exception):
    """General-purpose error raised by Doorstop."""
36
37
38
class DoorstopFileError(DoorstopError, IOError):
    """Doorstop error for I/O failures; it is also an `IOError`."""
40
41
42
class DoorstopWarning(DoorstopError, Warning):
    """General-purpose Doorstop warning; both a `DoorstopError` and a `Warning`."""
44
45
46
class DoorstopInfo(DoorstopWarning, Warning):
    """General-purpose Doorstop informational notice; a kind of `DoorstopWarning`."""
48
49
50
# logging classes ############################################################
51
52
53
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that requests a wider column for option help text."""

    def __init__(self, *args, **kwargs):
        # Always request position 40, replacing any value the caller passed.
        kwargs.update(max_help_position=40)
        super().__init__(*args, **kwargs)
59
60
61
class WarningFormatter(logging.Formatter):
    """Formatter that switches to a verbose layout for records above INFO."""

    def __init__(self, default_format, verbose_format, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.verbose_format = verbose_format
        self.default_format = default_format

    def format(self, record):
        """Format `record` using the layout that matches its level."""
        is_verbose = record.levelno > logging.INFO
        chosen = self.verbose_format if is_verbose else self.default_format
        # The active format string is stored on the private style object, so
        # it is replaced there before delegating to the base formatter.
        self._style._fmt = chosen  # pylint: disable=W0212
        return super().format(record)
76
77
78
# disk helper functions ######################################################
79
80
81
def create_dirname(path):
    """Create the directory that contains `path` when it is missing.

    :param path: file path whose parent directory should exist

    """
    parent = os.path.dirname(path)
    # Nothing to do for a bare filename or an already existing directory.
    if not parent or os.path.isdir(parent):
        return
    log.info("creating directory {}...".format(parent))
    os.makedirs(parent)
87
88
89
def read_lines(path, encoding="utf-8"):
    """Lazily read lines of text from a file.

    This is a generator: the file is opened on the first iteration and
    stays open until the generator is exhausted or closed.

    :param path: file to read lines from
    :param encoding: input file encoding

    :return: generator of lines, each as yielded by the text stream

    """
    log.trace("reading lines from '{}'...".format(path))  # type: ignore
    with open(path, "r", encoding=encoding) as stream:
        yield from stream
102
103
104
def read_text(path, encoding="utf-8"):
    """Read text from a file.

    :param path: file path to read from
    :param encoding: input file encoding

    :raises DoorstopError: when the file cannot be opened, read or decoded

    :return: string

    """
    log.trace("reading text from '{}'...".format(path))  # type: ignore
    try:
        with open(path, "r", encoding=encoding) as f:
            return f.read()
    # Deliberately broad: every failure is reported as a DoorstopError.
    except Exception as e:
        msg = "reading '{}' failed: {}".format(path, e)
        raise DoorstopError(msg) from e
120
121
122
def load_yaml(text, path, loader=yaml.SafeLoader):
    """Parse YAML text and return it as a dictionary.

    :param text: string containing dumped YAML data
    :param path: file path, used only in error messages
    :param loader: YAML loader class passed to `yaml.load`

    :raises DoorstopError: when the text is not valid YAML or the parsed
        data is not a dictionary

    :return: dictionary (empty when the parsed data is falsy)

    """
    try:
        data = yaml.load(text, Loader=loader) or {}
    except yaml.error.YAMLError as exc:
        raise DoorstopError(
            "invalid contents: {}:\n{}".format(path, exc)
        ) from None
    # Only a mapping is acceptable as the top-level value.
    if isinstance(data, dict):
        return data
    raise DoorstopError("invalid contents: {}".format(path))
143
144
145
def write_lines(lines, path, end="\n", encoding="utf-8", *, executable=False):
    """Write an iterable of strings to a file, one terminated line each.

    :param lines: iterator of strings
    :param path: file to write lines
    :param end: string appended to every line
    :param encoding: output file encoding
    :param executable: when true, set the file mode to 0o775 after writing

    :return: path of new file

    """
    log.trace("writing lines to '{}'...".format(path))  # type: ignore
    # Binary mode: `end` is written exactly as given, with no newline
    # translation by the I/O layer.
    with open(path, "wb") as stream:
        stream.writelines((line + end).encode(encoding) for line in lines)
    if executable and os.path.isfile(path):
        os.chmod(path, 0o775)
    return path
164
165
166
def write_text(text, path, end="\n", encoding="utf-8"):
    """Write text to a file.

    The file is always (re)written, even when `text` is empty; only the
    trace message is skipped in that case.

    :param text: string
    :param path: file to write text
    :param end: newline mode passed to `open()` as `newline`; with one of
        "\\n", "\\r" or "\\r\\n", each "\\n" in `text` is written as that string
    :param encoding: output file encoding

    :return: path of new file

    """
    if text:
        log.trace("writing text to '{}'...".format(path))  # type: ignore
    with open(path, "w", encoding=encoding, newline=end) as f:
        f.write(text)
    return path
182
183
184
def write_csv(table, path, delimiter=",", newline="", encoding="utf-8"):
    """Write rows of cells to a file in CSV form.

    :param table: iterator of rows
    :param path: file to write lines
    :param delimiter: string separating cells
    :param newline: newline mode passed to `open()`
    :param encoding: output file encoding

    :return: path of new file

    """
    log.trace("writing table to '{}'...".format(path))  # type: ignore
    with open(path, "w", newline=newline, encoding=encoding) as stream:
        csv.writer(stream, delimiter=delimiter).writerows(table)
    return path
202
203
204
def touch(path):
    """Create an empty file at `path` unless something already exists there."""
    if os.path.exists(path):
        return
    log.trace("creating empty '{}'...".format(path))  # type: ignore
    write_text("", path)
209
210
211
def copy_dir_contents(src, dst):
    """Copy every entry matched by `src/*` into `dst`, skipping name clashes.

    Entries whose name already exists in `dst` are not overwritten; a
    warning is logged for each of them instead.

    :param src: directory whose contents are copied
    :param dst: directory receiving the copies

    """
    for source_path in glob.glob("{}/*".format(src)):
        name = os.path.basename(source_path)
        target_path = os.path.join(dst, name)

        if not os.path.exists(target_path):
            # Directories are copied recursively, anything else as a file.
            if os.path.isdir(source_path):
                shutil.copytree(source_path, target_path)
            else:
                shutil.copyfile(source_path, target_path)
            continue

        if name == "doorstop":
            template = (
                "Skipping '{}' as this directory name is required by doorstop"
            )
        else:
            template = (
                "Skipping '{}' as a file or directory with this name already exists"
            )
        log.warning(template.format(source_path))
230
231
232
def delete(path):
    """Remove a file or a directory tree; do nothing for any other path.

    A failure while removing a directory tree is logged as a warning
    instead of being raised.

    :param path: file or directory to remove

    """
    if os.path.isfile(path):
        log.trace("deleting '{}'...".format(path))  # type: ignore
        os.remove(path)
        return

    if not os.path.isdir(path):
        return

    try:
        log.trace("deleting '{}'...".format(path))  # type: ignore
        shutil.rmtree(path)
    except IOError:
        # bug: http://code.activestate.com/lists/python-list/159050
        log.warning("unable to delete: {}".format(path))
245
246
247
def delete_contents(dirname):
    """Delete everything matched by `dirname/*`, keeping `dirname` itself.

    :param dirname: directory whose contents are removed

    :raises FileExistsError: re-raised after logging a warning if removing
        a file fails with this error

    """
    # glob already returns each match prefixed with `dirname`, so the path is
    # used as-is; joining it with `dirname` again breaks relative directories.
    for entry in glob.glob("{}/*".format(dirname)):
        if os.path.isdir(entry):
            shutil.rmtree(entry)
        else:
            try:
                os.remove(entry)
            except FileExistsError:
                log.warning(
                    "Two assets folders have files or directories "
                    "with the same name"
                )
                raise
260