doorstop.core.importer.check()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 18
rs 9.9
c 0
b 0
f 0
cc 3
nop 1
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Functions to import existing documents and items."""
4
5
import csv
6
import os
7
import re
8
import warnings
9
from typing import Any
10
11
import openpyxl
12
13
from doorstop import common, settings
14
from doorstop.common import DoorstopError
15
from doorstop.core.builder import _get_tree
16
from doorstop.core.document import Document
17
from doorstop.core.item import Item
18
from doorstop.core.types import UID
19
20
# Separator pattern for list-like values: one or more consecutive
# whitespace, semicolon, or comma characters count as a single separator.
LIST_SEP_RE = re.compile(r"[\s;,]+")  # regex to split list strings into parts
21
22
# Documents that create_document() had to build without attaching them to
# their requested parent are appended here.
_documents = []  # cache of unplaced documents
23
24
# Module-level logger, named after this module.
log = common.logger(__name__)
25
26
27
def import_file(path, document, ext=None, mapping=None, **kwargs):
    """Import items from an exported file.

    The reader is selected from the file extension and is called with the
    document, the attribute mapping and any extra keyword arguments.

    :param path: input file location
    :param document: document to import items
    :param ext: file extension to override input path's extension
    :param mapping: dictionary mapping custom to standard attribute names
    :param kwargs: additional keyword arguments forwarded to the reader

    :raise DoorstopError: for unknown file formats

    :return: document with imported items

    """
    log.info("importing {} into {}...".format(path, document))
    # An explicit extension wins; otherwise use the suffix of the path
    ext = ext or os.path.splitext(path)[-1]
    func = check(ext)
    func(path, document, mapping=mapping, **kwargs)
    # The readers add items to the given document; hand it back so the
    # documented return value is actually provided
    return document
44
45
46
def create_document(prefix, path, parent=None, tree=None):
    """Build a Doorstop document for an already existing external document.

    :param prefix: existing document's prefix (for new items)
    :param path: new directory path to store this document's items
    :param parent: parent document's prefix (if one will exist)
    :param tree: explicit tree to add the document

    :raise DoorstopError: if creation fails and no parent was requested

    :return: imported Document

    """
    # Fall back to the implicit tree when no usable one was supplied
    tree = tree or _get_tree()

    log.info("importing document '{}'...".format(prefix))
    try:
        # First choice: let the tree create the document under its parent
        document = tree.create_document(path, prefix, parent=parent)
    except DoorstopError as error:
        if parent:
            # The parent is unavailable: create the document anyway and
            # remember it as unplaced
            document = Document.new(tree, path, tree.root, prefix, parent=parent)
            log.warning(error)
            _documents.append(document)
        else:
            # Without a parent there is nothing to fall back on
            raise error from None  # pylint: disable=raising-bad-type

    # TODO: attempt to place unplaced documents?

    log.info("imported: {}".format(document))
    return document
77
78
79
def add_item(prefix, uid, attrs=None, document=None, request_next_number=None):
    """Create a Doorstop item from existing item information.

    :param prefix: previously imported document's prefix
    :param uid: existing item's UID
    :param attrs: dictionary of Doorstop and custom attributes
    :param document: explicit document to add the item
    :param request_next_number: server method to get a document's next number

    :return: imported Item

    """
    if not document:
        # No document given: look it up by prefix in an implicit tree
        tree = _get_tree(request_next_number=request_next_number)
        document = tree.find_document(prefix)
    else:
        # Reuse the tree the explicit document belongs to
        tree = document.tree
        assert tree  # tree should be set internally

    # Create the item under the caller-supplied UID
    log.info("importing item '{}'...".format(uid))
    item = Item.new(tree, document, document.path, document.root, uid, auto=False)
    if attrs:
        for name, value in attrs.items():
            item.set(name, value)
    item.save()

    log.info("imported: {}".format(item))
    return item
109
110
111
def _file_yml(path, document, **_):
    """Import items from a YAML export to a document.

    Each top-level key of the YAML data is used as an item UID and its
    value as that item's attributes. An existing item with the same UID is
    deleted before the imported one is added.

    :param path: input file location
    :param document: document to import items

    """
    log.info("reading items in {}...".format(path))
    # Read the raw text, then parse it as YAML
    data = common.load_yaml(common.read_text(path), path)

    for uid, attrs in data.items():
        # Remove the item currently using this UID, if there is one
        try:
            existing = document.find_item(uid)
        except DoorstopError:
            pass  # no matching item
        else:
            existing.delete()
        add_item(document.prefix, uid, attrs=attrs, document=document)
132
133
134
def _file_csv(path, document, delimiter=",", mapping=None):
    """Import items from a CSV export to a document.

    The first row is treated as the header and every following row as one
    item. Cells spelling "true" or "false" (in any letter case) are
    converted to booleans. A file without any rows imports nothing.

    :param path: input file location
    :param document: document to import items
    :param delimiter: CSV field delimiter
    :param mapping: dictionary mapping custom to standard attribute names

    """
    booleans = {"true": True, "false": False}

    # Parse the file
    log.info("reading rows in {}...".format(path))
    # "utf-8-sig" decodes plain UTF-8 unchanged but drops a leading byte
    # order mark, which would otherwise become part of the first header name
    with open(path, "r", encoding="utf-8-sig") as stream:
        reader = csv.reader(stream, delimiter=delimiter)
        # convert string booleans, keep every other cell as read
        rows = [
            [booleans.get(cell.lower(), cell) for cell in record]
            for record in reader
        ]

    # Nothing to split into header and data rows
    if not rows:
        log.warning("no rows to import in {}".format(path))
        return

    # Extract header and data rows
    header = rows[0]
    data = rows[1:]

    # Import items from the rows
    _itemize(header, data, document, mapping=mapping)
168
169
170
def _file_tsv(path, document, mapping=None):
    """Import items from a TSV export to a document.

    Delegates to the CSV reader with a tab character as the delimiter.

    :param path: input file location
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    tab = "\t"
    _file_csv(path, document, mapping=mapping, delimiter=tab)
179
180
181
def _file_xlsx(path, document, mapping=None):
    """Import items from an XLSX export to a document.

    The first row of the active worksheet is the header; every later row
    becomes one data row. A warning is issued when the row count reaches
    the limit checked below.

    :param path: input file location
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    # Open the workbook and pick its active sheet
    log.debug("reading rows in {}...".format(path))
    workbook = openpyxl.load_workbook(path, data_only=True)
    worksheet = workbook.active

    header = []
    data = []
    last_index = 0  # stays 0 when the worksheet yields no rows

    # Collect cell values: first row into the header, the rest into data
    for last_index, cells in enumerate(worksheet.iter_rows()):
        values = [cell.value for cell in cells]
        if last_index == 0:
            header = values
        else:
            data.append(values)

    # Warn about workbooks that may be sized incorrectly
    highest_index = 2**20 - 1
    if last_index >= highest_index:
        warnings.warn("workbook contains the maximum number of rows", Warning)

    # Turn the collected rows into items
    _itemize(header, data, document, mapping=mapping)
217
218
219
def _itemize(header, data, document, mapping=None):
    """Conversion function for multiple formats.

    Every row is turned into an attribute dictionary keyed by the
    lower-cased, stripped column names. Cells without a column name
    (empty header cell, or a row longer than the header) are ignored.
    A row with text but no usable UID gets the document's next UID; a row
    that still has no UID is skipped. An existing item with the same UID
    is deleted before the imported one is added.

    :param header: list of columns names
    :param data: list of lists of row values
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    mapping = mapping or {}
    width = len(header)

    log.info("converting rows to items...")
    log.debug("header: {}".format(header))
    for row in data:
        log.debug("row: {}".format(row))

        # Parse item attributes
        attrs = {}
        uid = None
        for index, value in enumerate(row):
            # Key lookup; cells past the end of the header have no name
            name = header[index] if index < width else None
            key = str(name).lower().strip() if name else ""
            if not key:
                continue

            # Map key to custom attributes names
            for custom, standard in mapping.items():
                if key == custom.lower():
                    log.debug("mapped: '{}' => '{}'".format(key, standard))
                    key = standard
                    break

            # Convert values for particular keys
            if key in ("uid", "id"):  # 'id' for backwards compatibility
                uid = value
            elif key == "links":
                # split links into a list
                attrs[key] = _split_list(value)
            elif key == "references" and value is not None:
                references = _parse_references(value)
                # a blank cell leaves the attribute unset
                if references is not None:
                    attrs[key] = references
            elif key == "active":
                # require explicit disabling
                attrs["active"] = value is not False
            else:
                attrs[key] = value

        # Get the next UID if the row is a new item
        if attrs.get("text") and uid in (None, "", settings.PLACEHOLDER):
            uid = UID(
                document.prefix, document.sep, document.next_number, document.digits
            )

        # Convert the row to an item
        if uid and uid != settings.PLACEHOLDER:
            # Delete the old item
            try:
                item = document.find_item(uid)
            except DoorstopError:
                log.debug("not yet an item: {}".format(uid))
            else:
                log.debug("deleting old item: {}".format(uid))
                item.delete()

            # Import the item; a failure is logged and the next row is tried
            try:
                add_item(document.prefix, uid, attrs=attrs, document=document)
            except DoorstopError as exc:
                log.warning(exc)


def _parse_references(value):
    """Convert the text of a references cell into a list of dictionaries.

    Each line is read as comma-separated ``name:value`` fields: the type
    first, the path second and, when there are exactly three fields, a
    keyword third.

    :param value: cell text with one reference per line

    :return: list of reference dictionaries, or `None` for a blank cell

    """
    lines = value.split("\n")
    if lines[0] == "":
        return None

    references = []
    for line in lines:
        fields = line.split(",")
        # Split on the first colon only so that values which themselves
        # contain colons (such as some paths) are kept whole
        reference = {
            "type": fields[0].split(":", 1)[1],
            "path": fields[1].split(":", 1)[1],
        }
        if len(fields) == 3:
            reference["keyword"] = fields[2].split(":", 1)[1]
        references.append(reference)
    return references
303
304
305
def _split_list(value):
    """Break a delimited string into its non-empty parts.

    :param value: text to split (may be empty or `None`)

    :return: list of parts, empty when there is nothing to split

    """
    if not value:
        return []
    return [part for part in LIST_SEP_RE.split(value) if part]
311
312
313
# Lookup table used by check(): file extension -> reader function
FORMAT_FILE = dict(
    (
        (".yml", _file_yml),
        (".csv", _file_csv),
        (".tsv", _file_tsv),
        (".xlsx", _file_xlsx),
    )
)
320
321
322
def check(ext):
    """Look up the reader registered for a file extension.

    :param ext: file extension, as used for the keys of `FORMAT_FILE`

    :raise DoorstopError: for unknown formats

    :return: file importer if available

    """
    try:
        reader = FORMAT_FILE[ext]
    except KeyError:
        # List every supported extension in the error message
        options = ", ".join(FORMAT_FILE)
        message = "unknown import format: {} (options: {})".format(
            ext or None, options
        )
        raise DoorstopError(message) from None

    log.debug("found file reader for: {}".format(ext))
    return reader
340