doorstop.cli.commands.CycleTracker.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Command functions."""
4
5
import os
6
import time
7
from typing import Set
8
9
from doorstop import common, server
10
from doorstop.cli import utilities
11
from doorstop.core import editor, exporter, importer, publisher
12
from doorstop.core.builder import build
13
14
log = common.logger(__name__)
15
16
17
class CycleTracker:
18
    """A cycle tracker to detect cyclic references between items.
19
20
    The cycle tracker uses a standard algorithm to detect cycles in a directed
21
    graph (not necessarily connected) using a depth first search with a bit of
22
    graph colouring.  The time complexity is O(|V| + |E|).  The vertices are
23
    the items.  The edges are the links between items.
24
25
    """
26
27
    def __init__(self):
28
        """Initialize a cycle tracker."""
29
        self.discovered: Set[str] = set()
30
        self.finished: Set[str] = set()
31
32
    def _dfs_visit(self, uid, tree):
33
        """Do a depth first search visit of the specified item.
34
35
        :param uid: the UID of the item to visit
36
        :param tree: the document hierarchy tree
37
38
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
39
40
        """
41
        self.discovered.add(uid)
42
        item = tree.find_item(uid)
43
44
        for pid in item.links:
45
            # Detect cycles via a back edge
46
            if pid in self.discovered:
47
                msg = "detected a cycle with a back edge from {} to {}".format(pid, uid)
48
                yield common.DoorstopWarning(msg)
49
50
            # Recurse, if this a fresh item
51
            if pid not in self.discovered and pid not in self.finished:
52
                yield from self._dfs_visit(pid, tree)
53
54
        self.discovered.remove(uid)
55
        self.finished.add(uid)
56
57
    def __call__(self, item, document, tree):
58
        """Get cycles which include the specified item.
59
60
        :param item: the UID of the item to get the cycles for
61
        :param document: unused
62
        :param tree: the document hierarchy tree
63
64
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
65
66
        """
67
        if item not in self.discovered and item not in self.finished:
68
            yield from self._dfs_visit(item, tree)
69
70
71
def get(name):
72
    """Get a command function by name."""
73
    if name:
74
        log.debug("running command '{}'...".format(name))
75
        return globals()["run_" + name]
76
    else:
77
        log.debug("launching main command...")
78
        return run
79
80
81
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
82
    """Process arguments and run the `doorstop` subcommand.
83
84
    :param args: Namespace of CLI arguments
85
    :param cwd: current working directory
86
    :param error: function to call for CLI errors
87
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
88
89
    """
90
    with utilities.capture(catch=catch) as success:
91
        # get the tree
92
        tree = _get_tree(args, cwd, load=True)
93
94
        # validate it
95
        utilities.show("validating items...", flush=True)
96
        cycle_tracker = CycleTracker()
97
        valid = tree.validate(skip=args.skip, item_hook=cycle_tracker)
98
99
    if not success:
100
        return False
101
102
    if len(tree) > 1 and valid:
103
        utilities.show("\n" + tree.draw() + "\n")
104
105
    return valid
106
107
108
def run_create(args, cwd, _, catch=True):
109
    """Process arguments and run the `doorstop create` subcommand.
110
111
    :param args: Namespace of CLI arguments
112
    :param cwd: current working directory
113
    :param error: function to call for CLI errors
114
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
115
116
    """
117
    with utilities.capture(catch=catch) as success:
118
        # get the tree
119
        tree = _get_tree(args, cwd)
120
121
        # create a new document
122
        document = tree.create_document(
123
            args.path,
124
            args.prefix,
125
            parent=args.parent,
126
            digits=args.digits,
127
            sep=args.separator,
128
            itemformat=args.itemformat,
129
        )
130
131
    if not success:
132
        return False
133
134
    utilities.show(
135
        "created document: {} ({})".format(document.prefix, document.relpath)
136
    )
137
    return True
138
139
140
def run_delete(args, cwd, _, catch=True):
141
    """Process arguments and run the `doorstop delete` subcommand.
142
143
    :param args: Namespace of CLI arguments
144
    :param cwd: current working directory
145
    :param error: function to call for CLI errors
146
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
147
148
    """
149
    with utilities.capture(catch=catch) as success:
150
        # get the document
151
        tree = _get_tree(args, cwd)
152
        document = tree.find_document(args.prefix)
153
154
        # delete it
155
        prefix, relpath = document.prefix, document.relpath
156
        document.delete()
157
158
    if not success:
159
        return False
160
161
    utilities.show("deleted document: {} ({})".format(prefix, relpath))
162
163
    return True
164
165
166
def run_add(args, cwd, _, catch=True):
167
    """Process arguments and run the `doorstop add` subcommand.
168
169
    :param args: Namespace of CLI arguments
170
    :param cwd: current working directory
171
    :param error: function to call for CLI errors
172
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
173
174
    """
175
    with utilities.capture(catch=catch) as success:
176
        # get the document
177
        request_next_number = _request_next_number(args)
178
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
179
        document = tree.find_document(args.prefix)
180
181
        # add items to it
182
        for _ in range(args.count):
183
            item = document.add_item(
184
                level=args.level,
185
                defaults=args.defaults,
186
                name=args.name,
187
                reorder=args.noreorder,
188
            )
189
            utilities.show("added item: {} ({})".format(item.uid, item.relpath))
190
191
        # Edit item if requested
192
        if args.edit:
193
            item.edit(tool=args.tool)
0 ignored issues
show
introduced by
The variable item does not seem to be defined in case the for loop on line 182 is not entered. Are you sure this can never be the case?
Loading history...
194
195
    if not success:
196
        return False
197
198
    return True
199
200
201
def run_remove(args, cwd, _, catch=True):
202
    """Process arguments and run the `doorstop remove` subcommand.
203
204
    :param args: Namespace of CLI arguments
205
    :param cwd: current working directory
206
    :param error: function to call for CLI errors
207
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
208
209
    """
210
    with utilities.capture(catch=catch) as success:
211
        # get the item
212
        tree = _get_tree(args, cwd)
213
        item = tree.find_item(args.uid)
214
215
        # delete it
216
        item.delete()
217
218
    if not success:
219
        return False
220
221
    utilities.show("removed item: {} ({})".format(item.uid, item.relpath))
222
223
    return True
224
225
226
def run_edit(args, cwd, error, catch=True):
227
    """Process arguments and run the `doorstop edit` subcommand.
228
229
    :param args: Namespace of CLI arguments
230
    :param cwd: current working directory
231
    :param error: function to call for CLI errors
232
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
233
234
    """
235
    item = document = None
236
    ext = utilities.get_ext(args, error, ".yml", ".yml", whole_tree=False)
237
238
    with utilities.capture(catch=catch) as success:
239
        # get the item or document
240
        request_next_number = _request_next_number(args)
241
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
242
        if not args.document:
243
            try:
244
                item = tree.find_item(args.label)
245
            except common.DoorstopError as exc:
246
                if args.item:
247
                    raise exc from None  # pylint: disable=raising-bad-type
248
        if not item:
249
            document = tree.find_document(args.label)
250
251
        # edit it
252
        if item:
253
            item.edit(tool=args.tool, edit_all=args.all)
254
        else:
255
            _export_import(args, cwd, error, document, ext)
256
257
    if not success:
258
        return False
259
260
    if item:
261
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))
262
263
    return True
264
265
266
def run_reorder(args, cwd, error, catch=True, _tree=None):
267
    """Process arguments and run the `doorstop reorder` subcommand.
268
269
    :param args: Namespace of CLI arguments
270
    :param cwd: current working directory
271
    :param error: function to call for CLI errors
272
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
273
274
    """
275
    reordered = False
276
277
    with utilities.capture(catch=catch) as success:
278
        # get the document
279
        tree = _tree or _get_tree(args, cwd)
280
        document = tree.find_document(args.prefix)
281
282
    if not success:
283
        return False
284
285
    with utilities.capture(catch=catch) as success:
286
        # automatically order
287
        if args.auto:
288
            msg = "reordering document {}...".format(document)
289
            utilities.show(msg, flush=True)
290
            document.reorder(manual=False)
291
            reordered = True
292
293
        # or, reorder from a previously updated index
294
        elif document.index:
295
            relpath = os.path.relpath(document.index, cwd)
296
            if utilities.ask("reorder from '{}'?".format(relpath)):
297
                msg = "reordering document {}...".format(document)
298
                utilities.show(msg, flush=True)
299
                document.reorder(automatic=not args.manual)
300
                reordered = True
301
            else:
302
                del document.index
303
304
        # or, create a new index to update
305
        else:
306
            document.index = True  # create index
307
            relpath = os.path.relpath(document.index, cwd)
308
            editor.edit(relpath, tool=args.tool)
309
            get("reorder")(args, cwd, error, catch=False, _tree=tree)
310
311
    if not success:
312
        msg = "after fixing the error: doorstop reorder {}".format(document)
313
        utilities.show(msg)
314
        return False
315
316
    if reordered:
317
        utilities.show("reordered document: {}".format(document))
318
319
    return True
320
321
322
def run_link(args, cwd, _, catch=True):
323
    """Process arguments and run the `doorstop link` subcommand.
324
325
    :param args: Namespace of CLI arguments
326
    :param cwd: current working directory
327
    :param error: function to call for CLI errors
328
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
329
330
    """
331
    with utilities.capture(catch=catch) as success:
332
        # get the tree
333
        tree = _get_tree(args, cwd)
334
335
        # link items
336
        child, parent = tree.link_items(args.child, args.parent)
337
338
    if not success:
339
        return False
340
341
    msg = "linked items: {} ({}) -> {} ({})".format(
342
        child.uid, child.relpath, parent.uid, parent.relpath
343
    )
344
    utilities.show(msg)
345
346
    return True
347
348
349
def run_unlink(args, cwd, _, catch=True):
350
    """Process arguments and run the `doorstop unlink` subcommand.
351
352
    :param args: Namespace of CLI arguments
353
    :param cwd: current working directory
354
    :param error: function to call for CLI errors
355
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
356
357
    """
358
    with utilities.capture(catch=catch) as success:
359
        # get the tree
360
        tree = _get_tree(args, cwd)
361
362
        # unlink items
363
        child, parent = tree.unlink_items(args.child, args.parent)
364
365
    if not success:
366
        return False
367
368
    msg = "unlinked items: {} ({}) -> {} ({})".format(
369
        child.uid, child.relpath, parent.uid, parent.relpath
370
    )
371
    utilities.show(msg)
372
373
    return True
374
375
376
def run_clear(args, cwd, error, catch=True):
377
    """Process arguments and run the `doorstop clear` subcommand.
378
379
    :param args: Namespace of CLI arguments
380
    :param cwd: current working directory
381
    :param error: function to call for CLI errors
382
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
383
384
    """
385
    with utilities.capture(catch=catch) as success:
386
        tree = _get_tree(args, cwd)
387
388
        if args.parents:
389
            # Check that the parent item UIDs exist
390
            for pid in args.parents:
391
                tree.find_item(pid)
392
393
            pids = " to " + ", ".join(args.parents)
394
        else:
395
            pids = ""
396
397
        for item in _iter_items(args, tree, error):
398
            msg = "clearing item {}'s suspect links{}...".format(item.uid, pids)
399
            utilities.show(msg)
400
            item.clear(parents=args.parents)
401
402
    if not success:
403
        return False
404
405
    return True
406
407
408
def run_review(args, cwd, error, catch=True):
409
    """Process arguments and run the `doorstop review` subcommand.
410
411
    :param args: Namespace of CLI arguments
412
    :param cwd: current working directory
413
    :param error: function to call for CLI errors
414
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
415
416
    """
417
    with utilities.capture(catch=catch) as success:
418
        tree = _get_tree(args, cwd)
419
420
        for item in _iter_items(args, tree, error):
421
            utilities.show("marking item {} as reviewed...".format(item.uid))
422
            item.review()
423
424
    if not success:
425
        return False
426
427
    return True
428
429
430
def run_import(args, cwd, error, catch=True, _tree=None):
431
    """Process arguments and run the `doorstop import` subcommand.
432
433
    :param args: Namespace of CLI arguments
434
    :param cwd: current working directory
435
    :param error: function to call for CLI errors
436
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
437
438
    """
439
    document = item = None
440
    attrs = utilities.literal_eval(args.attrs, error)
441
    mapping = utilities.literal_eval(args.map, error)
442
    if args.path:
443
        if not args.prefix:
444
            error("when [path] specified, [prefix] is also required")
445
        elif args.document:
446
            error("'--document' cannot be used with [path] [prefix]")
447
        elif args.item:
448
            error("'--item' cannot be used with [path] [prefix]")
449
        ext = utilities.get_ext(args, error, None, None)
450
    elif not (args.document or args.item):
451
        error("specify [path], '--document', or '--item' to import")
452
453
    with utilities.capture(catch=catch) as success:
454
        if args.path:
455
            # get the document
456
            request_next_number = _request_next_number(args)
457
            tree = _tree or _get_tree(
458
                args, cwd, request_next_number=request_next_number
459
            )
460
            document = tree.find_document(args.prefix)
461
462
            # import items into it
463
            msg = "importing '{}' into document {}...".format(args.path, document)
464
            utilities.show(msg, flush=True)
465
            importer.import_file(args.path, document, ext, mapping=mapping)
0 ignored issues
show
introduced by
The variable ext does not seem to be defined for all execution paths.
Loading history...
466
467
        elif args.document:
468
            prefix, path = args.document
469
            document = importer.create_document(prefix, path, parent=args.parent)
470
        elif args.item:
471
            prefix, uid = args.item
472
            request_next_number = _request_next_number(args)
473
            item = importer.add_item(
474
                prefix, uid, attrs=attrs, request_next_number=request_next_number
475
            )
476
    if not success:
477
        return False
478
479
    if document:
480
        utilities.show(
481
            "imported document: {} ({})".format(document.prefix, document.relpath)
482
        )
483
    else:
484
        assert item
485
        utilities.show("imported item: {} ({})".format(item.uid, item.relpath))
486
487
    return True
488
489
490
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
491
    """Process arguments and run the `doorstop export` subcommand.
492
493
    :param args: Namespace of CLI arguments
494
    :param cwd: current working directory
495
    :param error: function to call for CLI errors
496
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
497
498
    :param auto: include placeholders for new items on import
499
500
    """
501
    whole_tree = args.prefix == "all"
502
    ext = utilities.get_ext(args, error, ".yml", ".csv", whole_tree=whole_tree)
503
504
    # Get the tree or document
505
    document = None
506
507
    with utilities.capture(catch=catch) as success:
508
        exporter.check(ext)
509
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
510
        if not whole_tree:
511
            document = tree.find_document(args.prefix)
512
513
    if not success:
514
        return False
515
516
    # Write to output file(s)
517
    if args.path:
518
        if whole_tree:
519
            msg = "exporting tree to '{}'...".format(args.path)
520
            utilities.show(msg, flush=True)
521
            path = exporter.export(tree, args.path, ext, auto=auto)
522
        else:
523
            msg = "exporting document {} to '{}'...".format(document, args.path)
524
            utilities.show(msg, flush=True)
525
            path = exporter.export(document, args.path, ext, auto=auto)
526
        if path:
527
            utilities.show("exported: {}".format(path))
528
529
    # Or, display to standard output
530
    else:
531
        if whole_tree:
532
            error("only single documents can be displayed")
533
        for line in exporter.export_lines(document, ext):
534
            utilities.show(line)
535
536
    return True
537
538
539
def run_publish(args, cwd, error, catch=True):
540
    """Process arguments and run the `doorstop publish` subcommand.
541
542
    :param args: Namespace of CLI arguments
543
    :param cwd: current working directory
544
    :param error: function to call for CLI errors
545
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
546
547
    """
548
    whole_tree = args.prefix == "all"
549
    ext = utilities.get_ext(args, error, ".txt", ".html", whole_tree)
550
551
    # Get the tree or document
552
    document = None
553
    with utilities.capture(catch=catch) as success:
554
        publisher.check(ext)
555
        tree = _get_tree(args, cwd, load=whole_tree)
556
        if not whole_tree:
557
            document = tree.find_document(args.prefix)
558
559
    if not success:
560
        return False
561
562
    # Set publishing arguments
563
    kwargs = {}
564
    if args.width:
565
        kwargs["width"] = args.width
566
567
    if args.index:
568
        kwargs["index"] = True
569
570
    # Write to output file(s)
571
    if args.path:
572
        path = os.path.abspath(os.path.join(cwd, args.path))
573
        if whole_tree:
574
            msg = "publishing tree to '{}'...".format(path)
575
            utilities.show(msg, flush=True)
576
            published_path = publisher.publish(
577
                tree, path, ext, template=args.template, **kwargs
578
            )
579
        else:
580
            msg = "publishing document {} to '{}'...".format(document, path)
581
            utilities.show(msg, flush=True)
582
            published_path = publisher.publish(
583
                document, path, ext, template=args.template, **kwargs
584
            )
585
        if published_path:
586
            utilities.show("published: {}".format(published_path))
587
588
    # Or, display to standard output
589
    else:
590
        if whole_tree:
591
            error("only single documents can be displayed")
592
        for line in publisher.publish_lines(document, ext, **kwargs):
593
            utilities.show(line)
594
595
    return True
596
597
598
def _request_next_number(args):
599
    """Get the server's "next number" method if a server exists."""
600
    if args.force:
601
        log.warning("creating items without the server...")
602
        return None
603
    else:
604
        server.check()
605
        return server.get_next_number
606
607
608
def _get_tree(args, cwd, request_next_number=None, load=False):
609
    """Build a tree and optionally load all documents.
610
611
    :param args: Namespace of CLI arguments
612
    :param cwd: current working directory
613
    :param request_next_number: server method to get a document's next number
614
    :param load: force the early loading of all documents
615
616
    :return: built :class:`~doorstop.core.tree.Tree`
617
618
    """
619
    utilities.show("building tree...", flush=True)
620
    tree = build(cwd=cwd, root=args.project, request_next_number=request_next_number)
621
622
    if load:
623
        utilities.show("loading documents...", flush=True)
624
        tree.load()
625
626
    return tree
627
628
629
def _iter_items(args, tree, error):
630
    """Iterate through items.
631
632
    :param args: Namespace of CLI arguments
633
    :param tree: the document hierarchy tree
634
    :param error: function to call for CLI errors
635
636
    Items are filtered to:
637
638
    - `args.label` == 'all': all items
639
    - `args.label` == document prefix: the document's items
640
    - `args.label` == item UID: a single item
641
642
    Documents and items are inferred unless flagged by:
643
644
    - `args.document`: `args.label` is a prefix
645
    - `args.item`: `args.label` is an UID
646
647
    """
648
    # Parse arguments
649
    if args.label == "all":
650
        if args.item:
651
            error("argument -i/--item: not allowed with 'all'")
652
        if args.document:
653
            error("argument -d/--document: not allowed with 'all'")
654
655
    # Build tree
656
    item = None
657
    document = None
658
659
    # Determine if tree, document, or item was requested
660
    if args.label != "all":
661
        if not args.item:
662
            try:
663
                document = tree.find_document(args.label)
664
            except common.DoorstopError as exc:
665
                if args.document:
666
                    raise exc from None  # pylint: disable=raising-bad-type
667
        if not document:
668
            item = tree.find_item(args.label)
669
670
    # Yield items from the requested object
671
    if item:
672
        yield item
673
    elif document:
674
        yield from document
675
    else:
676
        for document in tree:
677
            yield from document
678
679
680
def _export_import(args, cwd, error, document, ext):
681
    """Edit a document by calling export followed by import.
682
683
    :param args: Namespace of CLI arguments
684
    :param cwd: current working directory
685
    :param error: function to call for CLI errors
686
    :param document: :class:`~doorstop.core.document.Document` to edit
687
    :param ext: extension for export format
688
689
    """
690
    # Export the document to file
691
    args.prefix = document.prefix
692
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
693
    args.path = path
694
    get("export")(args, cwd, error, catch=False, auto=True, _tree=document.tree)
695
696
    # Open the exported file
697
    editor.edit(path, tool=args.tool)
698
699
    # Import the file to the same document
700
    if utilities.ask("import from '{}'?".format(path)):
701
        args.attrs = {}
702
        args.map = {}
703
        get("import")(args, cwd, error, catch=False, _tree=document.tree)
704
        common.delete(path)
705
    else:
706
        utilities.show("import canceled")
707
        if utilities.ask("delete '{}'?".format(path)):
708
            common.delete(path)
709
        else:
710
            msg = "to manually import: doorstop import {0}".format(path)
711
            utilities.show(msg)
712