Completed
Push — develop ( 0f6046...f40dba )
by Jace
15s queued 13s
created

doorstop.common.load_markdown()   A

Complexity

Conditions 3

Size

Total Lines 25
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 25
rs 9.85
c 0
b 0
f 0
cc 3
nop 3
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Common exceptions, classes, and functions for Doorstop."""
4
5
import argparse
6
import codecs
7
import csv
8
import glob
9
import io
10
import logging
11
import os
12
import re
13
import shutil
14
15
import frontmatter
16
import yaml
17
18
verbosity = 0  # global verbosity setting for controlling string formatting
19
PRINT_VERBOSITY = 0  # minimum verbosity to using `print`
20
STR_VERBOSITY = 3  # minimum verbosity to use verbose `__str__`
21
MAX_VERBOSITY = 4  # maximum verbosity level implemented
22
23
24
def _trace(self, message, *args, **kws):
25
    if self.isEnabledFor(logging.DEBUG - 1):
26
        self._log(logging.DEBUG - 1, message, args, **kws)  # pylint: disable=W0212
27
28
29
logging.addLevelName(logging.DEBUG - 1, "TRACE")
30
logging.Logger.trace = _trace  # type: ignore
31
32
logger = logging.getLogger
33
log = logger(__name__)
34
35
# exception classes ##########################################################
36
37
38
class DoorstopError(Exception):
39
    """Generic Doorstop error."""
40
41
42
class DoorstopFileError(DoorstopError, IOError):
43
    """Raised on IO errors."""
44
45
46
class DoorstopWarning(DoorstopError, Warning):
47
    """Generic Doorstop warning."""
48
49
50
class DoorstopInfo(DoorstopWarning, Warning):
51
    """Generic Doorstop info."""
52
53
54
# logging classes ############################################################
55
56
57
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
58
    """Command-line help text formatter with wider help text."""
59
60
    def __init__(self, *args, **kwargs):
61
        kwargs["max_help_position"] = 40
62
        super().__init__(*args, **kwargs)
63
64
65
class WarningFormatter(logging.Formatter):
66
    """Logging formatter that displays verbose formatting for WARNING+."""
67
68
    def __init__(self, default_format, verbose_format, *args, **kwargs):
69
        super().__init__(*args, **kwargs)
70
        self.default_format = default_format
71
        self.verbose_format = verbose_format
72
73
    def format(self, record):
74
        """Python 3 hack to change the formatting style dynamically."""
75
        if record.levelno > logging.INFO:
76
            self._style._fmt = self.verbose_format  # pylint: disable=W0212
77
        else:
78
            self._style._fmt = self.default_format  # pylint: disable=W0212
79
        return super().format(record)
80
81
82
# disk helper functions ######################################################
83
84
85
def create_dirname(path):
86
    """Ensure a parent directory exists for a path."""
87
    dirpath = os.path.dirname(path)
88
    if dirpath and not os.path.isdir(dirpath):
89
        log.info("creating directory {}...".format(dirpath))
90
        os.makedirs(dirpath)
91
92
93
def read_lines(path, encoding="utf-8"):
94
    """Read lines of text from a file.
95
96
    :param path: file to write lines
97
    :param encoding: output file encoding
98
99
    :return: path of new file
100
101
    """
102
    log.trace("reading lines from '{}'...".format(path))  # type: ignore
103
    with open(path, "r", encoding=encoding) as stream:
104
        for line in stream:
105
            yield line
106
107
108
def read_text(path):
109
    """Read text from a file.
110
111
    :param path: file path to read from
112
    :param encoding: input file encoding
113
114
    :return: string
115
116
    """
117
    log.trace("reading text from '{}'...".format(path))  # type: ignore
118
    try:
119
        with codecs.open(path, "r", encoding="utf-8") as f:
120
            return f.read()
121
    except Exception as e:
122
        msg = "reading '{}' failed: {}".format(path, e)
123
        raise DoorstopError(msg)
124
125
126
def load_yaml(text, path, loader=yaml.SafeLoader):
127
    """Parse a dictionary from YAML text.
128
129
    :param text: string containing dumped YAML data
130
    :param path: file path for error messages
131
132
    :return: dictionary
133
134
    """
135
    # Load the YAML data
136
    try:
137
        data = yaml.load(text, Loader=loader) or {}
138
    except yaml.error.YAMLError as exc:
139
        msg = "invalid contents: {}:\n{}".format(path, exc)
140
        raise DoorstopError(msg) from None
141
    # Ensure data is a dictionary
142
    if not isinstance(data, dict):
143
        msg = "invalid contents: {}".format(path)
144
        raise DoorstopError(msg)
145
    # Return the parsed data
146
    return data
147
148
149
def load_markdown(text, path, textattributekeys):
150
    """Parse a dictionary from Markdown file with YAML frontmatter.
151
152
    :param text: string containing markdown data with yaml frontmatter
153
    :param path: file path for error messages
154
155
    :return: dictionary
156
157
    """
158
    # Load YAML-frontmatter data from text
159
    try:
160
        data, content = frontmatter.parse(text, handler=frontmatter.YAMLHandler())
161
    except yaml.error.YAMLError as exc:
162
        msg = "invalid yaml contents: {}:\n{}".format(path, exc)
163
        raise DoorstopError(msg) from None
164
    # Ensure data is a dictionary
165
    if not isinstance(data, dict):
166
        msg = "invalid contents: {}".format(path)
167
        raise DoorstopError(msg)
168
169
    # parse content and update data dictionariy accordingly
170
    update_data_from_markdown_content(data, content, textattributekeys)
171
172
    # Return the parsed data
173
    return data
174
175
176
def write_lines(lines, path, end="\n", encoding="utf-8", *, executable=False):
177
    """Write lines of text to a file.
178
179
    :param lines: iterator of strings
180
    :param path: file to write lines
181
    :param end: string to end lines
182
    :param encoding: output file encoding
183
184
    :return: path of new file
185
186
    """
187
    log.trace("writing lines to '{}'...".format(path))  # type: ignore
188
    with open(path, "wb") as stream:
189
        for line in lines:
190
            data = (line + end).encode(encoding)
191
            stream.write(data)
192
    if executable and os.path.isfile(path):
193
        os.chmod(path, 0o775)
194
    return path
195
196
197
def write_text(text, path, end="\n"):
198
    """Write text to a file.
199
200
    :param text: string
201
    :param path: file to write text
202
    :param end: string to end lines
203
    :param encoding: output file encoding
204
205
    :return: path of new file
206
207
    """
208
    if text:
209
        log.trace("writing text to '{}'...".format(path))  # type: ignore
210
    with open(path, "w", encoding="utf-8", newline=end) as f:
211
        f.write(text)
212
    return path
213
214
215
def write_csv(table, path, delimiter=",", newline="", encoding="utf-8"):
216
    """Write table to a file.
217
218
    :param table: iterator of rows
219
    :param path: file to write lines
220
    :param delimiter: string to end cells
221
    :param newline: string to end lines
222
    :param encoding: output file encoding
223
224
    :return: path of new file
225
226
    """
227
    log.trace("writing table to '{}'...".format(path))  # type: ignore
228
    with open(path, "w", newline=newline, encoding=encoding) as stream:
229
        writer = csv.writer(stream, delimiter=delimiter)
230
        for row in table:
231
            writer.writerow(row)
232
    return path
233
234
235
def touch(path):
236
    """Ensure a file exists."""
237
    if not os.path.exists(path):
238
        log.trace("creating empty '{}'...".format(path))  # type: ignore
239
        write_text("", path)
240
241
242
def copy_dir_contents(src, dst):
243
    """Copy the contents of a directory."""
244
    for fpath in glob.glob("{}/*".format(src)):
245
        dest_path = os.path.join(dst, os.path.split(fpath)[-1])
246
        if os.path.exists(dest_path):
247
            if os.path.basename(fpath) == "doorstop":
248
                msg = "Skipping '{}' as this directory name is required by doorstop".format(
249
                    fpath
250
                )
251
            else:
252
                msg = "Skipping '{}' as a file or directory with this name already exists".format(
253
                    fpath
254
                )
255
            log.warning(msg)
256
        else:
257
            if os.path.isdir(fpath):
258
                shutil.copytree(fpath, dest_path)
259
            else:
260
                shutil.copyfile(fpath, dest_path)
261
262
263
def delete(path):
264
    """Delete a file or directory with error handling."""
265
    if os.path.isdir(path):
266
        try:
267
            log.trace("deleting '{}'...".format(path))  # type: ignore
268
            shutil.rmtree(path)
269
        except IOError:
270
            # bug: http://code.activestate.com/lists/python-list/159050
271
            msg = "unable to delete: {}".format(path)
272
            log.warning(msg)
273
    elif os.path.isfile(path):
274
        log.trace("deleting '{}'...".format(path))  # type: ignore
275
        os.remove(path)
276
277
278
def delete_contents(dirname):
279
    """Delete the contents of a directory."""
280
    for file in glob.glob("{}/*".format(dirname)):
281
        if os.path.isdir(file):
282
            shutil.rmtree(os.path.join(dirname, file))
283
        else:
284
            try:
285
                os.remove(os.path.join(dirname, file))
286
            except FileExistsError:
287
                log.warning(
288
                    "Two assets folders have files or directories " "with the same name"
289
                )
290
                raise
291
292
293
def update_data_from_markdown_content(data, content, textattributekeys):
294
    """Update data dictionary based on text content and attribute keys to look for within the content."""
295
    h1 = re.compile(r"^#{1}\s+(.*)")
296
    # for line based iteration
297
    s = io.StringIO(content)
298
    # final text
299
    header = None
300
    text = ""
301
302
    if "header" in textattributekeys:
303
        # search for first content line and check
304
        # if it is a h1 header
305
        for l in s:
306
            # skip empty lines
307
            if len(l.strip()) == 0:
308
                continue
309
            # check if first found line is a header
310
            m = h1.match(l.strip())
311
            if m:
312
                # header found
313
                header = m.group(1)
314
            else:
315
                # no header found, add to normal text
316
                text += l
317
            break
318
319
        # if header was found, skip empty lines before main text
320
        if header:
321
            for l in s:
322
                if len(l.strip()) != 0:
323
                    text += l
324
                    break
325
326
    # remaining content is normal text
327
    for l in s:
328
        text += l
329
330
    if "header" in textattributekeys and header:
331
        data["header"] = header
332
333
    if "text" in textattributekeys:
334
        data["text"] = text
335
336
337
def dump_markdown(data, textattr):
338
    content = ""
339
    if "header" in textattr and textattr["header"].strip() != "":
340
        content += "# {}\n".format(textattr["header"].strip())
341
        content += "\n"
342
343
    content += textattr["text"]
344
345
    text = frontmatter.dumps(frontmatter.Post(content, **data))
346
    return text
347