doorstop.cli.commands   F
last analyzed

Complexity

Total Complexity 117

Size/Duplication

Total Lines 708
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 117
eloc 342
dl 0
loc 708
rs 2
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A CycleTracker._dfs_visit() 0 24 5
A CycleTracker.__init__() 0 4 1
A CycleTracker.__call__() 0 12 3

19 Functions

Rating   Name   Duplication   Size   Complexity  
A get() 0 8 2
A run_review() 0 20 4
B run_clear() 0 30 6
A run_delete() 0 24 3
A run_create() 0 30 3
A run() 0 25 5
D run_import() 0 58 13
A run_unlink() 0 25 3
A _request_next_number() 0 8 2
A run_add() 0 33 5
C run_edit() 0 38 9
F _iter_items() 0 51 14
C run_publish() 0 53 10
A _export_import() 0 32 3
A _get_tree() 0 19 2
C run_reorder() 0 54 9
C run_export() 0 45 9
A run_link() 0 25 3
A run_remove() 0 23 3

How to fix   Complexity   

Complexity

Complex classes like doorstop.cli.commands often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Command functions."""
4
5
import os
6
import time
7
from typing import Set
8
9
from doorstop import common, server
10
from doorstop.cli import utilities
11
from doorstop.core import editor, exporter, importer, publisher
12
from doorstop.core.builder import build
13
14
log = common.logger(__name__)
15
16
17
class CycleTracker:
18
    """A cycle tracker to detect cyclic references between items.
19
20
    The cycle tracker uses a standard algorithm to detect cycles in a directed
21
    graph (not necessarily connected) using a depth first search with a bit of
22
    graph colouring.  The time complexity is O(|V| + |E|).  The vertices are
23
    the items.  The edges are the links between items.
24
25
    """
26
27
    def __init__(self):
28
        """Initialize a cycle tracker."""
29
        self.discovered: Set[str] = set()
30
        self.finished: Set[str] = set()
31
32
    def _dfs_visit(self, uid, tree):
33
        """Do a depth first search visit of the specified item.
34
35
        :param uid: the UID of the item to visit
36
        :param tree: the document hierarchy tree
37
38
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
39
40
        """
41
        self.discovered.add(uid)
42
        item = tree.find_item(uid)
43
44
        for pid in item.links:
45
            # Detect cycles via a back edge
46
            if pid in self.discovered:
47
                msg = "detected a cycle with a back edge from {} to {}".format(pid, uid)
48
                yield common.DoorstopWarning(msg)
49
50
            # Recurse, if this a fresh item
51
            if pid not in self.discovered and pid not in self.finished:
52
                yield from self._dfs_visit(pid, tree)
53
54
        self.discovered.remove(uid)
55
        self.finished.add(uid)
56
57
    def __call__(self, item, document, tree):
58
        """Get cycles which include the specified item.
59
60
        :param item: the UID of the item to get the cycles for
61
        :param document: unused
62
        :param tree: the document hierarchy tree
63
64
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
65
66
        """
67
        if item not in self.discovered and item not in self.finished:
68
            yield from self._dfs_visit(item, tree)
69
70
71
def get(name):
72
    """Get a command function by name."""
73
    if name:
74
        log.debug("running command '{}'...".format(name))
75
        return globals()["run_" + name]
76
    else:
77
        log.debug("launching main command...")
78
        return run
79
80
81
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
82
    """Process arguments and run the `doorstop` subcommand.
83
84
    :param args: Namespace of CLI arguments
85
    :param cwd: current working directory
86
    :param error: function to call for CLI errors
87
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
88
89
    """
90
    with utilities.capture(catch=catch) as success:
91
        # get the tree
92
        tree = _get_tree(args, cwd, load=True)
93
94
        # validate it
95
        utilities.show("validating items...", flush=True)
96
        cycle_tracker = CycleTracker()
97
        valid = tree.validate(skip=args.skip, item_hook=cycle_tracker)
98
99
    if not success:
100
        return False
101
102
    if len(tree) > 1 and valid:
103
        utilities.show("\n" + tree.draw() + "\n")
104
105
    return valid
106
107
108
def run_create(args, cwd, _, catch=True):
109
    """Process arguments and run the `doorstop create` subcommand.
110
111
    :param args: Namespace of CLI arguments
112
    :param cwd: current working directory
113
    :param error: function to call for CLI errors
114
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
115
116
    """
117
    with utilities.capture(catch=catch) as success:
118
        # get the tree
119
        tree = _get_tree(args, cwd)
120
121
        # create a new document
122
        document = tree.create_document(
123
            args.path,
124
            args.prefix,
125
            parent=args.parent,
126
            digits=args.digits,
127
            sep=args.separator,
128
            itemformat=args.itemformat,
129
        )
130
131
    if not success:
132
        return False
133
134
    utilities.show(
135
        "created document: {} ({})".format(document.prefix, document.relpath)
136
    )
137
    return True
138
139
140
def run_delete(args, cwd, _, catch=True):
141
    """Process arguments and run the `doorstop delete` subcommand.
142
143
    :param args: Namespace of CLI arguments
144
    :param cwd: current working directory
145
    :param error: function to call for CLI errors
146
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
147
148
    """
149
    with utilities.capture(catch=catch) as success:
150
        # get the document
151
        tree = _get_tree(args, cwd)
152
        document = tree.find_document(args.prefix)
153
154
        # delete it
155
        prefix, relpath = document.prefix, document.relpath
156
        document.delete()
157
158
    if not success:
159
        return False
160
161
    utilities.show("deleted document: {} ({})".format(prefix, relpath))
162
163
    return True
164
165
166
def run_add(args, cwd, _, catch=True):
167
    """Process arguments and run the `doorstop add` subcommand.
168
169
    :param args: Namespace of CLI arguments
170
    :param cwd: current working directory
171
    :param error: function to call for CLI errors
172
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
173
174
    """
175
    with utilities.capture(catch=catch) as success:
176
        # get the document
177
        request_next_number = _request_next_number(args)
178
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
179
        document = tree.find_document(args.prefix)
180
181
        # add items to it
182
        for _ in range(args.count):
183
            item = document.add_item(
184
                level=args.level,
185
                defaults=args.defaults,
186
                name=args.name,
187
                reorder=args.noreorder,
188
            )
189
            utilities.show("added item: {} ({})".format(item.uid, item.relpath))
190
191
        # Edit item if requested
192
        if args.edit:
193
            item.edit(tool=args.tool)
0 ignored issues
show
introduced by
The variable item does not seem to be defined in case the for loop on line 182 is not entered. Are you sure this can never be the case?
Loading history...
194
195
    if not success:
196
        return False
197
198
    return True
199
200
201
def run_remove(args, cwd, _, catch=True):
202
    """Process arguments and run the `doorstop remove` subcommand.
203
204
    :param args: Namespace of CLI arguments
205
    :param cwd: current working directory
206
    :param error: function to call for CLI errors
207
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
208
209
    """
210
    with utilities.capture(catch=catch) as success:
211
        # get the item
212
        tree = _get_tree(args, cwd)
213
        item = tree.find_item(args.uid)
214
215
        # delete it
216
        item.delete()
217
218
    if not success:
219
        return False
220
221
    utilities.show("removed item: {} ({})".format(item.uid, item.relpath))
222
223
    return True
224
225
226
def run_edit(args, cwd, error, catch=True):
227
    """Process arguments and run the `doorstop edit` subcommand.
228
229
    :param args: Namespace of CLI arguments
230
    :param cwd: current working directory
231
    :param error: function to call for CLI errors
232
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
233
234
    """
235
    item = document = None
236
    ext = utilities.get_ext(args, error, ".yml", ".yml", whole_tree=False)
237
238
    with utilities.capture(catch=catch) as success:
239
        # get the item or document
240
        request_next_number = _request_next_number(args)
241
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
242
        if not args.document:
243
            try:
244
                item = tree.find_item(args.label)
245
            except common.DoorstopError as exc:
246
                if args.item:
247
                    raise exc from None  # pylint: disable=raising-bad-type
248
        if not item:
249
            document = tree.find_document(args.label)
250
251
        # edit it
252
        if item:
253
            item.edit(tool=args.tool, edit_all=args.all)
254
        else:
255
            _export_import(args, cwd, error, document, ext)
256
257
    if not success:
258
        return False
259
260
    if item:
261
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))
262
263
    return True
264
265
266
def run_reorder(args, cwd, error, catch=True, _tree=None):
267
    """Process arguments and run the `doorstop reorder` subcommand.
268
269
    :param args: Namespace of CLI arguments
270
    :param cwd: current working directory
271
    :param error: function to call for CLI errors
272
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
273
274
    """
275
    reordered = False
276
277
    with utilities.capture(catch=catch) as success:
278
        # get the document
279
        tree = _tree or _get_tree(args, cwd)
280
        document = tree.find_document(args.prefix)
281
282
    if not success:
283
        return False
284
285
    with utilities.capture(catch=catch) as success:
286
        # automatically order
287
        if args.auto:
288
            msg = "reordering document {}...".format(document)
289
            utilities.show(msg, flush=True)
290
            document.reorder(manual=False)
291
            reordered = True
292
293
        # or, reorder from a previously updated index
294
        elif document.index:
295
            relpath = os.path.relpath(document.index, cwd)
296
            if utilities.ask("reorder from '{}'?".format(relpath)):
297
                msg = "reordering document {}...".format(document)
298
                utilities.show(msg, flush=True)
299
                document.reorder(automatic=not args.manual)
300
                reordered = True
301
            else:
302
                del document.index
303
304
        # or, create a new index to update
305
        else:
306
            document.index = True  # create index
307
            relpath = os.path.relpath(document.index, cwd)
308
            editor.edit(relpath, tool=args.tool)
309
            get("reorder")(args, cwd, error, catch=False, _tree=tree)
310
311
    if not success:
312
        msg = "after fixing the error: doorstop reorder {}".format(document)
313
        utilities.show(msg)
314
        return False
315
316
    if reordered:
317
        utilities.show("reordered document: {}".format(document))
318
319
    return True
320
321
322
def run_link(args, cwd, _, catch=True):
323
    """Process arguments and run the `doorstop link` subcommand.
324
325
    :param args: Namespace of CLI arguments
326
    :param cwd: current working directory
327
    :param error: function to call for CLI errors
328
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
329
330
    """
331
    with utilities.capture(catch=catch) as success:
332
        # get the tree
333
        tree = _get_tree(args, cwd)
334
335
        # link items
336
        child, parent = tree.link_items(args.child, args.parent)
337
338
    if not success:
339
        return False
340
341
    msg = "linked items: {} ({}) -> {} ({})".format(
342
        child.uid, child.relpath, parent.uid, parent.relpath
343
    )
344
    utilities.show(msg)
345
346
    return True
347
348
349
def run_unlink(args, cwd, _, catch=True):
350
    """Process arguments and run the `doorstop unlink` subcommand.
351
352
    :param args: Namespace of CLI arguments
353
    :param cwd: current working directory
354
    :param error: function to call for CLI errors
355
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
356
357
    """
358
    with utilities.capture(catch=catch) as success:
359
        # get the tree
360
        tree = _get_tree(args, cwd)
361
362
        # unlink items
363
        child, parent = tree.unlink_items(args.child, args.parent)
364
365
    if not success:
366
        return False
367
368
    msg = "unlinked items: {} ({}) -> {} ({})".format(
369
        child.uid, child.relpath, parent.uid, parent.relpath
370
    )
371
    utilities.show(msg)
372
373
    return True
374
375
376
def run_clear(args, cwd, error, catch=True):
377
    """Process arguments and run the `doorstop clear` subcommand.
378
379
    :param args: Namespace of CLI arguments
380
    :param cwd: current working directory
381
    :param error: function to call for CLI errors
382
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
383
384
    """
385
    with utilities.capture(catch=catch) as success:
386
        tree = _get_tree(args, cwd)
387
388
        if args.parents:
389
            # Check that the parent item UIDs exist
390
            for pid in args.parents:
391
                tree.find_item(pid)
392
393
            pids = " to " + ", ".join(args.parents)
394
        else:
395
            pids = ""
396
397
        for item in _iter_items(args, tree, error):
398
            msg = "clearing item {}'s suspect links{}...".format(item.uid, pids)
399
            utilities.show(msg)
400
            item.clear(parents=args.parents)
401
402
    if not success:
403
        return False
404
405
    return True
406
407
408
def run_review(args, cwd, error, catch=True):
409
    """Process arguments and run the `doorstop review` subcommand.
410
411
    :param args: Namespace of CLI arguments
412
    :param cwd: current working directory
413
    :param error: function to call for CLI errors
414
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
415
416
    """
417
    with utilities.capture(catch=catch) as success:
418
        tree = _get_tree(args, cwd)
419
420
        for item in _iter_items(args, tree, error):
421
            utilities.show("marking item {} as reviewed...".format(item.uid))
422
            item.review()
423
424
    if not success:
425
        return False
426
427
    return True
428
429
430
def run_import(args, cwd, error, catch=True, _tree=None):
431
    """Process arguments and run the `doorstop import` subcommand.
432
433
    :param args: Namespace of CLI arguments
434
    :param cwd: current working directory
435
    :param error: function to call for CLI errors
436
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
437
438
    """
439
    document = item = None
440
    attrs = utilities.literal_eval(args.attrs, error)
441
    mapping = utilities.literal_eval(args.map, error)
442
    if args.path:
443
        if not args.prefix:
444
            error("when [path] specified, [prefix] is also required")
445
        elif args.document:
446
            error("'--document' cannot be used with [path] [prefix]")
447
        elif args.item:
448
            error("'--item' cannot be used with [path] [prefix]")
449
        ext = utilities.get_ext(args, error, None, None)
450
    elif not (args.document or args.item):
451
        error("specify [path], '--document', or '--item' to import")
452
453
    with utilities.capture(catch=catch) as success:
454
        if args.path:
455
            # get the document
456
            request_next_number = _request_next_number(args)
457
            tree = _tree or _get_tree(
458
                args, cwd, request_next_number=request_next_number
459
            )
460
            document = tree.find_document(args.prefix)
461
462
            # import items into it
463
            msg = "importing '{}' into document {}...".format(args.path, document)
464
            utilities.show(msg, flush=True)
465
            importer.import_file(args.path, document, ext, mapping=mapping)
0 ignored issues
show
introduced by
The variable ext does not seem to be defined for all execution paths.
Loading history...
466
467
        elif args.document:
468
            prefix, path = args.document
469
            document = importer.create_document(prefix, path, parent=args.parent)
470
        elif args.item:
471
            prefix, uid = args.item
472
            request_next_number = _request_next_number(args)
473
            item = importer.add_item(
474
                prefix, uid, attrs=attrs, request_next_number=request_next_number
475
            )
476
    if not success:
477
        return False
478
479
    if document:
480
        utilities.show(
481
            "imported document: {} ({})".format(document.prefix, document.relpath)
482
        )
483
    else:
484
        assert item
485
        utilities.show("imported item: {} ({})".format(item.uid, item.relpath))
486
487
    return True
488
489
490
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
491
    """Process arguments and run the `doorstop export` subcommand.
492
493
    :param args: Namespace of CLI arguments
494
    :param cwd: current working directory
495
    :param error: function to call for CLI errors
496
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
497
498
    :param auto: include placeholders for new items on import
499
500
    """
501
    whole_tree = args.prefix == "all"
502
    ext = utilities.get_ext(args, error, ".yml", ".csv", whole_tree=whole_tree)
503
504
    # Get the tree or document
505
    with utilities.capture(catch=catch) as success:
506
        exporter.check(ext)
507
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
508
        if not whole_tree:
509
            document = tree.find_document(args.prefix)
510
511
    if not success:
512
        return False
513
514
    # Write to output file(s)
515
    if args.path:
516
        if whole_tree:
517
            msg = "exporting tree to '{}'...".format(args.path)
518
            utilities.show(msg, flush=True)
519
            path = exporter.export(tree, args.path, ext, auto=auto)
520
        else:
521
            msg = "exporting document {} to '{}'...".format(document, args.path)
0 ignored issues
show
introduced by
The variable document does not seem to be defined in case BooleanNotNode on line 508 is False. Are you sure this can never be the case?
Loading history...
522
            utilities.show(msg, flush=True)
523
            path = exporter.export(document, args.path, ext, auto=auto)
524
        if path:
525
            utilities.show("exported: {}".format(path))
526
527
    # Or, display to standard output
528
    else:
529
        if whole_tree:
530
            error("only single documents can be displayed")
531
        for line in exporter.export_lines(document, ext):
532
            utilities.show(line)
533
534
    return True
535
536
537
def run_publish(args, cwd, error, catch=True):
538
    """Process arguments and run the `doorstop publish` subcommand.
539
540
    :param args: Namespace of CLI arguments
541
    :param cwd: current working directory
542
    :param error: function to call for CLI errors
543
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
544
545
    """
546
    whole_tree = args.prefix == "all"
547
    ext = utilities.get_ext(args, error, ".txt", ".html", whole_tree)
548
549
    # Get the tree or document
550
    with utilities.capture(catch=catch) as success:
551
        publisher.check(ext)
552
        tree = _get_tree(args, cwd, load=whole_tree)
553
        if not whole_tree:
554
            document = tree.find_document(args.prefix)
555
556
    if not success:
557
        return False
558
559
    # Set publishing arguments
560
    kwargs = {}
561
    if args.width:
562
        kwargs["width"] = args.width
563
564
    # Write to output file(s)
565
    if args.path:
566
        path = os.path.abspath(os.path.join(cwd, args.path))
567
        if whole_tree:
568
            msg = "publishing tree to '{}'...".format(path)
569
            utilities.show(msg, flush=True)
570
            published_path = publisher.publish(
571
                tree, path, ext, template=args.template, **kwargs
572
            )
573
        else:
574
            msg = "publishing document {} to '{}'...".format(document, path)
0 ignored issues
show
introduced by
The variable document does not seem to be defined in case BooleanNotNode on line 553 is False. Are you sure this can never be the case?
Loading history...
575
            utilities.show(msg, flush=True)
576
            published_path = publisher.publish(
577
                document, path, ext, template=args.template, **kwargs
578
            )
579
        if published_path:
580
            utilities.show("published: {}".format(published_path))
581
582
    # Or, display to standard output
583
    else:
584
        if whole_tree:
585
            error("only single documents can be displayed")
586
        for line in publisher.publish_lines(document, ext, **kwargs):
587
            utilities.show(line)
588
589
    return True
590
591
592
def _request_next_number(args):
593
    """Get the server's "next number" method if a server exists."""
594
    if args.force:
595
        log.warning("creating items without the server...")
596
        return None
597
    else:
598
        server.check()
599
        return server.get_next_number
600
601
602
def _get_tree(args, cwd, request_next_number=None, load=False):
603
    """Build a tree and optionally load all documents.
604
605
    :param args: Namespace of CLI arguments
606
    :param cwd: current working directory
607
    :param request_next_number: server method to get a document's next number
608
    :param load: force the early loading of all documents
609
610
    :return: built :class:`~doorstop.core.tree.Tree`
611
612
    """
613
    utilities.show("building tree...", flush=True)
614
    tree = build(cwd=cwd, root=args.project, request_next_number=request_next_number)
615
616
    if load:
617
        utilities.show("loading documents...", flush=True)
618
        tree.load()
619
620
    return tree
621
622
623
def _iter_items(args, tree, error):
624
    """Iterate through items.
625
626
    :param args: Namespace of CLI arguments
627
    :param tree: the document hierarchy tree
628
    :param error: function to call for CLI errors
629
630
    Items are filtered to:
631
632
    - `args.label` == 'all': all items
633
    - `args.label` == document prefix: the document's items
634
    - `args.label` == item UID: a single item
635
636
    Documents and items are inferred unless flagged by:
637
638
    - `args.document`: `args.label` is a prefix
639
    - `args.item`: `args.label` is an UID
640
641
    """
642
    # Parse arguments
643
    if args.label == "all":
644
        if args.item:
645
            error("argument -i/--item: not allowed with 'all'")
646
        if args.document:
647
            error("argument -d/--document: not allowed with 'all'")
648
649
    # Build tree
650
    item = None
651
    document = None
652
653
    # Determine if tree, document, or item was requested
654
    if args.label != "all":
655
        if not args.item:
656
            try:
657
                document = tree.find_document(args.label)
658
            except common.DoorstopError as exc:
659
                if args.document:
660
                    raise exc from None  # pylint: disable=raising-bad-type
661
        if not document:
662
            item = tree.find_item(args.label)
663
664
    # Yield items from the requested object
665
    if item:
666
        yield item
667
    elif document:
668
        for item in document:
669
            yield item
670
    else:
671
        for document in tree:
672
            for item in document:
673
                yield item
674
675
676
def _export_import(args, cwd, error, document, ext):
677
    """Edit a document by calling export followed by import.
678
679
    :param args: Namespace of CLI arguments
680
    :param cwd: current working directory
681
    :param error: function to call for CLI errors
682
    :param document: :class:`~doorstop.core.document.Document` to edit
683
    :param ext: extension for export format
684
685
    """
686
    # Export the document to file
687
    args.prefix = document.prefix
688
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
689
    args.path = path
690
    get("export")(args, cwd, error, catch=False, auto=True, _tree=document.tree)
691
692
    # Open the exported file
693
    editor.edit(path, tool=args.tool)
694
695
    # Import the file to the same document
696
    if utilities.ask("import from '{}'?".format(path)):
697
        args.attrs = {}
698
        args.map = {}
699
        get("import")(args, cwd, error, catch=False, _tree=document.tree)
700
        common.delete(path)
701
    else:
702
        utilities.show("import canceled")
703
        if utilities.ask("delete '{}'?".format(path)):
704
            common.delete(path)
705
        else:
706
            msg = "to manually import: doorstop import {0}".format(path)
707
            utilities.show(msg)
708