Completed
Push — develop ( c3c8a7...44d282 )
by Jace
13s
created

copy_dir_contents()   A

Complexity

Conditions 3

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 3
rs 9.4285
1
"""Common exceptions, classes, and functions for Doorstop."""
2
3 1
import os
4 1
import shutil
5 1
import argparse
6 1
import logging
7 1
import glob
8
9 1
import yaml
10
11 1
verbosity = 0  # global verbosity setting for controlling string formatting
12 1
PRINT_VERBOSITY = 0  # minimum verbosity to using `print`
13 1
STR_VERBOSITY = 3  # minimum verbosity to use verbose `__str__`
14 1
MAX_VERBOSITY = 4  # maximum verbosity level implemented
15
16
17
def _trace(self, message, *args, **kws):  # pragma: no cover (manual test)
18
    """New logging level, TRACE."""
19
    if self.isEnabledFor(logging.DEBUG - 1):
20
        self._log(logging.DEBUG - 1, message, args, **kws)  # pylint: disable=W0212
21
22
23 1
logging.addLevelName(logging.DEBUG - 1, "TRACE")
24 1
logging.Logger.trace = _trace
25
26 1
logger = logging.getLogger
27 1
log = logger(__name__)
28
29
30
# exception classes ##########################################################
31
32
33 1
class DoorstopError(Exception):
34
    """Generic Doorstop error."""
35
36
37 1
class DoorstopFileError(DoorstopError, IOError):
38
    """Raised on IO errors."""
39
40
41 1
class DoorstopWarning(DoorstopError, Warning):
42
    """Generic Doorstop warning."""
43
44
45 1
class DoorstopInfo(DoorstopWarning, Warning):
46
    """Generic Doorstop info."""
47
48
# logging classes ############################################################
49
50
51 1
class HelpFormatter(argparse.HelpFormatter):
52
    """Command-line help text formatter with wider help text."""
53
54 1
    def __init__(self, *args, **kwargs):
55 1
        super().__init__(*args, max_help_position=40, **kwargs)
56
57
58 1
class WarningFormatter(logging.Formatter, object):
59
    """Logging formatter that displays verbose formatting for WARNING+."""
60
61 1
    def __init__(self, default_format, verbose_format, *args, **kwargs):
62 1
        super().__init__(*args, **kwargs)
63 1
        self.default_format = default_format
64 1
        self.verbose_format = verbose_format
65
66
    def format(self, record):  # pragma: no cover (manual test)
67
        """Python 3 hack to change the formatting style dynamically."""
68
        if record.levelno > logging.INFO:
69
            self._style._fmt = self.verbose_format  # pylint: disable=W0212
70
        else:
71
            self._style._fmt = self.default_format  # pylint: disable=W0212
72
        return super().format(record)
73
74
75
# disk helper functions ######################################################
76
77
78 1
def create_dirname(path):
79
    """Ensure a parent directory exists for a path."""
80 1
    dirpath = os.path.dirname(path)
81 1
    if dirpath and not os.path.isdir(dirpath):
82 1
        log.info("creating directory {}...".format(dirpath))
0 ignored issues
show
introduced by
Use formatting in logging functions but pass the parameters as arguments
Loading history...
83 1
        os.makedirs(dirpath)
84
85
86 1
def read_lines(path, encoding='utf-8'):
87
    """Read lines of text from a file.
88
89
    :param path: file to write lines
90
    :param encoding: output file encoding
91
92
    :return: path of new file
93
94
    """
95 1
    log.trace("reading lines from '{}'...".format(path))
96 1
    with open(path, 'r', encoding=encoding) as stream:
97 1
        for line in stream:
98 1
            yield line
99
100
101 1
def read_text(path, encoding='utf-8'):
102
    """Read text from a file.
103
104
    :param path: file path to read from
105
    :param encoding: input file encoding
106
107
    :return: string
108
109
    """
110 1
    log.trace("reading text from '{}'...".format(path))
111 1
    with open(path, 'r', encoding=encoding) as stream:
112 1
        text = stream.read()
113 1
    return text
114
115
116 1
def load_yaml(text, path):
117
    """Parse a dictionary from YAML text.
118
119
    :param text: string containing dumped YAML data
120
    :param path: file path for error messages
121
122
    :return: dictionary
123
124
    """
125
    # Load the YAML data
126 1
    try:
127 1
        data = yaml.load(text) or {}
128 1
    except yaml.error.YAMLError as exc:
129 1
        msg = "invalid contents: {}:\n{}".format(path, exc)
130 1
        raise DoorstopError(msg) from None
131
    # Ensure data is a dictionary
132 1
    if not isinstance(data, dict):
133 1
        msg = "invalid contents: {}".format(path)
134 1
        raise DoorstopError(msg)
135
    # Return the parsed data
136 1
    return data
137
138
139 1
def write_lines(lines, path, end='\n', encoding='utf-8'):
140
    """Write lines of text to a file.
141
142
    :param lines: iterator of strings
143
    :param path: file to write lines
144
    :param end: string to end lines
145
    :param encoding: output file encoding
146
147
    :return: path of new file
148
149
    """
150 1
    log.trace("writing lines to '{}'...".format(path))
151 1
    with open(path, 'wb') as stream:
152 1
        for line in lines:
153 1
            data = (line + end).encode(encoding)
154 1
            stream.write(data)
155 1
    return path
156
157
158 1
def write_text(text, path, encoding='utf-8'):
159
    """Write text to a file.
160
161
    :param text: string
162
    :param path: file to write text
163
    :param encoding: output file encoding
164
165
    :return: path of new file
166
167
    """
168 1
    if text:
169 1
        log.trace("writing text to '{}'...".format(path))
170 1
    with open(path, 'wb') as stream:
171 1
        data = text.encode(encoding)
172 1
        stream.write(data)
173 1
    return path
174
175
176
def touch(path):  # pragma: no cover (integration test)
177
    """Ensure a file exists."""
178
    if not os.path.exists(path):
179
        log.trace("creating empty '{}'...".format(path))
180
        write_text('', path)
181
182
183 1
def copy_dir_contents(src, dst):
184
    """Copy the contents of a directory."""
185 1
    for fpath in glob.glob('{}/*'.format(src)):
186 1
        dest_path = os.path.join(dst, os.path.split(fpath)[-1])
187 1
        if os.path.isdir(fpath):
188 1
            shutil.copytree(fpath, dest_path)
189
        else:
190 1
            shutil.copyfile(fpath, dest_path)
191
192
193
def delete(path):  # pragma: no cover (integration test)
194
    """Delete a file or directory with error handling."""
195
    if os.path.isdir(path):
196
        try:
197
            log.trace("deleting '{}'...".format(path))
198
            shutil.rmtree(path)
199
        except IOError:
200
            # bug: http://code.activestate.com/lists/python-list/159050
201
            msg = "unable to delete: {}".format(path)
202
            log.warning(msg)
203
    elif os.path.isfile(path):
204
        log.trace("deleting '{}'...".format(path))
205
        os.remove(path)
206
207
208 1
def delete_contents(dirname):
209
    """Delete the contents of a directory."""
210 1
    for file in glob.glob('{}/*'.format(dirname)):
211 1
        if os.path.isdir(file):
212 1
            shutil.rmtree(os.path.join(dirname, file))
213
        else:
214
            os.remove(os.path.join(dirname, file))
215