doorstop.common   F
last analyzed

Complexity

Total Complexity 63

Size/Duplication

Total Lines 364
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 63
eloc 187
dl 0
loc 364
rs 3.36
c 0
b 0
f 0

16 Functions

Rating   Name   Duplication   Size   Complexity  
A read_lines() 0 13 3
A write_csv() 0 18 3
A write_lines() 0 19 5
A read_text() 0 16 3
A dump_markdown() 0 12 3
A import_path_as_module() 0 7 1
A create_dirname() 0 6 3
A delete_contents() 0 13 4
A write_text() 0 16 3
A delete() 0 13 4
A copy_dir_contents() 0 19 5
A _trace() 0 3 2
A load_markdown() 0 25 3
D update_data_from_markdown_content() 0 42 12
A load_yaml() 0 21 3
A touch() 0 5 2

3 Methods

Rating   Name   Duplication   Size   Complexity  
A HelpFormatter.__init__() 0 3 1
A WarningFormatter.format() 0 7 2
A WarningFormatter.__init__() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like doorstop.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Common exceptions, classes, and functions for Doorstop."""
4
5
import argparse
6
import codecs
7
import csv
8
import glob
9
import io
10
import logging
11
import os
12
import re
13
import shutil
14
from importlib.abc import Loader
15
from importlib.machinery import ModuleSpec
16
from importlib.util import module_from_spec, spec_from_file_location
17
from pathlib import Path
18
from types import ModuleType
19
from typing import Union, cast
20
21
import frontmatter
22
import yaml
23
24
verbosity = 0  # global verbosity setting for controlling string formatting
25
PRINT_VERBOSITY = 0  # minimum verbosity to using `print`
26
STR_VERBOSITY = 3  # minimum verbosity to use verbose `__str__`
27
MAX_VERBOSITY = 4  # maximum verbosity level implemented
28
29
30
def _trace(self, message, *args, **kws):
31
    if self.isEnabledFor(logging.DEBUG - 1):
32
        self._log(logging.DEBUG - 1, message, args, **kws)  # pylint: disable=W0212
33
34
35
logging.addLevelName(logging.DEBUG - 1, "TRACE")
36
logging.Logger.trace = _trace  # type: ignore
37
38
logger = logging.getLogger
39
log = logger(__name__)
40
41
# exception classes ##########################################################
42
43
44
class DoorstopError(Exception):
    """Generic Doorstop error; base class of the other Doorstop exceptions in this module."""
46
47
48
class DoorstopFileError(DoorstopError, IOError):
    """Raised on IO errors; it is both a ``DoorstopError`` and an ``IOError``."""
50
51
52
class DoorstopWarning(DoorstopError, Warning):
    """Generic Doorstop warning; it is both a ``DoorstopError`` and a ``Warning``."""
54
55
56
class DoorstopInfo(DoorstopWarning, Warning):
    """Generic Doorstop info; a subclass of ``DoorstopWarning``."""
58
59
60
# logging classes ############################################################
61
62
63
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that shows argument defaults and uses a wider option column."""

    def __init__(self, *args, **kwargs):
        # Always request a help position of 40, replacing any value the
        # caller supplied for ``max_help_position``.
        forced = dict(kwargs, max_help_position=40)
        super().__init__(*args, **forced)
69
70
71
class WarningFormatter(logging.Formatter):
    """Logging formatter that switches to a verbose layout for records above INFO."""

    def __init__(self, default_format, verbose_format, *args, **kwargs):
        """Store both format strings; remaining arguments go to ``logging.Formatter``."""
        super().__init__(*args, **kwargs)
        self.default_format, self.verbose_format = default_format, verbose_format

    def format(self, record):
        """Pick the format string by record level, then format as usual.

        The active style's private ``_fmt`` is overwritten on every call so
        the base class renders with the chosen layout.
        """
        is_verbose = record.levelno > logging.INFO
        chosen = self.verbose_format if is_verbose else self.default_format
        self._style._fmt = chosen  # pylint: disable=W0212
        return super().format(record)
86
87
88
# disk helper functions ######################################################
89
90
91
def create_dirname(path):
    """Create the parent directory of ``path`` if it is missing.

    Does nothing when ``path`` has no directory component or when the parent
    directory already exists.
    """
    parent = os.path.dirname(path)
    if not parent or os.path.isdir(parent):
        return
    log.info("creating directory {}...".format(parent))
    os.makedirs(parent)
97
98
99
def read_lines(path, encoding="utf-8"):
    """Lazily yield the lines of a text file.

    :param path: file to read lines from
    :param encoding: input file encoding

    :return: generator of lines, as produced by iterating the open file

    """
    log.trace("reading lines from '{}'...".format(path))  # type: ignore
    with open(path, "r", encoding=encoding) as stream:
        yield from stream
112
113
114
def read_text(path, encoding="utf-8"):
    """Read text from a file.

    :param path: file path to read from
    :param encoding: input file encoding

    :raises DoorstopError: if the file cannot be opened, read or decoded

    :return: string

    """
    log.trace("reading text from '{}'...".format(path))  # type: ignore
    try:
        # newline="" disables newline translation, so line endings come back
        # exactly as stored (the behavior of the deprecated codecs.open).
        with open(path, "r", encoding=encoding, newline="") as stream:
            return stream.read()
    except Exception as exc:
        # Any failure is reported as a DoorstopError; the original exception
        # is kept as the explicit cause for debugging.
        msg = "reading '{}' failed: {}".format(path, exc)
        raise DoorstopError(msg) from exc
130
131
132
def load_yaml(text, path, loader=yaml.SafeLoader):
    """Parse a dictionary from YAML text.

    :param text: string containing dumped YAML data
    :param path: file path, used only in error messages
    :param loader: YAML loader class handed to ``yaml.load``

    :raises DoorstopError: if the text is not valid YAML or does not
        parse to a dictionary

    :return: dictionary (empty when the parsed value is falsy)

    """
    try:
        parsed = yaml.load(text, Loader=loader)
    except yaml.error.YAMLError as err:
        raise DoorstopError("invalid contents: {}:\n{}".format(path, err)) from None

    # Falsy results (e.g. an empty document) are treated as an empty mapping.
    result = parsed or {}
    if isinstance(result, dict):
        return result
    raise DoorstopError("invalid contents: {}".format(path))
153
154
155
def load_markdown(text, path, textattributekeys):
    """Parse a dictionary from Markdown text with YAML frontmatter.

    :param text: string containing markdown data with yaml frontmatter
    :param path: file path, used only in error messages
    :param textattributekeys: attribute names to fill from the markdown
        body; passed to ``update_data_from_markdown_content``

    :raises DoorstopError: if the frontmatter is not valid YAML or is not
        a dictionary

    :return: dictionary

    """
    try:
        metadata, body = frontmatter.parse(text, handler=frontmatter.YAMLHandler())
    except yaml.error.YAMLError as err:
        raise DoorstopError(
            "invalid yaml contents: {}:\n{}".format(path, err)
        ) from None

    if not isinstance(metadata, dict):
        raise DoorstopError("invalid contents: {}".format(path))

    # Parse the markdown body and update the data dictionary accordingly.
    update_data_from_markdown_content(metadata, body, textattributekeys)
    return metadata
180
181
182
def write_lines(lines, path, end="\n", encoding="utf-8", *, executable=False):
    """Write lines of text to a file.

    :param lines: iterator of strings
    :param path: file to write lines
    :param end: string appended to every line
    :param encoding: output file encoding
    :param executable: when true, set the file's mode to ``0o775``

    :return: path of new file

    """
    log.trace("writing lines to '{}'...".format(path))  # type: ignore
    # The file is opened in binary mode and each line is encoded by hand, so
    # ``end`` is written verbatim without newline translation.
    with open(path, "wb") as stream:
        stream.writelines((line + end).encode(encoding) for line in lines)
    make_executable = executable and os.path.isfile(path)
    if make_executable:
        os.chmod(path, 0o775)
    return path
201
202
203
def write_text(text, path, end="\n", encoding="utf-8"):
    """Write text to a file.

    :param text: string
    :param path: file to write text
    :param end: string to end lines; passed as ``newline`` to ``open``, so it
        must be a value ``open`` accepts (``None``, ``""``, ``"\\n"``,
        ``"\\r"`` or ``"\\r\\n"``)
    :param encoding: output file encoding

    :return: path of new file

    """
    # Empty writes are not traced here; ``touch`` relies on this and logs its
    # own message when creating an empty file.
    if text:
        log.trace("writing text to '{}'...".format(path))  # type: ignore
    with open(path, "w", encoding=encoding, newline=end) as stream:
        stream.write(text)
    return path
219
220
221
def write_csv(table, path, delimiter=",", newline="", encoding="utf-8"):
    """Write a table to a CSV file.

    :param table: iterator of rows
    :param path: file to write lines
    :param delimiter: string separating cells
    :param newline: ``newline`` argument used when opening the file
    :param encoding: output file encoding

    :return: path of new file

    """
    log.trace("writing table to '{}'...".format(path))  # type: ignore
    with open(path, "w", newline=newline, encoding=encoding) as stream:
        csv.writer(stream, delimiter=delimiter).writerows(table)
    return path
239
240
241
def touch(path):
    """Create an empty file at ``path`` unless something already exists there."""
    if os.path.exists(path):
        return
    log.trace("creating empty '{}'...".format(path))  # type: ignore
    write_text("", path)
246
247
248
def copy_dir_contents(src, dst):
    """Copy each entry matched by ``src/*`` into ``dst``.

    Entries whose name already exists in ``dst`` are skipped with a warning
    rather than overwritten. Directories are copied recursively.
    """
    for source_path in glob.glob("{}/*".format(src)):
        name = os.path.basename(source_path)
        target_path = os.path.join(dst, name)

        if not os.path.exists(target_path):
            if os.path.isdir(source_path):
                shutil.copytree(source_path, target_path)
            else:
                shutil.copyfile(source_path, target_path)
            continue

        # Name collision: report why the entry is being skipped.
        if name == "doorstop":
            msg = "Skipping '{}' as this directory name is required by doorstop".format(
                source_path
            )
        else:
            msg = "Skipping '{}' as a file or directory with this name already exists".format(
                source_path
            )
        log.warning(msg)
267
268
269
def delete(path):
    """Delete a file or directory tree; do nothing if ``path`` is neither.

    A failure (``IOError``) while removing a directory tree is logged as a
    warning instead of being raised. Errors removing a file propagate.
    """
    if os.path.isdir(path):
        try:
            log.trace("deleting '{}'...".format(path))  # type: ignore
            shutil.rmtree(path)
        except IOError:
            # bug: http://code.activestate.com/lists/python-list/159050
            log.warning("unable to delete: {}".format(path))
        return

    if os.path.isfile(path):
        log.trace("deleting '{}'...".format(path))  # type: ignore
        os.remove(path)
282
283
284
def delete_contents(dirname):
    """Delete everything matched by ``dirname/*``, keeping ``dirname`` itself.

    Sub-directories are removed recursively and other entries are removed as
    files. Entries that ``*`` does not match (names starting with a dot)
    are left in place.

    :param dirname: directory whose contents are removed; may be relative
        or absolute

    :raises FileExistsError: re-raised, after logging a warning, if removing
        a file fails with that error

    """
    # glob already returns each entry prefixed with ``dirname``. Joining it
    # onto ``dirname`` again would double the prefix for relative paths and
    # point at a path that does not exist.
    for entry in glob.glob("{}/*".format(dirname)):
        if os.path.isdir(entry):
            shutil.rmtree(entry)
            continue
        try:
            os.remove(entry)
        except FileExistsError:
            log.warning(
                "Two assets folders have files or directories with the same name"
            )
            raise
297
298
299
def update_data_from_markdown_content(data, content, textattributekeys):
    """Fill ``data`` with the header and/or text found in markdown ``content``.

    :param data: dictionary updated in place
    :param content: markdown body (without frontmatter)
    :param textattributekeys: attribute names to extract; only ``"header"``
        and ``"text"`` are looked at

    When ``"header"`` is requested, blank lines before the first content line
    are dropped. If that line is a level-1 heading (``# Title``) it becomes
    the header, and the blank lines after it are dropped too. Everything else
    is the text. ``data["header"]`` is set only if a heading was found;
    ``data["text"]`` is set whenever ``"text"`` is requested.
    """
    heading_pattern = re.compile(r"^#{1}\s+(.*)")
    # StringIO gives line-by-line iteration that keeps line endings and
    # remembers the position between the scanning steps below.
    stream = io.StringIO(content)
    wants_header = "header" in textattributekeys

    title = None
    pieces = []

    if wants_header:
        # First non-blank line decides whether the content has a heading.
        first = next((line for line in stream if line.strip()), None)
        if first is not None:
            found = heading_pattern.match(first.strip())
            if found:
                title = found.group(1)
            else:
                # Not a heading: it is the first line of the text.
                pieces.append(first)

        if title:
            # Skip blank lines between the heading and the text.
            following = next((line for line in stream if line.strip()), None)
            if following is not None:
                pieces.append(following)

    # Whatever is left in the stream is text.
    pieces.extend(stream)
    body = "".join(pieces)

    if wants_header and title:
        data["header"] = title

    if "text" in textattributekeys:
        data["text"] = body
341
342
343
def dump_markdown(data, textattr):
    """Serialize attributes as Markdown with YAML frontmatter.

    :param data: mapping written as the frontmatter metadata
    :param textattr: mapping with a required ``"text"`` entry and an
        optional ``"header"`` entry; a non-blank header is written as a
        level-1 heading above the text

    :return: string with the frontmatter followed by the markdown body

    """
    heading = textattr["header"].strip() if "header" in textattr else ""
    # The heading line is followed by one empty line before the text.
    prefix = "# {}\n".format(heading) + "\n" if heading != "" else ""
    content = prefix + textattr["text"]

    post = frontmatter.Post(content, **data)
    return frontmatter.dumps(post, Dumper=yaml.dumper.Dumper)
355
356
357
def import_path_as_module(path: Union[Path, str]) -> ModuleType:
    """Load and execute the Python file at ``path`` as a module.

    The module is named after the file's stem. This function does not add
    the module to ``sys.modules``.

    :param path: location of the Python source file

    :return: the executed module object

    """
    module_name = Path(path).stem
    # The casts only narrow Optional types for the type checker; they do
    # nothing at runtime.
    spec = cast(ModuleSpec, spec_from_file_location(module_name, path))
    module = cast(ModuleType, module_from_spec(spec))
    cast(Loader, spec.loader).exec_module(module)
    return module
364