Completed
Push — develop ( bff36c...0ce659 )
by Jace
15s queued 11s
created

doorstop.common.write_csv()   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 18
rs 10
c 0
b 0
f 0
cc 3
nop 5
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Common exceptions, classes, and functions for Doorstop."""
4
5
import argparse
6
import csv
7
import glob
8
import logging
9
import os
10
import shutil
11
12
import yaml
13
14
verbosity = 0  # global verbosity setting for controlling string formatting
15
PRINT_VERBOSITY = 0  # minimum verbosity to use `print`
16
STR_VERBOSITY = 3  # minimum verbosity to use verbose `__str__`
17
MAX_VERBOSITY = 4  # maximum verbosity level implemented
18
19
20
def _trace(self, message, *args, **kws):
    """Log ``message`` on this logger one level below ``logging.DEBUG``.

    Attached to ``logging.Logger`` as ``trace`` so every logger gains a
    ``trace()`` method.
    """
    level = logging.DEBUG - 1
    if not self.isEnabledFor(level):
        return
    self._log(level, message, args, **kws)  # pylint: disable=W0212


# Attach the method to all loggers and give the custom level a readable name.
logging.Logger.trace = _trace  # type: ignore
logging.addLevelName(logging.DEBUG - 1, "TRACE")

logger = logging.getLogger
log = logger(__name__)
30
31
# exception classes ##########################################################
32
33
34
class DoorstopError(Exception):
    """Generic error raised by Doorstop."""
36
37
38
class DoorstopFileError(DoorstopError, IOError):
    """Doorstop error raised on IO errors; it is also an ``IOError``."""
40
41
42
class DoorstopWarning(DoorstopError, Warning):
    """Generic Doorstop warning; both a ``DoorstopError`` and a ``Warning``."""
44
45
46
class DoorstopInfo(DoorstopWarning, Warning):
    """Generic Doorstop info, modelled as a kind of ``DoorstopWarning``."""
48
49
50
# logging classes ############################################################
51
52
53
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Defaults-showing help formatter with ``max_help_position`` fixed at 40."""

    def __init__(self, *args, **kwargs):
        # Always force the wider setting, replacing any caller-supplied value.
        kwargs.update(max_help_position=40)
        super().__init__(*args, **kwargs)
59
60
61
class WarningFormatter(logging.Formatter):
    """Logging formatter that switches to a verbose format above INFO level."""

    def __init__(self, default_format, verbose_format, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.verbose_format = verbose_format
        self.default_format = default_format

    def format(self, record):
        """Select the format string for this record, then format as usual."""
        is_verbose = record.levelno > logging.INFO
        chosen = self.verbose_format if is_verbose else self.default_format
        # Swap the style's format string in place before delegating.
        self._style._fmt = chosen  # pylint: disable=W0212
        return super().format(record)
76
77
78
# disk helper functions ######################################################
79
80
81
def create_dirname(path):
    """Ensure a parent directory exists for a path.

    :param path: file path whose parent directory should exist

    """
    dirpath = os.path.dirname(path)
    if dirpath and not os.path.isdir(dirpath):
        log.info("creating directory {}...".format(dirpath))
        # exist_ok avoids a spurious FileExistsError if the directory is
        # created by someone else between the check above and this call
        os.makedirs(dirpath, exist_ok=True)
87
88
89
def read_lines(path, encoding='utf-8'):
    """Read lines of text from a file.

    This is a generator: the file is opened on first iteration and stays
    open until the generator is exhausted or closed.

    :param path: file to read lines from
    :param encoding: input file encoding

    :return: generator yielding each line as read from the file

    """
    log.trace("reading lines from '{}'...".format(path))  # type: ignore
    with open(path, 'r', encoding=encoding) as stream:
        yield from stream
102
103
104
def read_text(path, encoding='utf-8'):
    """Read text from a file.

    :param path: file path to read from
    :param encoding: input file encoding

    :raises DoorstopError: on any error while opening or reading the file

    :return: string

    """
    log.trace("reading text from '{}'...".format(path))  # type: ignore
    try:
        with open(path, 'r', encoding=encoding) as f:
            return f.read()
    except Exception as e:
        msg = "reading '{}' failed: {}".format(path, e)
        # Chain explicitly so the original failure is kept as the cause
        raise DoorstopError(msg) from e
120
121
122
def load_yaml(text, path, loader=yaml.SafeLoader):
    """Parse a dictionary from YAML text.

    :param text: string containing dumped YAML data
    :param path: file path for error messages
    :param loader: loader class handed to ``yaml.load``

    :raises DoorstopError: if the text cannot be parsed or does not yield
        a dictionary

    :return: dictionary

    """
    try:
        parsed = yaml.load(text, Loader=loader)
    except yaml.error.YAMLError as exc:
        raise DoorstopError(
            "invalid contents: {}:\n{}".format(path, exc)
        ) from None
    # Any falsy result is replaced by an empty dictionary
    data = parsed or {}
    if isinstance(data, dict):
        return data
    raise DoorstopError("invalid contents: {}".format(path))
143
144
145
def write_lines(lines, path, end='\n', encoding='utf-8'):
    """Write lines of text to a file.

    The file is opened in binary mode, so ``end`` is written exactly as
    given after each line.

    :param lines: iterator of strings
    :param path: file to write lines
    :param end: string to end lines
    :param encoding: output file encoding

    :return: path of new file

    """
    log.trace("writing lines to '{}'...".format(path))  # type: ignore
    with open(path, 'wb') as stream:
        encoded = ((line + end).encode(encoding) for line in lines)
        for chunk in encoded:
            stream.write(chunk)
    return path
162
163
164
def write_text(text, path, encoding='utf-8'):
    """Write text to a file.

    :param text: string
    :param path: file to write text
    :param encoding: output file encoding

    :return: path of new file

    """
    # Writing an empty string is not traced
    if text:
        log.trace("writing text to '{}'...".format(path))  # type: ignore
    with open(path, 'w', encoding=encoding) as f:
        f.write(text)
    return path
179
180
181
def write_csv(table, path, delimiter=',', newline='', encoding='utf-8'):
    """Write table to a file.

    :param table: iterator of rows
    :param path: file to write lines
    :param delimiter: string to separate cells
    :param newline: ``newline`` argument passed to ``open``
    :param encoding: output file encoding

    :return: path of new file

    """
    log.trace("writing table to '{}'...".format(path))  # type: ignore
    with open(path, 'w', newline=newline, encoding=encoding) as stream:
        csv.writer(stream, delimiter=delimiter).writerows(table)
    return path
199
200
201
def touch(path):
    """Create an empty file at ``path`` unless something already exists there."""
    if os.path.exists(path):
        return
    log.trace("creating empty '{}'...".format(path))  # type: ignore
    write_text('', path)
206
207
208
def copy_dir_contents(src, dst):
    """Copy the contents of a directory.

    Entries whose name already exists in ``dst`` are skipped with a warning.
    Directories are copied recursively; anything else is copied as a file.

    :param src: directory whose entries are copied
    :param dst: directory to copy the entries into

    """
    # Escape ``src`` so glob metacharacters in the directory name
    # (``[``, ``*``, ``?``) are matched literally instead of as a pattern
    for fpath in glob.glob('{}/*'.format(glob.escape(src))):
        name = os.path.basename(fpath)
        dest_path = os.path.join(dst, name)
        if os.path.exists(dest_path):
            if name == "doorstop":
                msg = "Skipping '{}' as this directory name is required by doorstop".format(
                    fpath
                )
            else:
                msg = "Skipping '{}' as a file or directory with this name already exists".format(
                    fpath
                )
            log.warning(msg)
        elif os.path.isdir(fpath):
            shutil.copytree(fpath, dest_path)
        else:
            shutil.copyfile(fpath, dest_path)
227
228
229
def delete(path):
    """Delete a file or directory with error handling.

    A directory that cannot be removed because of an ``IOError`` only
    produces a warning; paths that are neither a directory nor a file are
    ignored.
    """
    if os.path.isdir(path):
        try:
            log.trace("deleting '{}'...".format(path))  # type: ignore
            shutil.rmtree(path)
        except IOError:
            # bug: http://code.activestate.com/lists/python-list/159050
            log.warning("unable to delete: {}".format(path))
        return
    if os.path.isfile(path):
        log.trace("deleting '{}'...".format(path))  # type: ignore
        os.remove(path)
242
243
244
def delete_contents(dirname):
    """Delete the contents of a directory.

    The directory itself is kept. Entries are found with a ``*`` glob
    pattern, which does not match names starting with a dot.

    :param dirname: directory to empty

    """
    # glob returns each entry already prefixed with ``dirname``; joining it
    # onto ``dirname`` again would double a relative path (``d/d/a``).
    # ``dirname`` is escaped so glob metacharacters in it match literally.
    for entry in glob.glob('{}/*'.format(glob.escape(dirname))):
        if os.path.isdir(entry):
            shutil.rmtree(entry)
        else:
            try:
                os.remove(entry)
            except FileExistsError:
                log.warning(
                    "Two assets folders have files or directories " "with the same name"
                )
                raise
257