doorstop.core.importer._itemize()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 67
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 37
CRAP Score 17

Importance

Changes 0
Metric Value
eloc 38
dl 0
loc 67
ccs 37
cts 37
cp 1
rs 1.8
c 0
b 0
f 0
cc 17
nop 4
crap 17

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex units like doorstop.core.importer._itemize() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Functions to import existing documents and items."""
4 1
5 1
import csv
6 1
import os
7
import re
8 1
import warnings
9
from typing import Any
10 1
11 1
import openpyxl
12 1
13 1
from doorstop import common, settings
14 1
from doorstop.common import DoorstopError
15 1
from doorstop.core.builder import _get_tree
16 1
from doorstop.core.document import Document
17
from doorstop.core.item import Item
18
from doorstop.core.types import UID
19 1
20
# Regex to split list strings into parts (on whitespace, ';' or ',')
LIST_SEP_RE = re.compile(r"[\s;,]+")

# Cache of unplaced documents
_documents = []

log = common.logger(__name__)
25
26 1
27
def import_file(path, document, ext=None, mapping=None, **kwargs):
    """Import items from an exported file.

    :param path: input file location
    :param document: document to import items
    :param ext: file extension to override input path's extension
    :param mapping: dictionary mapping custom to standard attribute names
    :param kwargs: extra keyword arguments forwarded to the file reader

    :raise DoorstopError: for unknown file formats

    :return: document with imported items

    """
    log.info("importing {} into {}...".format(path, document))
    # An explicit extension wins over the one found in the path
    ext = ext or os.path.splitext(path)[-1]
    func = check(ext)
    func(path, document, mapping=mapping, **kwargs)
    # Hand the document back as documented (the function used to fall off
    # the end and implicitly return None)
    return document
44
45 1
46
def create_document(prefix, path, parent=None, tree=None):
    """Build a new Doorstop document for an already existing document.

    :param prefix: existing document's prefix (for new items)
    :param path: new directory path to store this document's items
    :param parent: parent document's prefix (if one will exist)
    :param tree: explicit tree to add the document

    :return: imported Document

    """
    # Fall back to the implicit tree when no (truthy) tree is supplied
    tree = tree or _get_tree()

    log.info("importing document '{}'...".format(prefix))
    try:
        # First choice: let the tree create the document with its parent
        document = tree.create_document(path, prefix, parent=parent)
    except DoorstopError as error:
        if parent:
            # Create the document despite an unavailable parent and
            # remember it as unplaced
            document = Document.new(tree, path, tree.root, prefix, parent=parent)
            log.warning(error)
            _documents.append(document)
        else:
            raise error from None  # pylint: disable=raising-bad-type

    # TODO: attempt to place unplaced documents?

    log.info("imported: {}".format(document))
    return document
77 1
78
79
def add_item(prefix, uid, attrs=None, document=None, request_next_number=None):
    """Create a Doorstop item from existing item information.

    :param prefix: previously imported document's prefix
    :param uid: existing item's UID
    :param attrs: dictionary of Doorstop and custom attributes
    :param document: explicit document to add the item
    :param request_next_number: server method to get a document's next number

    :return: imported Item

    """
    if not document:
        # No explicit document: find it by prefix in an implicit tree
        tree = _get_tree(request_next_number=request_next_number)
        document = tree.find_document(prefix)
    else:
        # Use the tree the explicit document belongs to
        tree = document.tree
        assert tree  # tree should be set internally

    # Create the item under the given UID (no automatic save)
    log.info("importing item '{}'...".format(uid))
    item = Item.new(tree, document, document.path, document.root, uid, auto=False)
    if attrs:
        for name, value in attrs.items():
            item.set(name, value)
    item.save()

    log.info("imported: {}".format(item))
    return item
109
110 1
111 1
def _file_yml(path, document, **_):
    """Import items from a YAML export to a document.

    Any extra keyword arguments are accepted and ignored.

    :param path: input file location
    :param document: document to import items

    """
    log.info("reading items in {}...".format(path))
    # Read the raw text, then load it as YAML data
    data = common.load_yaml(common.read_text(path), path)

    for uid, attrs in data.items():
        # Remove an already existing item with the same UID first
        try:
            existing = document.find_item(uid)
        except DoorstopError:
            pass  # no matching item
        else:
            existing.delete()
        add_item(document.prefix, uid, attrs=attrs, document=document)
132
133 1
134 1
def _file_csv(path, document, delimiter=',', mapping=None):
    """Import items from a CSV export to a document.

    The first row is used as the header; every following row is imported as
    an item. A file without any rows is skipped with a warning.

    :param path: input file location
    :param document: document to import items
    :param delimiter: CSV field delimiter
    :param mapping: dictionary mapping custom to standard attribute names

    """
    # Parse the file
    log.info("reading rows in {}...".format(path))
    # newline='' leaves line-ending handling to the csv module, so newlines
    # embedded in quoted fields are read back unchanged
    with open(path, 'r', encoding='utf-8', newline='') as stream:
        reader = csv.reader(stream, delimiter=delimiter)
        rows = [[_convert_csv_value(value) for value in raw] for raw in reader]

    # Nothing to import (and no header to read) from an empty file
    if not rows:
        log.warning("no rows found in {}".format(path))
        return

    # Extract header and data rows
    header = rows[0]
    data = rows[1:]

    # Import items from the rows
    _itemize(header, data, document, mapping=mapping)


def _convert_csv_value(value):
    """Convert the strings 'true'/'false' (in any case) to booleans."""
    if isinstance(value, str):
        lowered = value.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
    return value
168
169 1
170
def _file_tsv(path, document, mapping=None):
    """Import items from a TSV export to a document.

    :param path: input file location
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    # TSV is read by the CSV importer with a tab as field delimiter
    tab = '\t'
    _file_csv(path, document, delimiter=tab, mapping=mapping)
179
180 1
181
def _file_xlsx(path, document, mapping=None):
    """Import items from an XLSX export to a document.

    :param path: input file location
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    # Parse the file
    log.debug("reading rows in {}...".format(path))
    workbook = openpyxl.load_workbook(path, data_only=True)
    worksheet = workbook.active

    # Collect the cell values of every row of the active worksheet
    rows = [[cell.value for cell in cells] for cells in worksheet.iter_rows()]

    # The first row is the header, all remaining rows are data
    header = rows[0] if rows else []
    data = rows[1:]

    # Warn about workbooks that may be sized incorrectly
    last_index = len(rows) - 1 if rows else 0
    if last_index >= 2 ** 20 - 1:
        msg = "workbook contains the maximum number of rows"
        warnings.warn(msg, Warning)

    # Import items from the rows
    _itemize(header, data, document, mapping=mapping)
217
218
219 1
def _itemize(header, data, document, mapping=None):
    """Conversion function for multiple formats.

    Each row becomes one item: an existing item with the same UID is
    replaced, and rows with text but no UID get the document's next UID.
    Rows without a usable UID are skipped.

    :param header: list of columns names
    :param data: list of lists of row values
    :param document: document to import items
    :param mapping: dictionary mapping custom to standard attribute names

    """
    log.info("converting rows to items...")
    log.debug("header: {}".format(header))

    # The header is the same for every row, so resolve its keys only once
    keys = [_resolve_key(name, mapping) for name in header]

    for row in data:
        log.debug("row: {}".format(row))

        # Parse item attributes
        uid, attrs = _parse_row(keys, row)

        # Get the next UID if the row is a new item
        if attrs.get('text') and uid in (None, '', settings.PLACEHOLDER):
            uid = UID(
                document.prefix, document.sep, document.next_number, document.digits
            )

        # Convert the row to an item
        if uid and uid != settings.PLACEHOLDER:
            _replace_item(document, uid, attrs)


def _resolve_key(name, mapping):
    """Return the attribute key for a header cell, or None for a blank one."""
    key = str(name).lower().strip() if name else ''
    if not key:
        return None

    # Map key to custom attributes names
    for custom, standard in (mapping or {}).items():
        if key == custom.lower():
            log.debug("mapped: '{}' => '{}'".format(key, standard))
            return standard
    return key


def _parse_row(keys, row):
    """Split a row into its UID (or None) and a dictionary of attributes."""
    uid = None
    attrs = {}
    # zip() stops at the shorter sequence, so cells beyond the last header
    # column are ignored instead of raising an IndexError
    for key, value in zip(keys, row):
        if key is None:
            continue  # column without a header name
        if key in ('uid', 'id'):  # 'id' for backwards compatibility
            uid = value
        elif key == 'links':
            # split links into a list
            attrs[key] = _split_list(value)
        elif key == 'active':
            # require explicit disabling
            attrs['active'] = value is not False
        else:
            attrs[key] = value
    return uid, attrs


def _replace_item(document, uid, attrs):
    """Delete any item with this UID, then import it with the new attributes."""
    # Delete the old item
    try:
        item = document.find_item(uid)
    except DoorstopError:
        log.debug("not yet an item: {}".format(uid))
    else:
        log.debug("deleting old item: {}".format(uid))
        item.delete()

    # Import the item; a failure is logged so the remaining rows still import
    try:
        add_item(document.prefix, uid, attrs=attrs, document=document)
    except DoorstopError as exc:
        log.warning(exc)
286
287
288 1
def _split_list(value):
289
    """Split a string list into parts."""
290 1
    if value:
291 1
        return [p for p in LIST_SEP_RE.split(value) if p]
292
    else:
293 1
        return []
294
295
296
# Mapping from file extension to file reader
297 1
FORMAT_FILE = {
    '.yml': _file_yml,  # YAML export
    '.csv': _file_csv,  # comma-delimited rows (default delimiter)
    '.tsv': _file_tsv,  # tab-delimited rows
    '.xlsx': _file_xlsx,  # XLSX workbook
}
303 1
304
305
def check(ext):
    """Confirm an extension is supported for import.

    :param ext: file extension to look up in FORMAT_FILE

    :raise DoorstopError: for unknown formats

    :return: file importer if available

    """
    if ext not in FORMAT_FILE:
        # List the supported extensions in the error message
        options = ', '.join(FORMAT_FILE)
        msg = "unknown import format: {} (options: {})".format(ext or None, options)
        raise DoorstopError(msg) from None

    log.debug("found file reader for: {}".format(ext))
    return FORMAT_FILE[ext]
323