doorstop.cli.commands._iter_items()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 51
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 14

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 51
ccs 11
cts 11
cp 1
rs 3.6
c 0
b 0
f 0
cc 14
nop 3
crap 14

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like doorstop.cli.commands._iter_items() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Command functions."""
4 1
5
import os
6 1
import time
7 1
from typing import Set
8 1
9 1
from doorstop import common, server
10 1
from doorstop.cli import utilities
11
from doorstop.core import editor, exporter, importer, publisher
12 1
from doorstop.core.builder import build
13
14
# Module-level logger, named after this module
log = common.logger(__name__)
15 1
16
17 1
class CycleTracker:
    """Detect cyclic references between items.

    The items are the vertices and the links between items are the edges of
    a directed graph (not necessarily connected).  Cycles are found with a
    depth first search and a bit of graph colouring: ``discovered`` holds
    the UIDs whose visit is still in progress, ``finished`` holds the UIDs
    whose visit has completed.  A link to an item that is still in
    ``discovered`` is a back edge and is reported as a cycle.  The time
    complexity is O(|V| + |E|).

    """

    def __init__(self):
        """Initialize a cycle tracker with no visited items."""
        self.finished: Set[str] = set()
        self.discovered: Set[str] = set()

    def _dfs_visit(self, uid, tree):
        """Visit one item depth first and report the back edges found.

        :param uid: the UID of the item to visit
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        # Mark the item as "in progress" before following its links
        self.discovered.add(uid)

        for pid in tree.find_item(uid).links:
            # A link to an item whose visit is in progress closes a cycle
            if pid in self.discovered:
                yield common.DoorstopWarning(
                    "detected a cycle with a back edge from {} to {}".format(pid, uid)
                )

            # Descend only into items that were never seen before
            if not (pid in self.discovered or pid in self.finished):
                yield from self._dfs_visit(pid, tree)

        # All links were followed: the item is done
        self.discovered.remove(uid)
        self.finished.add(uid)

    def __call__(self, item, document, tree):
        """Get cycles which include the specified item.

        Items that were already visited by an earlier call are skipped.

        :param item: the UID of the item to get the cycles for
        :param document: unused
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        already_seen = item in self.discovered or item in self.finished
        if not already_seen:
            yield from self._dfs_visit(item, tree)
69
70 1
71 1
def get(name):
    """Get a command function by name.

    :param name: name of the subcommand, or a falsy value for the main command

    :return: the module-level ``run_<name>`` function, or :func:`run` when
        no name is given

    """
    if not name:
        log.debug("launching main command...")
        return run

    log.debug("running command '{}'...".format(name))
    command = globals()['run_' + name]
    return command
79
80
81
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
    """Process arguments and run the `doorstop` subcommand.

    Builds and loads the tree, validates it, and draws it when it is valid.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors (unused here)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if an error was captured, otherwise the validation result

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and load all of its documents
        tree = _get_tree(args, cwd, load=True)

        # validate it, checking for cyclic links along the way
        utilities.show("validating items...", flush=True)
        tracker = CycleTracker()
        valid = tree.validate(skip=args.skip, item_hook=tracker)

    if success:
        # draw the tree only when it has more than one entry and is valid
        if len(tree) > 1 and valid:
            utilities.show('\n' + tree.draw() + '\n')
        return valid

    return False
107
108
109
def run_create(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop create` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was created, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        # build the tree, then create the new document in it
        document = _get_tree(args, cwd).create_document(
            args.path, args.prefix, parent=args.parent, digits=args.digits
        )

    if success:
        summary = "created document: {} ({})".format(
            document.prefix, document.relpath
        )
        utilities.show(summary)
        return True

    return False
135
136
137
def run_delete(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop delete` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was deleted, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:

        # locate the document
        document = _get_tree(args, cwd).find_document(args.prefix)

        # remember what is reported afterwards, then delete the document
        prefix = document.prefix
        relpath = document.relpath
        document.delete()

    if success:
        utilities.show("deleted document: {} ({})".format(prefix, relpath))
        return True

    return False
162
163
164
def run_add(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop add` subcommand.

    Adds `args.count` items to the document with prefix `args.prefix` and,
    when `args.edit` is set, opens the last added item in an editor.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True on success, False if an error was captured

    """
    # Bound up front so the edit step below is safe even when the loop
    # adds nothing (e.g. a count of zero)
    item = None

    with utilities.capture(catch=catch) as success:

        # get the document
        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
        document = tree.find_document(args.prefix)

        # add items to it
        for _ in range(args.count):
            item = document.add_item(level=args.level, defaults=args.defaults)
            utilities.show("added item: {} ({})".format(item.uid, item.relpath))

        # Edit the last added item if requested; with no added item there
        # is nothing to edit
        if args.edit and item is not None:
            item.edit(tool=args.tool)

    return bool(success)
193
194 1
195 1
def run_remove(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop remove` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the item was removed, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        # look the item up by UID and delete it
        item = _get_tree(args, cwd).find_item(args.uid)
        item.delete()

    if success:
        summary = "removed item: {} ({})".format(item.uid, item.relpath)
        utilities.show(summary)
        return True

    return False
219
220 1
221
def run_edit(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop edit` subcommand.

    `args.label` is tried as an item UID first (unless `args.document` is
    set) and then as a document prefix.  An item is opened in an editor; a
    document is edited through an export/import round trip.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True on success, False if an error was captured

    """
    item = None
    document = None
    ext = utilities.get_ext(args, error, '.yml', '.yml', whole_tree=False)

    with utilities.capture(catch=catch) as success:

        tree = _get_tree(
            args, cwd, request_next_number=_request_next_number(args)
        )

        # try the label as an item, unless a document was explicitly requested
        if not args.document:
            try:
                item = tree.find_item(args.label)
            except common.DoorstopError as exc:
                # a failed lookup is only fatal when an item was demanded
                if args.item:
                    raise exc from None  # pylint: disable=raising-bad-type

        if item:
            # edit the item directly
            item.edit(tool=args.tool, edit_all=args.all)
        else:
            # fall back to the document and edit it via export/import
            document = tree.find_document(args.label)
            _export_import(args, cwd, error, document, ext)

    if not success:
        return False

    if item:
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))

    return True
260
261
262
def run_reorder(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop reorder` subcommand.

    With `args.auto` the document is reordered automatically.  Otherwise an
    existing index is used to reorder after confirmation, or a new index is
    created, opened in an editor, and the command is run once more.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: tree to reuse instead of building a new one (passed by
        the recursive call)

    :return: True on success, False if an error was captured

    """
    reordered = False

    # Step 1: locate the document
    with utilities.capture(catch=catch) as success:
        tree = _tree or _get_tree(args, cwd)
        document = tree.find_document(args.prefix)

    if not success:
        return False

    # Step 2: reorder it, or prepare an index to reorder from
    with utilities.capture(catch=catch) as success:

        if args.auto:
            # automatic ordering
            utilities.show(
                "reordering document {}...".format(document), flush=True
            )
            document.reorder(manual=False)
            reordered = True

        elif not document.index:
            # no index yet: create one, let the user edit it, then re-run
            document.index = True  # create index
            index_path = os.path.relpath(document.index, cwd)
            editor.edit(index_path, tool=args.tool)
            get('reorder')(args, cwd, error, catch=False, _tree=tree)

        else:
            # an index from a previous run exists: use it or discard it
            index_path = os.path.relpath(document.index, cwd)
            if utilities.ask("reorder from '{}'?".format(index_path)):
                utilities.show(
                    "reordering document {}...".format(document), flush=True
                )
                document.reorder(automatic=not args.manual)
                reordered = True
            else:
                del document.index

    if not success:
        # tell the user how to resume once the problem is fixed
        hint = "after fixing the error: doorstop reorder {}".format(document)
        utilities.show(hint)
        return False

    if reordered:
        utilities.show("reordered document: {}".format(document))

    return True
318
319
320
def run_link(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop link` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were linked, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        # build the tree and link the two items
        child, parent = _get_tree(args, cwd).link_items(args.child, args.parent)

    if not success:
        return False

    details = (child.uid, child.relpath, parent.uid, parent.relpath)
    utilities.show("linked items: {} ({}) -> {} ({})".format(*details))

    return True
346
347 1
348
def run_unlink(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop unlink` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were unlinked, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        # build the tree and unlink the two items
        child, parent = _get_tree(args, cwd).unlink_items(
            args.child, args.parent
        )

    if not success:
        return False

    details = (child.uid, child.relpath, parent.uid, parent.relpath)
    utilities.show("unlinked items: {} ({}) -> {} ({})".format(*details))

    return True
374 1
375 1
376 1
def run_clear(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop clear` subcommand.

    Clears the suspect links of every item selected by `args.label`,
    optionally restricted to the parents listed in `args.parents`.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True on success, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        # Suffix for the progress message; empty when no parents were given
        pids = ""
        if args.parents:
            # Looking each parent up verifies that its UID exists
            for pid in args.parents:
                tree.find_item(pid)
            pids = " to " + ", ".join(args.parents)

        for item in _iter_items(args, tree, error):
            utilities.show(
                "clearing item {}'s suspect links{}...".format(item.uid, pids)
            )
            item.clear(parents=args.parents)

    return bool(success)
406
407 1
408 1
def run_review(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop review` subcommand.

    Marks every item selected by `args.label` as reviewed.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True on success, False if an error was captured

    """
    with utilities.capture(catch=catch) as success:
        selected = _iter_items(args, _get_tree(args, cwd), error)

        for item in selected:
            progress = "marking item {} as reviewed...".format(item.uid)
            utilities.show(progress)
            item.review()

    return bool(success)
428
429
430
def run_import(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop import` subcommand.

    Three modes are supported, selected by the arguments:

    - `args.path` (with `args.prefix`): import a file into a document
    - `args.document`: create a document from a (prefix, path) pair
    - `args.item`: add an item from a (prefix, UID) pair

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: tree to reuse instead of building a new one

    :return: True on success, False if an error was captured

    """
    # `ext` is bound here so that it is defined on every execution path;
    # it is only meaningful when a [path] was given
    document = item = ext = None
    attrs = utilities.literal_eval(args.attrs, error)
    mapping = utilities.literal_eval(args.map, error)

    # Validate the combination of arguments
    if args.path:
        if not args.prefix:
            error("when [path] specified, [prefix] is also required")
        elif args.document:
            error("'--document' cannot be used with [path] [prefix]")
        elif args.item:
            error("'--item' cannot be used with [path] [prefix]")
        ext = utilities.get_ext(args, error, None, None)
    elif not (args.document or args.item):
        error("specify [path], '--document', or '--item' to import")

    with utilities.capture(catch=catch) as success:

        if args.path:

            # get the document
            request_next_number = _request_next_number(args)
            tree = _tree or _get_tree(
                args, cwd, request_next_number=request_next_number
            )
            document = tree.find_document(args.prefix)

            # import items into it
            msg = "importing '{}' into document {}...".format(args.path, document)
            utilities.show(msg, flush=True)
            importer.import_file(args.path, document, ext, mapping=mapping)

        elif args.document:
            prefix, path = args.document
            document = importer.create_document(prefix, path, parent=args.parent)

        elif args.item:
            prefix, uid = args.item
            request_next_number = _request_next_number(args)
            item = importer.add_item(
                prefix, uid, attrs=attrs, request_next_number=request_next_number
            )

    if not success:
        return False

    if document:
        utilities.show(
            "imported document: {} ({})".format(document.prefix, document.relpath)
        )
    else:
        # without a document, the item mode must have produced an item
        assert item
        utilities.show("imported item: {} ({})".format(item.uid, item.relpath))

    return True
490 1
491
492
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
    """Process arguments and run the `doorstop export` subcommand.

    Exports a single document, or the whole tree when `args.prefix` is
    'all'.  With `args.path` the result is written to file(s); otherwise a
    single document is displayed on standard output.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :param auto: include placeholders for new items on import
    :param _tree: tree to reuse instead of building a new one

    :return: True on success, False if an error was captured or the
        arguments cannot be satisfied

    """
    # Bound up front: no single document is looked up for the whole tree
    document = None
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.yml', '.csv', whole_tree=whole_tree)

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        exporter.check(ext)
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Write to output file(s)
    if args.path:
        if whole_tree:
            msg = "exporting tree to '{}'...".format(args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(tree, args.path, ext, auto=auto)
        else:
            msg = "exporting document {} to '{}'...".format(document, args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(document, args.path, ext, auto=auto)
        if path:
            utilities.show("exported: {}".format(path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # Only reached when `error` returns instead of exiting: there
            # is no document to display
            return False
        for line in exporter.export_lines(document, ext):
            utilities.show(line)

    return True
538
539
540
def run_publish(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop publish` subcommand.

    Publishes a single document, or the whole tree when `args.prefix` is
    'all'.  With `args.path` the result is written to file(s); otherwise a
    single document is displayed on standard output.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True on success, False if an error was captured or the
        arguments cannot be satisfied

    """
    # Bound up front: no single document is looked up for the whole tree
    document = None
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.txt', '.html', whole_tree)

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        publisher.check(ext)
        tree = _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Set publishing arguments
    kwargs = {}
    if args.width:
        kwargs['width'] = args.width

    # Write to output file(s)
    if args.path:
        path = os.path.abspath(os.path.join(cwd, args.path))
        if whole_tree:
            msg = "publishing tree to '{}'...".format(path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(
                tree, path, ext, template=args.template, **kwargs
            )
        else:
            msg = "publishing document {} to '{}'...".format(document, path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(
                document, path, ext, template=args.template, **kwargs
            )
        if published_path:
            utilities.show("published: {}".format(published_path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # Only reached when `error` returns instead of exiting: there
            # is no document to display
            return False
        for line in publisher.publish_lines(document, ext, **kwargs):
            utilities.show(line)

    return True
594 1
595 1
596 1
def _request_next_number(args):
    """Get the server's "next number" method if a server exists.

    :param args: Namespace of CLI arguments; `args.force` skips the server

    :return: `server.get_next_number` after a successful server check, or
        None when the server is bypassed

    """
    if not args.force:
        server.check()
        return server.get_next_number

    log.warning("creating items without the server...")
    return None
604
605 1
606 1
def _get_tree(args, cwd, request_next_number=None, load=False):
    """Build a tree and optionally load all documents.

    :param args: Namespace of CLI arguments (`args.project` is the root)
    :param cwd: current working directory
    :param request_next_number: server method to get a document's next number
    :param load: force the early loading of all documents

    :return: built :class:`~doorstop.core.tree.Tree`

    """
    utilities.show("building tree...", flush=True)
    tree = build(
        cwd=cwd, root=args.project, request_next_number=request_next_number
    )

    if not load:
        return tree

    utilities.show("loading documents...", flush=True)
    tree.load()
    return tree
625
626
627
def _iter_items(args, tree, error):
    """Iterate through items.

    :param args: Namespace of CLI arguments
    :param tree: the document hierarchy tree
    :param error: function to call for CLI errors

    Items are filtered to:

    - `args.label` == 'all': all items
    - `args.label` == document prefix: the document's items
    - `args.label` == item UID: a single item

    Documents and items are inferred unless flagged by:

    - `args.document`: `args.label` is a prefix
    - `args.item`: `args.label` is an UID

    :return: generator of the selected items; since this is a generator,
        arguments are only checked once iteration starts

    """
    document, item = _find_requested(args, tree, error)

    # Yield items from the requested object
    if item:
        yield item
    elif document:
        yield from document
    else:
        for document in tree:
            yield from document


def _find_requested(args, tree, error):
    """Resolve `args.label` to a (document, item) pair for `_iter_items`.

    At most one element of the pair is set; both are None when the whole
    tree ('all') was requested.

    :param args: Namespace of CLI arguments
    :param tree: the document hierarchy tree
    :param error: function to call for CLI errors

    :raises: :class:`~doorstop.common.DoorstopError` if `args.document` is
        set and no document matches the label

    """
    if args.label == 'all':
        # 'all' selects the whole tree, so neither flag makes sense
        if args.item:
            error("argument -i/--item: not allowed with 'all'")
        if args.document:
            error("argument -d/--document: not allowed with 'all'")
        return None, None

    document = None
    if not args.item:
        try:
            document = tree.find_document(args.label)
        except common.DoorstopError as exc:
            # a failed lookup is only fatal when a document was demanded
            if args.document:
                raise exc from None  # pylint: disable=raising-bad-type

    if document:
        return document, None

    # not a document: the label has to be an item UID
    return None, tree.find_item(args.label)
678
679
680
def _export_import(args, cwd, error, document, ext):
    """Edit a document by calling export followed by import.

    The document is exported to a timestamped file, the file is opened in
    an editor, and after confirmation it is imported back into the same
    document.  `args` is modified in place to drive both subcommands.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param document: :class:`~doorstop.core.document.Document` to edit
    :param ext: extension for export format

    """
    # Export the document to a file named after its prefix and the time
    args.prefix = document.prefix
    args.path = path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
    export = get('export')
    export(args, cwd, error, catch=False, auto=True, _tree=document.tree)

    # Let the user edit the exported file
    editor.edit(path, tool=args.tool)

    # Import declined: offer to clean up, or explain how to import later
    if not utilities.ask("import from '{}'?".format(path)):
        utilities.show("import canceled")
        if utilities.ask("delete '{}'?".format(path)):
            common.delete(path)
        else:
            utilities.show("to manually import: doorstop import {0}".format(path))
        return

    # Import the file into the same document, then remove it
    args.attrs = {}
    args.map = {}
    get('import')(args, cwd, error, catch=False, _tree=document.tree)
    common.delete(path)
712