Completed
Push — develop ( c90912...5f624c )
by Jace
18s queued 11s
created

doorstop/cli/commands.py (4 issues)

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Command functions."""
4 1
5
import os
6 1
import time
7 1
8 1
from doorstop import common
9 1
from doorstop.cli import utilities
10 1
from doorstop.core.builder import build
11
from doorstop.core import editor, importer, exporter, publisher
12 1
from doorstop import server
13
14
log = common.logger(__name__)
15 1
16
17 1
class CycleTracker:
18 1
    """A cycle tracker to detect cyclic references between items.
19 1
20
    The cycle tracker uses a standard algorithm to detect cycles in a directed
21 1
    graph (not necessarily connected) using a depth first search with a bit of
22 1
    graph colouring.  The time complexity is O(|V| + |E|).  The vertices are
23
    the items.  The edges are the links between items.
24
25 1
    """
26
27
    def __init__(self):
28
        """Initialize a cycle tracker."""
29
        self.discovered = set()
30
        self.finished = set()
31
32
    def _dfs_visit(self, uid, tree):
33
        """Do a depth first search visit of the specified item.
34 1
35
        :param uid: the UID of the item to visit
36
        :param tree: the document hierarchy tree
37 1
38
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
39
40 1
        """
41 1
        self.discovered.add(uid)
42
        item = tree.find_item(uid)
43 1
44 1
        for pid in item.links:
45
            # Detect cycles via a back edge
46 1
            if pid in self.discovered:
47 1
                msg = "detected a cycle with a back edge from {} to {}" \
48
                      .format(pid, uid)
49 1
                yield common.DoorstopWarning(msg)
50
51
            # Recurse, if this a fresh item
52 1
            if pid not in self.discovered and pid not in self.finished:
53
                yield from self._dfs_visit(pid, tree)
54
55
        self.discovered.remove(uid)
56
        self.finished.add(uid)
57
58
    def __call__(self, item, document, tree):
59
        """Get cycles which include the specified item.
60
61 1
        :param item: the UID of the item to get the cycles for
62
        :param document: unused
63
        :param tree: the document hierarchy tree
64 1
65
        :return: generator of :class:`~doorstop.common.DoorstopWarning`
66
67 1
        """
68
        if item not in self.discovered and item not in self.finished:
69
            yield from self._dfs_visit(item, tree)
70 1
71 1
72
def get(name):
73 1
    """Get a command function by name."""
74
    if name:
75 1
        log.debug("running command '{}'...".format(name))
76
        return globals()['run_' + name]
77
    else:
78 1
        log.debug("launching main command...")
79
        return run
80
81
82
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
83
    """Process arguments and run the `doorstop` subcommand.
84
85
    :param args: Namespace of CLI arguments
86
    :param cwd: current working directory
87 1
    :param error: function to call for CLI errors
88
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
89
90 1
    """
91 1
    with utilities.capture(catch=catch) as success:
92
93
        # get the tree
94 1
        tree = _get_tree(args, cwd, load=True)
95 1
96
        # validate it
97 1
        utilities.show("validating items...", flush=True)
98 1
        cycle_tracker = CycleTracker()
99
        valid = tree.validate(skip=args.skip, item_hook=cycle_tracker)
100 1
101
    if not success:
102 1
        return False
103
104
    if len(tree) > 1 and valid:
105 1
        utilities.show('\n' + tree.draw() + '\n')
106
107
    return valid
108
109
110
def run_create(args, cwd, _, catch=True):
111
    """Process arguments and run the `doorstop create` subcommand.
112
113
    :param args: Namespace of CLI arguments
114 1
    :param cwd: current working directory
115
    :param error: function to call for CLI errors
116
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
117 1
118 1
    """
119 1
    with utilities.capture(catch=catch) as success:
120
121
        # get the tree
122 1
        tree = _get_tree(args, cwd)
123 1
124 1
        # create a new document
125
        document = tree.create_document(args.path, args.prefix,
126
                                        parent=args.parent, digits=args.digits)
127 1
128 1
    if not success:
129
        return False
130 1
131
    utilities.show("created document: {} ({})".format(document.prefix,
132
                                                      document.relpath))
133 1
    return True
134
135
136
def run_delete(args, cwd, _, catch=True):
137
    """Process arguments and run the `doorstop delete` subcommand.
138
139
    :param args: Namespace of CLI arguments
140
    :param cwd: current working directory
141
    :param error: function to call for CLI errors
142 1
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
143
144
    """
145 1
    with utilities.capture(catch=catch) as success:
146 1
147
        # get the document
148
        tree = _get_tree(args, cwd)
149 1
        document = tree.find_document(args.prefix)
150
151 1
        # delete it
152 1
        prefix, relpath = document.prefix, document.relpath
153
        document.delete()
154 1
155
    if not success:
156 1
        return False
157
158
    utilities.show("deleted document: {} ({})".format(prefix, relpath))
159 1
160
    return True
161
162
163
def run_add(args, cwd, _, catch=True):
164
    """Process arguments and run the `doorstop add` subcommand.
165
166
    :param args: Namespace of CLI arguments
167
    :param cwd: current working directory
168 1
    :param error: function to call for CLI errors
169 1
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
170
171 1
    """
172
    with utilities.capture(catch=catch) as success:
173
174 1
        # get the document
175 1
        request_next_number = _request_next_number(args)
176 1
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
177 1
        document = tree.find_document(args.prefix)
178 1
179 1
        # add items to it
180 1
        for _ in range(args.count):
181
            item = document.add_item(level=args.level)
182 1
            utilities.show("added item: {} ({})".format(item.uid,
183 1
                                                        item.relpath))
184
185
        # Edit item if requested
186 1
        if args.edit:
187 1
            item.edit(tool=args.tool)
0 ignored issues
show
The variable item does not seem to be defined in case the for loop on line 180 is not entered. Are you sure this can never be the case?
Loading history...
188
189 1
    if not success:
190
        return False
191 1
192
    return True
193
194 1
195 1
def run_remove(args, cwd, _, catch=True):
196
    """Process arguments and run the `doorstop remove` subcommand.
197 1
198
    :param args: Namespace of CLI arguments
199
    :param cwd: current working directory
200 1
    :param error: function to call for CLI errors
201
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
202
203
    """
204
    with utilities.capture(catch=catch) as success:
205
206
        # get the item
207
        tree = _get_tree(args, cwd)
208
        item = tree.find_item(args.uid)
209 1
210
        # delete it
211 1
        item.delete()
212
213
    if not success:
214 1
        return False
215 1
216
    utilities.show("removed item: {} ({})".format(item.uid, item.relpath))
217 1
218 1
    return True
219
220 1
221
def run_edit(args, cwd, error, catch=True):
222
    """Process arguments and run the `doorstop edit` subcommand.
223 1
224 1
    :param args: Namespace of CLI arguments
225 1
    :param cwd: current working directory
226 1
    :param error: function to call for CLI errors
227 1
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
228
229
    """
230 1
    item = document = None
231 1
    ext = utilities.get_ext(args, error, '.yml', '.yml', whole_tree=False)
232 1
233 1
    with utilities.capture(catch=catch) as success:
234 1
235 1
        # get the item or document
236 1
        request_next_number = _request_next_number(args)
237
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
238 1
        if not args.document:
239
            try:
240
                item = tree.find_item(args.label)
241
            except common.DoorstopError as exc:
242 1
                if args.item:
243 1
                    raise exc from None  # pylint: disable=raising-bad-type
244 1
        if not item:
245 1
            document = tree.find_document(args.label)
246
247 1
        # edit it
248 1
        if item:
249 1
            item.edit(tool=args.tool, edit_all=args.all)
250 1
        else:
251
            _export_import(args, cwd, error, document, ext)
252 1
253 1
    if not success:
254
        return False
255 1
256
    if item:
257
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))
258 1
259
    return True
260
261
262
def run_reorder(args, cwd, error, catch=True, _tree=None):
263
    """Process arguments and run the `doorstop reorder` subcommand.
264
265
    :param args: Namespace of CLI arguments
266
    :param cwd: current working directory
267 1
    :param error: function to call for CLI errors
268
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
269
270 1
    """
271
    reordered = False
272
273 1
    with utilities.capture(catch=catch) as success:
274
275 1
        # get the document
276 1
        tree = _tree or _get_tree(args, cwd)
277
        document = tree.find_document(args.prefix)
278 1
279
    if not success:
280
        return False
281
282 1
    with utilities.capture(catch=catch) as success:
283
284 1
        # automatically order
285
        if args.auto:
286
            msg = "reordering document {}...".format(document)
287 1
            utilities.show(msg, flush=True)
288
            document.reorder(manual=False)
289
            reordered = True
290
291
        # or, reorder from a previously updated index
292
        elif document.index:
293
            relpath = os.path.relpath(document.index, cwd)
294
            if utilities.ask("reorder from '{}'?".format(relpath)):
295
                msg = "reordering document {}...".format(document)
296 1
                utilities.show(msg, flush=True)
297
                document.reorder(automatic=not args.manual)
298
                reordered = True
299 1
            else:
300
                del document.index
301
302 1
        # or, create a new index to update
303
        else:
304 1
            document.index = True  # create index
305 1
            relpath = os.path.relpath(document.index, cwd)
306
            editor.edit(relpath, tool=args.tool)
307 1
            get('reorder')(args, cwd, error, catch=False, _tree=tree)
308
309
    if not success:
310
        msg = "after fixing the error: doorstop reorder {}".format(document)
311 1
        utilities.show(msg)
312
        return False
313 1
314
    if reordered:
315
        utilities.show("reordered document: {}".format(document))
316 1
317
    return True
318
319
320
def run_link(args, cwd, _, catch=True):
321
    """Process arguments and run the `doorstop link` subcommand.
322
323
    :param args: Namespace of CLI arguments
324
    :param cwd: current working directory
325 1
    :param error: function to call for CLI errors
326
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
327 1
328 1
    """
329 1
    with utilities.capture(catch=catch) as success:
330 1
331
        # get the tree
332 1
        tree = _get_tree(args, cwd)
333 1
334
        # link items
335 1
        child, parent = tree.link_items(args.child, args.parent)
336
337
    if not success:
338 1
        return False
339
340
    msg = "linked items: {} ({}) -> {} ({})".format(child.uid,
341
                                                    child.relpath,
342
                                                    parent.uid,
343
                                                    parent.relpath)
344
    utilities.show(msg)
345
346
    return True
347 1
348
349 1
def run_unlink(args, cwd, _, catch=True):
350 1
    """Process arguments and run the `doorstop unlink` subcommand.
351 1
352
    :param args: Namespace of CLI arguments
353 1
    :param cwd: current working directory
354 1
    :param error: function to call for CLI errors
355
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
356 1
357
    """
358
    with utilities.capture(catch=catch) as success:
359 1
360
        # get the tree
361
        tree = _get_tree(args, cwd)
362
363
        # unlink items
364
        child, parent = tree.unlink_items(args.child, args.parent)
365
366
    if not success:
367
        return False
368 1
369 1
    msg = "unlinked items: {} ({}) -> {} ({})".format(child.uid,
370 1
                                                      child.relpath,
371 1
                                                      parent.uid,
372 1
                                                      parent.relpath)
373 1
    utilities.show(msg)
374 1
375 1
    return True
376 1
377 1
378 1
def run_clear(args, cwd, error, catch=True):
379 1
    """Process arguments and run the `doorstop clear` subcommand.
380 1
381
    :param args: Namespace of CLI arguments
382 1
    :param cwd: current working directory
383
    :param error: function to call for CLI errors
384 1
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
385
386
    """
387 1
    with utilities.capture(catch=catch) as success:
388 1
389
        for item in _iter_items(args, cwd, error):
390 1
            msg = "clearing item {}'s suspect links...".format(item.uid)
391
            utilities.show(msg)
392
            item.clear()
393 1
394
    if not success:
395 1
        return False
396 1
397
    return True
398 1
399 1
400 1
def run_review(args, cwd, error, catch=True):
401
    """Process arguments and run the `doorstop review` subcommand.
402 1
403 1
    :param args: Namespace of CLI arguments
404 1
    :param cwd: current working directory
405 1
    :param error: function to call for CLI errors
406
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
407 1
408 1
    """
409
    with utilities.capture(catch=catch) as success:
410 1
411 1
        for item in _iter_items(args, cwd, error):
412
            utilities.show("marking item {} as reviewed...".format(item.uid))
413
            item.review()
414 1
415 1
    if not success:
416
        return False
417 1
418
    return True
419
420 1
421
def run_import(args, cwd, error, catch=True, _tree=None):
422
    """Process arguments and run the `doorstop import` subcommand.
423
424
    :param args: Namespace of CLI arguments
425
    :param cwd: current working directory
426
    :param error: function to call for CLI errors
427
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
428
429
    """
430
    document = item = None
431 1
    attrs = utilities.literal_eval(args.attrs, error)
432 1
    mapping = utilities.literal_eval(args.map, error)
433
    if args.path:
434
        if not args.prefix:
435 1
            error("when [path] specified, [prefix] is also required")
436
        elif args.document:
437 1
            error("'--document' cannot be used with [path] [prefix]")
438 1
        elif args.item:
439 1
            error("'--item' cannot be used with [path] [prefix]")
440 1
        ext = utilities.get_ext(args, error, None, None)
441
    elif not (args.document or args.item):
442 1
        error("specify [path], '--document', or '--item' to import")
443 1
444
    with utilities.capture(catch=catch) as success:
445
446 1
        if args.path:
447 1
448 1
            # get the document
449 1
            request_next_number = _request_next_number(args)
450 1
            tree = _tree or _get_tree(args, cwd,
451
                                      request_next_number=request_next_number)
452 1
            document = tree.find_document(args.prefix)
453
454 1
            # import items into it
455 1
            msg = "importing '{}' into document {}...".format(args.path,
456 1
                                                              document)
457 1
            utilities.show(msg, flush=True)
458
            importer.import_file(args.path, document, ext, mapping=mapping)
0 ignored issues
show
The variable ext does not seem to be defined for all execution paths.
Loading history...
459
460
        elif args.document:
461 1
            prefix, path = args.document
462 1
            document = importer.create_document(prefix, path,
463 1
                                                parent=args.parent)
464 1
        elif args.item:
465
            prefix, uid = args.item
466 1
            request_next_number = _request_next_number(args)
467
            item = importer.add_item(prefix, uid, attrs=attrs,
468
                                     request_next_number=request_next_number)
469 1
    if not success:
470
        return False
471
472
    if document:
473
        utilities.show("imported document: {} ({})".format(document.prefix,
474
                                                           document.relpath))
475
    else:
476
        assert item
477
        utilities.show("imported item: {} ({})".format(item.uid, item.relpath))
478 1
479 1
    return True
480
481
482 1
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
483
    """Process arguments and run the `doorstop export` subcommand.
484 1
485 1
    :param args: Namespace of CLI arguments
486 1
    :param cwd: current working directory
487 1
    :param error: function to call for CLI errors
488
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
489 1
490 1
    :param auto: include placeholders for new items on import
491
492
    """
493 1
    whole_tree = args.prefix == 'all'
494 1
    ext = utilities.get_ext(args, error, '.yml', '.csv', whole_tree=whole_tree)
495 1
496
    # Get the tree or document
497
    with utilities.capture(catch=catch) as success:
498 1
499 1
        exporter.check(ext)
500 1
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
501 1
        if not whole_tree:
502 1
            document = tree.find_document(args.prefix)
503 1
504
    if not success:
505
        return False
506 1
507
    # Write to output file(s)
508 1
    if args.path:
509 1
        if whole_tree:
510
            msg = "exporting tree to '{}'...".format(args.path)
511 1
            utilities.show(msg, flush=True)
512 1
            path = exporter.export(tree, args.path, ext, auto=auto)
513
        else:
514
            msg = "exporting document {} to '{}'...".format(document,
0 ignored issues
show
The variable document does not seem to be defined in case BooleanNotNode on line 501 is False. Are you sure this can never be the case?
Loading history...
515
                                                            args.path)
516 1
            utilities.show(msg, flush=True)
517 1
            path = exporter.export(document, args.path, ext, auto=auto)
518 1
        if path:
519 1
            utilities.show("exported: {}".format(path))
520
521 1
    # Or, display to standard output
522
    else:
523
        if whole_tree:
524 1
            error("only single documents can be displayed")
525
        for line in exporter.export_lines(document, ext):
526 1
            utilities.show(line)
527 1
528 1
    return True
529
530 1
531 1
def run_publish(args, cwd, error, catch=True):
532
    """Process arguments and run the `doorstop publish` subcommand.
533
534 1
    :param args: Namespace of CLI arguments
535
    :param cwd: current working directory
536
    :param error: function to call for CLI errors
537
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
538
539
    """
540
    whole_tree = args.prefix == 'all'
541
    ext = utilities.get_ext(args, error, '.txt', '.html', whole_tree)
542
543
    # Get the tree or document
544
    with utilities.capture(catch=catch) as success:
545 1
546 1
        publisher.check(ext)
547
        tree = _get_tree(args, cwd, load=whole_tree)
548
        if not whole_tree:
549 1
            document = tree.find_document(args.prefix)
550 1
551 1
    if not success:
552
        return False
553 1
554
    # Set publishing arguments
555
    kwargs = {}
556 1
    if args.width:
557
        kwargs['width'] = args.width
558
559
    # Write to output file(s)
560
    if args.path:
561
        path = os.path.abspath(os.path.join(cwd, args.path))
562
        if whole_tree:
563
            msg = "publishing tree to '{}'...".format(path)
564
            utilities.show(msg, flush=True)
565
            published_path = publisher.publish(tree, path, ext,
566
                                               template=args.template, **kwargs)
567
        else:
568
            msg = "publishing document {} to '{}'...".format(document,
0 ignored issues
show
The variable document does not seem to be defined in case BooleanNotNode on line 548 is False. Are you sure this can never be the case?
Loading history...
569
                                                             path)
570
            utilities.show(msg, flush=True)
571
            published_path = publisher.publish(document, path, ext,
572
                                               template=args.template, **kwargs)
573
        if published_path:
574
            utilities.show("published: {}".format(published_path))
575
576 1
    # Or, display to standard output
577 1
    else:
578 1
        if whole_tree:
579 1
            error("only single documents can be displayed")
580 1
        for line in publisher.publish_lines(document, ext, **kwargs):
581
            utilities.show(line)
582
583 1
    return True
584 1
585 1
586
def _request_next_number(args):
587
    """Get the server's "next number" method if a server exists."""
588 1
    if args.force:
589 1
        log.warning("creating items without the server...")
590 1
        return None
591 1
    else:
592 1
        server.check()
593 1
        return server.get_next_number
594 1
595 1
596 1
def _get_tree(args, cwd, request_next_number=None, load=False):
597
    """Build a tree and optionally load all documents.
598
599 1
    :param args: Namespace of CLI arguments
600 1
    :param cwd: current working directory
601 1
    :param request_next_number: server method to get a document's next number
602 1
    :param load: force the early loading of all documents
603 1
604
    :return: built :class:`~doorstop.core.tree.Tree`
605 1
606 1
    """
607 1
    utilities.show("building tree...", flush=True)
608
    tree = build(cwd=cwd, root=args.project,
609
                 request_next_number=request_next_number)
610 1
611
    if load:
612
        utilities.show("loading documents...", flush=True)
613
        tree.load()
614
615
    return tree
616
617
618
def _iter_items(args, cwd, error):
619
    """Build a tree and iterate through items.
620
621 1
    :param args: Namespace of CLI arguments
622 1
    :param cwd: current working directory
623 1
    :param error: function to call for CLI errors
624 1
625
    Items are filtered to:
626
627
    - `args.label` == 'all': all items
628 1
    - `args.label` == document prefix: the document's items
629
    - `args.label` == item UID: a single item
630
631 1
    Documents and items are inferred unless flagged by:
632 1
633 1
    - `args.document`: `args.label` is a prefix
634 1
    - `args.item`: `args.label` is an UID
635 1
636
    """
637 1
    # Parse arguments
638 1
    if args.label == 'all':
639 1
        if args.item:
640
            error("argument -i/--item: not allowed with 'all'")
641 1
        if args.document:
642 1
            error("argument -d/--document: not allowed with 'all'")
643
644
    # Build tree
645
    item = None
646
    document = None
647
    tree = tree = _get_tree(args, cwd)
648
649
    # Determine if tree, document, or item was requested
650
    if args.label != 'all':
651
        if not args.item:
652
            try:
653
                document = tree.find_document(args.label)
654
            except common.DoorstopError as exc:
655
                if args.document:
656
                    raise exc from None  # pylint: disable=raising-bad-type
657
        if not document:
658
            item = tree.find_item(args.label)
659
660
    # Yield items from the requested object
661
    if item:
662
        yield item
663
    elif document:
664
        for item in document:
665
            yield item
666
    else:
667
        for document in tree:
668
            for item in document:
669
                yield item
670
671
672
def _export_import(args, cwd, error, document, ext):
673
    """Edit a document by calling export followed by import.
674
675
    :param args: Namespace of CLI arguments
676
    :param cwd: current working directory
677
    :param error: function to call for CLI errors
678
    :param document: :class:`~doorstop.core.document.Document` to edit
679
    :param ext: extension for export format
680
681
    """
682
    # Export the document to file
683
    args.prefix = document.prefix
684
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
685
    args.path = path
686
    get('export')(args, cwd, error, catch=False, auto=True,
687
                  _tree=document.tree)
688
689
    # Open the exported file
690
    editor.edit(path, tool=args.tool)
691
692
    # Import the file to the same document
693
    if utilities.ask("import from '{}'?".format(path)):
694
        args.attrs = {}
695
        args.map = {}
696
        get('import')(args, cwd, error, catch=False, _tree=document.tree)
697
        common.delete(path)
698
    else:
699
        utilities.show("import canceled")
700
        if utilities.ask("delete '{}'?".format(path)):
701
            common.delete(path)
702
        else:
703
            msg = "to manually import: doorstop import {0}".format(path)
704
            utilities.show(msg)
705