doorstop.common.delete_contents()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 13
rs 9.9
c 0
b 0
f 0
ccs 0
cts 0
cp 0
cc 4
nop 1
crap 20
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Common exceptions, classes, and functions for Doorstop."""
4 1
5 1
import argparse
6 1
import glob
7 1
import logging
8
import os
9 1
import shutil
10
11 1
import yaml
12 1
13 1
verbosity = 0  # global verbosity setting for controlling string formatting
PRINT_VERBOSITY = 0  # minimum verbosity to use `print`
STR_VERBOSITY = 3  # minimum verbosity to use verbose `__str__`
MAX_VERBOSITY = 4  # maximum verbosity level implemented
17
18
19
def _trace(self, message, *args, **kws):
20
    if self.isEnabledFor(logging.DEBUG - 1):
21
        self._log(logging.DEBUG - 1, message, args, **kws)  # pylint: disable=W0212
22
23 1
24 1
# Register a TRACE level just below DEBUG and expose it as `Logger.trace()`.
logging.addLevelName(logging.DEBUG - 1, "TRACE")
logging.Logger.trace = _trace  # type: ignore

# `logger` is an alias for the logger factory; `log` is this module's logger.
logger = logging.getLogger
log = logger(__name__)
29
30
# exception classes ##########################################################
31
32
33 1
class DoorstopError(Exception):
    """Generic Doorstop error."""
35
36
37 1
class DoorstopFileError(DoorstopError, IOError):
    """Raised on IO errors."""
39
40
41 1
class DoorstopWarning(DoorstopError, Warning):
    """Generic Doorstop warning."""
43
44
45 1
class DoorstopInfo(DoorstopWarning, Warning):
    """Generic Doorstop info."""
47
48
49
# logging classes ############################################################
50
51 1
52
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Command-line help text formatter with wider help text."""

    def __init__(self, *args, **kwargs):
        """Initialize the formatter with `max_help_position` set to 40.

        Any `max_help_position` passed by the caller is overridden; all
        other arguments are forwarded to the base formatter unchanged.

        """
        kwargs['max_help_position'] = 40
        super().__init__(*args, **kwargs)
58 1
59
60
class WarningFormatter(logging.Formatter):
    """Logging formatter that displays verbose formatting for WARNING+."""

    def __init__(self, default_format, verbose_format, *args, **kwargs):
        """Initialize the formatter with two alternative format strings.

        :param default_format: format used for records at INFO and below
        :param verbose_format: format used for records above INFO
        :param args: positional arguments forwarded to `logging.Formatter`
        :param kwargs: keyword arguments forwarded to `logging.Formatter`

        """
        super().__init__(*args, **kwargs)
        self.default_format = default_format
        self.verbose_format = verbose_format

    def format(self, record):
        """Python 3 hack to change the formatting style dynamically."""
        # Swap the active format string on the style object before
        # delegating, so each record is rendered with the matching format.
        if record.levelno > logging.INFO:
            self._style._fmt = self.verbose_format  # pylint: disable=W0212
        else:
            self._style._fmt = self.default_format  # pylint: disable=W0212
        return super().format(record)
75
76
77
# disk helper functions ######################################################
78 1
79
80 1
def create_dirname(path):
    """Ensure a parent directory exists for a path.

    Does nothing when the path has no directory component or the parent
    directory is already present.

    :param path: file path whose parent directory should exist

    """
    dirpath = os.path.dirname(path)
    if dirpath and not os.path.isdir(dirpath):
        log.info("creating directory {}...".format(dirpath))
        # `exist_ok` tolerates the directory being created by someone else
        # between the check above and this call.
        os.makedirs(dirpath, exist_ok=True)
86 1
87
88
def read_lines(path, encoding='utf-8'):
    """Read lines of text from a file.

    This is a generator: the file is opened when iteration starts and
    stays open until the generator is exhausted or closed.

    :param path: file to read lines from
    :param encoding: input file encoding

    :return: generator yielding each line of the file

    """
    log.trace("reading lines from '{}'...".format(path))  # type: ignore
    with open(path, 'r', encoding=encoding) as stream:
        for line in stream:
            yield line
101 1
102
103
def read_text(path, encoding='utf-8'):
    """Read text from a file.

    :param path: file path to read from
    :param encoding: input file encoding

    :raises DoorstopError: if the file cannot be opened, read, or decoded

    :return: string

    """
    log.trace("reading text from '{}'...".format(path))  # type: ignore
    try:
        with open(path, 'r', encoding=encoding) as stream:
            return stream.read()
    except Exception as ex:
        msg = "reading '{}' failed: {}".format(path, ex)
        # Chain explicitly so the underlying error is kept as the cause.
        raise DoorstopError(msg) from ex
119
120
121
def load_yaml(text, path, loader=yaml.SafeLoader):
    """Parse a dictionary from YAML text.

    :param text: string containing dumped YAML data
    :param path: file path for error messages
    :param loader: YAML loader class passed to `yaml.load`

    :raises DoorstopError: if the text is not valid YAML or does not
        parse to a dictionary

    :return: dictionary

    """
    # Load the YAML data
    # NOTE(review): the default loader is the safe one; callers passing a
    # different `loader` should only do so for trusted input.
    try:
        # Falsy results (e.g. empty text) are treated as an empty dictionary
        data = yaml.load(text, Loader=loader) or {}
    except yaml.error.YAMLError as exc:
        msg = "invalid contents: {}:\n{}".format(path, exc)
        raise DoorstopError(msg) from None
    # Ensure data is a dictionary
    if not isinstance(data, dict):
        msg = "invalid contents: {}".format(path)
        raise DoorstopError(msg)
    # Return the parsed data
    return data
142
143
144
def write_lines(lines, path, end='\n', encoding='utf-8'):
    """Write lines of text to a file.

    :param lines: iterator of strings
    :param path: file to write lines
    :param end: string to end lines
    :param encoding: output file encoding

    :return: path of new file

    """
    log.trace("writing lines to '{}'...".format(path))  # type: ignore
    # Binary mode: each line is encoded here and `end` is written as given.
    with open(path, 'wb') as stream:
        for line in lines:
            data = (line + end).encode(encoding)
            stream.write(data)
    return path
161
162
163
def write_text(text, path, encoding='utf-8'):
    """Write text to a file.

    The file is always (re)written, even for empty text; only the trace
    message is skipped when there is nothing to write.

    :param text: string
    :param path: file to write text
    :param encoding: output file encoding

    :return: path of new file

    """
    if text:
        log.trace("writing text to '{}'...".format(path))  # type: ignore
    # Binary mode: the text is encoded here and written as bytes.
    with open(path, 'wb') as stream:
        data = text.encode(encoding)
        stream.write(data)
    return path
179
180
181
def touch(path):
    """Ensure a file exists.

    An existing path is left untouched; otherwise an empty file is written.

    :param path: file path to create if missing

    """
    if not os.path.exists(path):
        log.trace("creating empty '{}'...".format(path))  # type: ignore
        write_text('', path)
186 1
187 1
188 1
def copy_dir_contents(src, dst):
    """Copy the contents of a directory.

    Entries whose name already exists in the destination are skipped with
    a warning instead of being overwritten.

    :param src: directory whose entries are copied
    :param dst: existing directory to copy the entries into

    """
    # Escape the directory so glob metacharacters in its name ('[', '*',
    # '?') are matched literally rather than interpreted as a pattern.
    pattern = '{}/*'.format(glob.escape(str(src)))
    for fpath in glob.glob(pattern):
        dest_path = os.path.join(dst, os.path.split(fpath)[-1])
        if os.path.exists(dest_path):
            if os.path.basename(fpath) == "doorstop":
                msg = "Skipping '{}' as this directory name is required by doorstop".format(
                    fpath
                )
            else:
                msg = "Skipping '{}' as a file or directory with this name already exists".format(
                    fpath
                )
            log.warning(msg)
        elif os.path.isdir(fpath):
            shutil.copytree(fpath, dest_path)
        else:
            shutil.copyfile(fpath, dest_path)
207
208 1
209
def delete(path):
    """Delete a file or directory with error handling.

    A directory is removed recursively; a failure to do so is logged as a
    warning rather than raised. A path that is neither an existing
    directory nor an existing file is ignored.

    :param path: file or directory to delete

    """
    if os.path.isdir(path):
        try:
            log.trace("deleting '{}'...".format(path))  # type: ignore
            shutil.rmtree(path)
        except IOError:
            # bug: http://code.activestate.com/lists/python-list/159050
            msg = "unable to delete: {}".format(path)
            log.warning(msg)
    elif os.path.isfile(path):
        log.trace("deleting '{}'...".format(path))  # type: ignore
        os.remove(path)
222
223
224
def delete_contents(dirname):
    """Delete the contents of a directory.

    The directory itself is kept; every entry matched by `*` inside it is
    removed (sub-directories recursively).

    :param dirname: directory to empty

    """
    # Escape the directory so glob metacharacters in its name are literal.
    pattern = '{}/*'.format(glob.escape(str(dirname)))
    # `glob` returns each entry already prefixed with `dirname`, so the
    # matched path is used as-is; joining it onto `dirname` again would
    # double the prefix for relative directories.
    for path in glob.glob(pattern):
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            try:
                os.remove(path)
            except FileExistsError:
                log.warning(
                    "Two assets folders have files or directories " "with the same name"
                )
                raise
237