Passed
Push — develop ( 7865ed...73032f )
by Jace
03:54 queued 15s
created

doorstop.cli.commands._iter_items()   D

Complexity

Conditions 12

Size

Total Lines 49
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 49
rs 4.8
c 0
b 0
f 0
cc 12
nop 3

How to fix   Complexity   

Complexity

Complex code units like doorstop.cli.commands._iter_items() (a function in this case, although the generic advice below is worded for classes) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Command functions."""
4
5
import os
6
import time
7
from typing import Set
8
9
from doorstop import common, server
10
from doorstop.cli import utilities
11
from doorstop.core import editor, exporter, importer, publisher
12
from doorstop.core.builder import build
13
14
# Logger for this module, obtained from doorstop's common helpers
log = common.logger(__name__)
15
16
17
class CycleTracker:
    """Callable hook that reports cyclic references between items.

    Items are treated as the vertices and item links as the edges of a
    directed graph that need not be connected.  Cycles are found with the
    standard depth first search using graph colouring: an item is either
    "discovered" (on the current search path) or "finished" (completely
    explored).  The time complexity is O(|V| + |E|).

    """

    def __init__(self):
        """Create a tracker that has not seen any item yet."""
        self.discovered: Set[str] = set()
        self.finished: Set[str] = set()

    def _dfs_visit(self, uid, tree):
        """Visit one item depth first and report every back edge found.

        :param uid: the UID of the item to visit
        :param tree: the document hierarchy tree used to look up items

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        # Mark the item as being on the current search path
        self.discovered.add(uid)

        for pid in tree.find_item(uid).links:
            if pid in self.discovered:
                # A link to an item on the current path is a back edge
                msg = "detected a cycle with a back edge from {} to {}".format(pid, uid)
                yield common.DoorstopWarning(msg)
            elif pid not in self.finished:
                # Recurse, if this is a fresh item
                yield from self._dfs_visit(pid, tree)

        # The item is fully explored: move it from the path to the done set
        self.discovered.remove(uid)
        self.finished.add(uid)

    def __call__(self, item, document, tree):
        """Report the cycles reachable from the specified item.

        Items that were already discovered or finished by an earlier call
        are skipped, so each item is searched at most once per tracker.

        :param item: the UID of the item to get the cycles for
        :param document: unused
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        already_seen = item in self.discovered or item in self.finished
        if not already_seen:
            yield from self._dfs_visit(item, tree)
69
70
71
def get(name):
    """Look up the function implementing a command.

    :param name: subcommand name, or a false value for the main command

    :return: the module-level ``run_<name>`` function, or :func:`run` when
        no name is given

    """
    if not name:
        log.debug("launching main command...")
        return run

    log.debug("running command '{}'...".format(name))
    return globals()["run_" + name]
79
80
81
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
    """Run the main `doorstop` command: build, load and validate the tree.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``False`` if the captured block did not succeed, otherwise the
        result of validating the tree

    """
    with utilities.capture(catch=catch) as success:
        # Build the tree with all documents loaded up front
        tree = _get_tree(args, cwd, load=True)

        # Validate it, checking for cyclic links through the item hook
        utilities.show("validating items...", flush=True)
        valid = tree.validate(skip=args.skip, item_hook=CycleTracker())

    if not success:
        return False

    # Only draw the hierarchy when it is valid and has several documents
    show_tree = len(tree) > 1 and valid
    if show_tree:
        utilities.show("\n" + tree.draw() + "\n")

    return valid
106
107
108
def run_create(args, cwd, _, catch=True):
    """Run the `doorstop create` subcommand to add a new document.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the document was created, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        # Create the document from the CLI options
        document = tree.create_document(
            args.path,
            args.prefix,
            parent=args.parent,
            digits=args.digits,
            sep=args.separator,
            itemformat=args.itemformat,
        )

    if success:
        summary = "created document: {} ({})".format(document.prefix, document.relpath)
        utilities.show(summary)
        return True

    return False
138
139
140
def run_delete(args, cwd, _, catch=True):
    """Run the `doorstop delete` subcommand to delete a document.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the document was deleted, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        document = _get_tree(args, cwd).find_document(args.prefix)

        # Remember what identifies the document before it is deleted
        old_prefix = document.prefix
        old_relpath = document.relpath
        document.delete()

    if success:
        utilities.show("deleted document: {} ({})".format(old_prefix, old_relpath))
        return True

    return False
164
165
166
def run_add(args, cwd, _, catch=True):
    """Run the `doorstop add` subcommand to add items to a document.

    `args.count` items are added.  When `args.edit` is set, the last item
    added is opened for editing; if no item was added (a count of zero)
    the edit step is skipped.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the captured block succeeded, otherwise ``False``

    """
    # Last item added; stays None when the loop below never runs, so the
    # edit step cannot hit an unbound local.
    item = None

    with utilities.capture(catch=catch) as success:
        # get the document
        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
        document = tree.find_document(args.prefix)

        # add items to it
        for _index in range(args.count):
            item = document.add_item(
                level=args.level,
                defaults=args.defaults,
                name=args.name,
                reorder=args.noreorder,
            )
            utilities.show("added item: {} ({})".format(item.uid, item.relpath))

        # Edit the last added item if requested
        if args.edit and item is not None:
            item.edit(tool=args.tool)

    if not success:
        return False

    return True
199
200
201
def run_remove(args, cwd, _, catch=True):
    """Run the `doorstop remove` subcommand to delete a single item.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the item was removed, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        # Locate the item by UID and delete it
        item = _get_tree(args, cwd).find_item(args.uid)
        item.delete()

    if success:
        utilities.show("removed item: {} ({})".format(item.uid, item.relpath))
        return True

    return False
224
225
226
def run_edit(args, cwd, error, catch=True):
    """Run the `doorstop edit` subcommand on an item or a document.

    `args.label` is tried as an item UID first (unless `args.document` is
    set) and as a document prefix otherwise.  Items are edited directly;
    documents are edited through an export/import round trip.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the captured block succeeded, otherwise ``False``

    """
    item = None
    ext = utilities.get_ext(args, error, ".yml", ".yml", whole_tree=False)

    with utilities.capture(catch=catch) as success:
        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)

        # Try the label as an item, unless a document was explicitly requested
        if not args.document:
            try:
                item = tree.find_item(args.label)
            except common.DoorstopError as exc:
                # Only fatal when the label was flagged as an item
                if args.item:
                    raise exc from None  # pylint: disable=raising-bad-type

        if item:
            item.edit(tool=args.tool, edit_all=args.all)
        else:
            # Fall back to treating the label as a document prefix
            document = tree.find_document(args.label)
            _export_import(args, cwd, error, document, ext)

    if not success:
        return False

    if item:
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))

    return True
264
265
266
def run_reorder(args, cwd, error, catch=True, _tree=None):
    """Run the `doorstop reorder` subcommand on one document.

    Depending on the arguments and the document state, the document is
    reordered automatically, reordered from an existing index after
    confirmation, or a new index is created and opened in an editor before
    the command is run again.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: already built tree to reuse instead of building one

    :return: ``True`` if both captured blocks succeeded, otherwise ``False``

    """
    did_reorder = False

    with utilities.capture(catch=catch) as success:
        # Find the document, reusing the supplied tree when there is one
        tree = _tree or _get_tree(args, cwd)
        document = tree.find_document(args.prefix)

    if not success:
        return False

    with utilities.capture(catch=catch) as success:
        if args.auto:
            # Automatic ordering was requested
            utilities.show("reordering document {}...".format(document), flush=True)
            document.reorder(manual=False)
            did_reorder = True

        elif not document.index:
            # No index yet: create one, let the user edit it, then rerun
            document.index = True  # create index
            relpath = os.path.relpath(document.index, cwd)
            editor.edit(relpath, tool=args.tool)
            get("reorder")(args, cwd, error, catch=False, _tree=tree)

        else:
            # An index from a previous run exists: confirm before using it
            relpath = os.path.relpath(document.index, cwd)
            if utilities.ask("reorder from '{}'?".format(relpath)):
                utilities.show("reordering document {}...".format(document), flush=True)
                document.reorder(automatic=not args.manual)
                did_reorder = True
            else:
                del document.index

    if not success:
        hint = "after fixing the error: doorstop reorder {}".format(document)
        utilities.show(hint)
        return False

    if did_reorder:
        utilities.show("reordered document: {}".format(document))

    return True
320
321
322
def run_link(args, cwd, _, catch=True):
    """Run the `doorstop link` subcommand to link a child item to a parent.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the items were linked, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        child, parent = _get_tree(args, cwd).link_items(args.child, args.parent)

    if success:
        summary = "linked items: {} ({}) -> {} ({})".format(
            child.uid, child.relpath, parent.uid, parent.relpath
        )
        utilities.show(summary)
        return True

    return False
347
348
349
def run_unlink(args, cwd, _, catch=True):
    """Run the `doorstop unlink` subcommand to remove a child-parent link.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the items were unlinked, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        child, parent = _get_tree(args, cwd).unlink_items(args.child, args.parent)

    if success:
        summary = "unlinked items: {} ({}) -> {} ({})".format(
            child.uid, child.relpath, parent.uid, parent.relpath
        )
        utilities.show(summary)
        return True

    return False
374
375
376
def run_clear(args, cwd, error, catch=True):
    """Run the `doorstop clear` subcommand to clear suspect links.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the captured block succeeded, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        # Suffix naming the parents in the progress message, if any
        pids = ""
        if args.parents:
            # Look up every parent UID first so a missing one fails early
            for pid in args.parents:
                tree.find_item(pid)
            pids = " to " + ", ".join(args.parents)

        for item in _iter_items(args, tree, error):
            progress = "clearing item {}'s suspect links{}...".format(item.uid, pids)
            utilities.show(progress)
            item.clear(parents=args.parents)

    if success:
        return True

    return False
406
407
408
def run_review(args, cwd, error, catch=True):
    """Run the `doorstop review` subcommand to mark items as reviewed.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``True`` if the captured block succeeded, otherwise ``False``

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        # Review every item selected by the label arguments
        for item in _iter_items(args, tree, error):
            progress = "marking item {} as reviewed...".format(item.uid)
            utilities.show(progress)
            item.review()

    if success:
        return True

    return False
428
429
430
def run_import(args, cwd, error, catch=True, _tree=None):
    """Run the `doorstop import` subcommand.

    Three modes are supported, selected by the arguments:

    - `args.path` (with `args.prefix`): import a file into a document
    - `args.document`: create a document from a ``(prefix, path)`` pair
    - `args.item`: add an item from a ``(prefix, uid)`` pair

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: already built tree to reuse instead of building one

    :return: ``True`` if the captured block succeeded, otherwise ``False``

    """
    # `ext` is only needed for file imports, but bind it on every path so
    # it can never be referenced while undefined.
    document = item = ext = None
    attrs = utilities.literal_eval(args.attrs, error)
    mapping = utilities.literal_eval(args.map, error)

    # Check that the combination of arguments is valid
    if args.path:
        if not args.prefix:
            error("when [path] specified, [prefix] is also required")
        elif args.document:
            error("'--document' cannot be used with [path] [prefix]")
        elif args.item:
            error("'--item' cannot be used with [path] [prefix]")
        ext = utilities.get_ext(args, error, None, None)
    elif not (args.document or args.item):
        error("specify [path], '--document', or '--item' to import")

    with utilities.capture(catch=catch) as success:
        if args.path:
            # get the document
            request_next_number = _request_next_number(args)
            tree = _tree or _get_tree(
                args, cwd, request_next_number=request_next_number
            )
            document = tree.find_document(args.prefix)

            # import items into it
            msg = "importing '{}' into document {}...".format(args.path, document)
            utilities.show(msg, flush=True)
            importer.import_file(args.path, document, ext, mapping=mapping)

        elif args.document:
            prefix, path = args.document
            document = importer.create_document(prefix, path, parent=args.parent)
        elif args.item:
            prefix, uid = args.item
            request_next_number = _request_next_number(args)
            item = importer.add_item(
                prefix, uid, attrs=attrs, request_next_number=request_next_number
            )

    if not success:
        return False

    if document:
        utilities.show(
            "imported document: {} ({})".format(document.prefix, document.relpath)
        )
    else:
        # Internal invariant: without a document, an item must have been added
        assert item
        utilities.show("imported item: {} ({})".format(item.uid, item.relpath))

    return True
488
489
490
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
    """Run the `doorstop export` subcommand.

    A prefix of ``"all"`` selects the whole tree; any other prefix selects
    a single document.  With `args.path` the output is written to file(s),
    otherwise the document is shown line by line.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :param auto: include placeholders for new items on import
    :param _tree: already built tree to reuse instead of building one

    :return: ``False`` if the tree or document could not be obtained,
        otherwise ``True``

    """
    whole_tree = args.prefix == "all"
    ext = utilities.get_ext(args, error, ".yml", ".csv", whole_tree=whole_tree)

    # Get the tree and, for a single document export, the document
    document = None
    with utilities.capture(catch=catch) as success:
        exporter.check(ext)
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Without an output path, display to standard output
    if not args.path:
        if whole_tree:
            error("only single documents can be displayed")
        for line in exporter.export_lines(document, ext):
            utilities.show(line)
        return True

    # Otherwise, write to output file(s)
    if whole_tree:
        source = tree
        msg = "exporting tree to '{}'...".format(args.path)
    else:
        source = document
        msg = "exporting document {} to '{}'...".format(document, args.path)
    utilities.show(msg, flush=True)
    path = exporter.export(source, args.path, ext, auto=auto)
    if path:
        utilities.show("exported: {}".format(path))

    return True
537
538
539
def run_publish(args, cwd, error, catch=True):
    """Run the `doorstop publish` subcommand.

    A prefix of ``"all"`` selects the whole tree; any other prefix selects
    a single document.  With `args.path` the output is written to file(s),
    otherwise the document is shown line by line.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: ``False`` if the tree or document could not be obtained,
        otherwise ``True``

    """
    whole_tree = args.prefix == "all"
    ext = utilities.get_ext(args, error, ".txt", ".html", whole_tree)

    # Get the tree and, for a single document, the document itself
    document = None
    with utilities.capture(catch=catch) as success:
        publisher.check(ext)
        tree = _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Only pass a width to the publisher when one was given
    kwargs = {"width": args.width} if args.width else {}

    # Without an output path, display to standard output
    if not args.path:
        if whole_tree:
            error("only single documents can be displayed")
        for line in publisher.publish_lines(document, ext, **kwargs):
            utilities.show(line)
        return True

    # Otherwise, write to output file(s)
    path = os.path.abspath(os.path.join(cwd, args.path))
    if whole_tree:
        source = tree
        msg = "publishing tree to '{}'...".format(path)
    else:
        source = document
        msg = "publishing document {} to '{}'...".format(document, path)
    utilities.show(msg, flush=True)
    published_path = publisher.publish(
        source, path, ext, template=args.template, **kwargs
    )
    if published_path:
        utilities.show("published: {}".format(published_path))

    return True
593
594
595
def _request_next_number(args):
    """Return the server's "next number" method unless it is bypassed.

    :param args: Namespace of CLI arguments; `args.force` skips the server

    :return: ``server.get_next_number`` after checking the server, or
        ``None`` when `args.force` is set

    """
    if not args.force:
        server.check()
        return server.get_next_number

    log.warning("creating items without the server...")
    return None
603
604
605
def _get_tree(args, cwd, request_next_number=None, load=False):
    """Build the document tree, loading all documents on request.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param request_next_number: server method to get a document's next number
    :param load: force the early loading of all documents

    :return: built :class:`~doorstop.core.tree.Tree`

    """
    utilities.show("building tree...", flush=True)
    tree = build(cwd=cwd, root=args.project, request_next_number=request_next_number)

    if not load:
        return tree

    utilities.show("loading documents...", flush=True)
    tree.load()
    return tree
624
625
626
def _find_requested(args, tree):
    """Resolve `args.label` to either a single item or a document.

    The label is tried as a document prefix first (unless `args.item` is
    set) and as an item UID if no document was found.  A failed document
    lookup is only fatal when `args.document` is set.

    :param args: Namespace of CLI arguments
    :param tree: the document hierarchy tree

    :return: tuple ``(item, document)`` of which one is ``None``

    """
    document = None
    if not args.item:
        try:
            document = tree.find_document(args.label)
        except common.DoorstopError as exc:
            if args.document:
                raise exc from None  # pylint: disable=raising-bad-type

    if document:
        return None, document

    return tree.find_item(args.label), None


def _iter_items(args, tree, error):
    """Iterate through items.

    :param args: Namespace of CLI arguments
    :param tree: the document hierarchy tree
    :param error: function to call for CLI errors

    Items are filtered to:

    - `args.label` == 'all': all items
    - `args.label` == document prefix: the document's items
    - `args.label` == item UID: a single item

    Documents and items are inferred unless flagged by:

    - `args.document`: `args.label` is a prefix
    - `args.item`: `args.label` is an UID

    :return: generator of the selected items; as this is a generator, the
        argument checks only run once iteration starts

    """
    # Determine if tree, document, or item was requested
    if args.label == "all":
        # The item/document flags make no sense for the whole tree
        if args.item:
            error("argument -i/--item: not allowed with 'all'")
        if args.document:
            error("argument -d/--document: not allowed with 'all'")
        item = document = None
    else:
        item, document = _find_requested(args, tree)

    # Yield items from the requested object
    if item:
        yield item
    elif document:
        yield from document
    else:
        for document in tree:
            yield from document
675
676
677
def _export_import(args, cwd, error, document, ext):
    """Edit a document through an export, edit, import round trip.

    The document is exported to a timestamped file in the requested
    format, the file is opened in an editor and, after confirmation,
    imported back into the same document.  `args` is modified in place
    (`prefix`, `path` and, on import, `attrs` and `map`).

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param document: :class:`~doorstop.core.document.Document` to edit
    :param ext: extension for export format

    """
    # Export the document to a file named after its prefix and the time
    args.prefix = document.prefix
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
    args.path = path
    get("export")(args, cwd, error, catch=False, auto=True, _tree=document.tree)

    # Let the user edit the exported file
    editor.edit(path, tool=args.tool)

    # Import the file back into the same document once confirmed
    if utilities.ask("import from '{}'?".format(path)):
        args.attrs = {}
        args.map = {}
        get("import")(args, cwd, error, catch=False, _tree=document.tree)
        common.delete(path)
        return

    # Import declined: either clean up or explain how to import later
    utilities.show("import canceled")
    if utilities.ask("delete '{}'?".format(path)):
        common.delete(path)
    else:
        hint = "to manually import: doorstop import {0}".format(path)
        utilities.show(hint)
709