Completed
Push — develop ( cf43d3...e33935 )
by Jace
15s queued 10s
created

doorstop.cli.commands.run_clear()   B

Complexity

Conditions 6

Size

Total Lines 30
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 6

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 30
ccs 20
cts 20
cp 1
rs 8.6666
c 0
b 0
f 0
cc 6
nop 4
crap 6
1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Command functions."""
4 1
5
import os
6 1
import time
7 1
8 1
from doorstop import common, server
9 1
from doorstop.cli import utilities
10 1
from doorstop.core import editor, exporter, importer, publisher
11
from doorstop.core.builder import build
12 1
13
log = common.logger(__name__)
14
15 1
16
class CycleTracker:
    """Item hook that reports cyclic references between items.

    Cycles are found with a depth first search over a directed graph whose
    vertices are the items and whose edges are the links between items.
    The graph does not have to be connected.  During the search an item is
    either unseen, discovered (on the active search path) or finished
    (fully explored); a link that points back at a discovered item closes
    a cycle.  The time complexity is O(|V| + |E|).

    """

    def __init__(self):
        """Initialize a cycle tracker with no visited items."""
        # UIDs on the active search path
        self.discovered = set()
        # UIDs whose links have all been explored
        self.finished = set()

    def _dfs_visit(self, uid, tree):
        """Explore the links of one item, depth first.

        :param uid: the UID of the item to visit
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        self.discovered.add(uid)
        current = tree.find_item(uid)

        for pid in current.links:
            if pid in self.discovered:
                # A link back into the active search path is a back edge
                msg = "detected a cycle with a back edge from {} to {}".format(pid, uid)
                yield common.DoorstopWarning(msg)
            elif pid not in self.finished:
                # Fresh item: explore it before moving to the next link
                yield from self._dfs_visit(pid, tree)

        # All links explored: move the item from the active path to done
        self.discovered.remove(uid)
        self.finished.add(uid)

    def __call__(self, item, document, tree):
        """Get cycles which include the specified item.

        Items that were already reached by an earlier search are skipped,
        so each item is explored at most once per tracker.

        :param item: the UID of the item to get the cycles for
        :param document: unused
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        if item in self.discovered or item in self.finished:
            return
        yield from self._dfs_visit(item, tree)
68
69
70 1
def get(name):
    """Get a command function by name.

    :param name: subcommand name, or a falsy value for the main command

    :return: the module-level ``run_<name>`` function, or :func:`run` when
        no name is given

    :raises KeyError: if this module defines no ``run_<name>`` function

    """
    if not name:
        log.debug("launching main command...")
        return run

    log.debug("running command '{}'...".format(name))
    function_name = 'run_' + name
    return globals()[function_name]
78 1
79
80
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
    """Process arguments and run the `doorstop` subcommand.

    Builds and loads the tree, validates it (reporting link cycles through
    a :class:`CycleTracker` item hook) and draws the tree when it is valid
    and has more than one entry.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors (unused here)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed, otherwise the result of
        the tree validation

    """
    with utilities.capture(catch=catch) as success:

        # build the tree with all documents loaded
        tree = _get_tree(args, cwd, load=True)

        # validate it, checking every item for link cycles
        utilities.show("validating items...", flush=True)
        valid = tree.validate(skip=args.skip, item_hook=CycleTracker())

    if not success:
        return False

    draw_tree = len(tree) > 1 and valid
    if draw_tree:
        utilities.show('\n' + tree.draw() + '\n')

    return valid
106
107
108
def run_create(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop create` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was created, False if the captured
        block failed

    """
    with utilities.capture(catch=catch) as success:

        # build the tree, then add the new document to it
        tree = _get_tree(args, cwd)
        document = tree.create_document(
            args.path, args.prefix, parent=args.parent, digits=args.digits
        )

    if success:
        summary = "created document: {} ({})".format(document.prefix, document.relpath)
        utilities.show(summary)
        return True

    return False
134
135
136
def run_delete(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop delete` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was deleted, False if the captured
        block failed

    """
    with utilities.capture(catch=catch) as success:

        # find the document
        tree = _get_tree(args, cwd)
        document = tree.find_document(args.prefix)

        # remember how to describe it before deleting it
        prefix = document.prefix
        relpath = document.relpath
        document.delete()

    if success:
        utilities.show("deleted document: {} ({})".format(prefix, relpath))
        return True

    return False
161
162
163
def run_add(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop add` subcommand.

    Adds `args.count` items to the document named by `args.prefix` and,
    when `args.edit` is set, opens the last added item for editing.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed, otherwise True

    """
    # Stays None when no item is added (e.g. a count of zero)
    item = None

    with utilities.capture(catch=catch) as success:

        # get the document
        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
        document = tree.find_document(args.prefix)

        # add items to it
        for _ in range(args.count):
            item = document.add_item(level=args.level)
            utilities.show("added item: {} ({})".format(item.uid, item.relpath))

        # Edit the last added item if requested; with no added item there
        # is nothing to edit, so skip instead of failing on an unbound name
        if args.edit and item is not None:
            item.edit(tool=args.tool)

    if not success:
        return False

    return True
192
193
194 1
def run_remove(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop remove` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the item was removed, False if the captured block
        failed

    """
    with utilities.capture(catch=catch) as success:

        # find the item and delete it
        tree = _get_tree(args, cwd)
        item = tree.find_item(args.uid)
        item.delete()

    if success:
        summary = "removed item: {} ({})".format(item.uid, item.relpath)
        utilities.show(summary)
        return True

    return False
218 1
219
220 1
def run_edit(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop edit` subcommand.

    `args.label` is tried as an item UID first (unless `args.document` is
    set) and as a document prefix otherwise.  Items are opened in the
    editor; documents are edited through an export/import round trip.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed, otherwise True

    """
    item = None
    document = None
    ext = utilities.get_ext(args, error, '.yml', '.yml', whole_tree=False)

    with utilities.capture(catch=catch) as success:

        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)

        # look for an item, unless a document was explicitly requested
        if not args.document:
            try:
                item = tree.find_item(args.label)
            except common.DoorstopError as exc:
                # a failed lookup only matters if an item was required
                if args.item:
                    raise exc from None  # pylint: disable=raising-bad-type

        if item:
            # edit the item directly
            item.edit(tool=args.tool, edit_all=args.all)
        else:
            # fall back to editing the whole document
            document = tree.find_document(args.label)
            _export_import(args, cwd, error, document, ext)

    if not success:
        return False

    if item:
        utilities.show("opened item: {} ({})".format(item.uid, item.relpath))

    return True
259
260
261
def run_reorder(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop reorder` subcommand.

    Three modes are supported: automatic reordering (`args.auto`),
    reordering from an existing document index after confirmation, or
    creating a new index, opening it in the editor and running the
    command again on the same tree.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: already built tree to use instead of building a new one

    :return: False if either captured block failed, otherwise True

    """
    reordered = False

    with utilities.capture(catch=catch) as success:

        # get the document
        tree = _tree or _get_tree(args, cwd)
        document = tree.find_document(args.prefix)

    if not success:
        return False

    def _announce():
        """Show the progress message for an upcoming reorder."""
        utilities.show("reordering document {}...".format(document), flush=True)

    with utilities.capture(catch=catch) as success:

        if args.auto:
            # automatically order
            _announce()
            document.reorder(manual=False)
            reordered = True

        elif not document.index:
            # no index yet: create one, let the user edit it, then rerun
            document.index = True  # create index
            index_path = os.path.relpath(document.index, cwd)
            editor.edit(index_path, tool=args.tool)
            get('reorder')(args, cwd, error, catch=False, _tree=tree)

        else:
            # reorder from a previously updated index, if confirmed
            index_path = os.path.relpath(document.index, cwd)
            if utilities.ask("reorder from '{}'?".format(index_path)):
                _announce()
                document.reorder(automatic=not args.manual)
                reordered = True
            else:
                del document.index

    if not success:
        hint = "after fixing the error: doorstop reorder {}".format(document)
        utilities.show(hint)
        return False

    if reordered:
        utilities.show("reordered document: {}".format(document))

    return True
317
318
319
def run_link(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop link` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were linked, False if the captured block
        failed

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and link the two items
        tree = _get_tree(args, cwd)
        child, parent = tree.link_items(args.child, args.parent)

    if success:
        summary = "linked items: {} ({}) -> {} ({})".format(
            child.uid, child.relpath, parent.uid, parent.relpath
        )
        utilities.show(summary)
        return True

    return False
345
346
347 1
def run_unlink(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop unlink` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: unused (slot of the function to call for CLI errors)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were unlinked, False if the captured block
        failed

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and unlink the two items
        tree = _get_tree(args, cwd)
        child, parent = tree.unlink_items(args.child, args.parent)

    if success:
        summary = "unlinked items: {} ({}) -> {} ({})".format(
            child.uid, child.relpath, parent.uid, parent.relpath
        )
        utilities.show(summary)
        return True

    return False
373 1
374 1
375 1
def run_clear(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop clear` subcommand.

    Clears the suspect links of every item selected by `args.label`,
    optionally limited to the parents listed in `args.parents`.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed, otherwise True

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        # suffix naming the parents in the progress message, if any
        pids = ""
        if args.parents:
            # Look up every parent UID first to check that it exists
            for pid in args.parents:
                tree.find_item(pid)
            pids = " to " + ", ".join(args.parents)

        for item in _iter_items(args, tree, error):
            utilities.show(
                "clearing item {}'s suspect links{}...".format(item.uid, pids)
            )
            item.clear(parents=args.parents)

    return True if success else False
405 1
406
407 1
def run_review(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop review` subcommand.

    Marks every item selected by `args.label` as reviewed.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed, otherwise True

    """
    with utilities.capture(catch=catch) as success:
        tree = _get_tree(args, cwd)

        for item in _iter_items(args, tree, error):
            progress = "marking item {} as reviewed...".format(item.uid)
            utilities.show(progress)
            item.review()

    return True if success else False
427
428
429
def run_import(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop import` subcommand.

    Exactly one source is handled: a file (`args.path` with `args.prefix`),
    a single document (`args.document`) or a single item (`args.item`).

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: already built tree to use instead of building a new one

    :return: False if the captured block failed, otherwise True

    """
    document = None
    item = None
    attrs = utilities.literal_eval(args.attrs, error)
    mapping = utilities.literal_eval(args.map, error)

    # Reject argument combinations that do not identify a single source
    if args.path:
        if not args.prefix:
            error("when [path] specified, [prefix] is also required")
        elif args.document:
            error("'--document' cannot be used with [path] [prefix]")
        elif args.item:
            error("'--item' cannot be used with [path] [prefix]")
        ext = utilities.get_ext(args, error, None, None)
    elif not args.document and not args.item:
        error("specify [path], '--document', or '--item' to import")

    with utilities.capture(catch=catch) as success:

        if args.path:

            # get the target document
            request_next_number = _request_next_number(args)
            tree = _tree or _get_tree(
                args, cwd, request_next_number=request_next_number
            )
            document = tree.find_document(args.prefix)

            # import the file's items into it
            progress = "importing '{}' into document {}...".format(args.path, document)
            utilities.show(progress, flush=True)
            importer.import_file(args.path, document, ext, mapping=mapping)

        elif args.document:
            prefix, path = args.document
            document = importer.create_document(prefix, path, parent=args.parent)

        elif args.item:
            prefix, uid = args.item
            request_next_number = _request_next_number(args)
            item = importer.add_item(
                prefix, uid, attrs=attrs, request_next_number=request_next_number
            )

    if not success:
        return False

    # Report whichever object was imported
    if document:
        report = "imported document: {} ({})".format(document.prefix, document.relpath)
    else:
        assert item
        report = "imported item: {} ({})".format(item.uid, item.relpath)
    utilities.show(report)

    return True
489 1
490 1
491
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
    """Process arguments and run the `doorstop export` subcommand.

    Exports either the whole tree (`args.prefix` is 'all') or a single
    document, to `args.path` when given and to standard output otherwise.
    Only single documents can be written to standard output.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :param auto: include placeholders for new items on import
    :param _tree: already built tree to use instead of building a new one

    :return: False if the captured block failed or the request cannot be
        displayed, otherwise True

    """
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.yml', '.csv', whole_tree=whole_tree)

    # Only bound to a real document for single-document exports
    document = None

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        exporter.check(ext)
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Write to output file(s)
    if args.path:
        if whole_tree:
            msg = "exporting tree to '{}'...".format(args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(tree, args.path, ext, auto=auto)
        else:
            msg = "exporting document {} to '{}'...".format(document, args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(document, args.path, ext, auto=auto)
        if path:
            utilities.show("exported: {}".format(path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # `error` is expected to abort the command; if it returns,
            # stop here because there is no single document to display
            return False
        for line in exporter.export_lines(document, ext):
            utilities.show(line)

    return True
537
538
539
def run_publish(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop publish` subcommand.

    Publishes either the whole tree (`args.prefix` is 'all') or a single
    document, to `args.path` when given and to standard output otherwise.
    Only single documents can be written to standard output.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block failed or the request cannot be
        displayed, otherwise True

    """
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.txt', '.html', whole_tree)

    # Only bound to a real document for single-document publishing
    document = None

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        publisher.check(ext)
        tree = _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Set publishing arguments
    kwargs = {}
    if args.width:
        kwargs['width'] = args.width

    # Write to output file(s)
    if args.path:
        path = os.path.abspath(os.path.join(cwd, args.path))
        if whole_tree:
            msg = "publishing tree to '{}'...".format(path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(
                tree, path, ext, template=args.template, **kwargs
            )
        else:
            msg = "publishing document {} to '{}'...".format(document, path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(
                document, path, ext, template=args.template, **kwargs
            )
        if published_path:
            utilities.show("published: {}".format(published_path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # `error` is expected to abort the command; if it returns,
            # stop here because there is no single document to display
            return False
        for line in publisher.publish_lines(document, ext, **kwargs):
            utilities.show(line)

    return True
593 1
594 1
595 1
def _request_next_number(args):
    """Get the server's "next number" method if a server exists.

    :param args: Namespace of CLI arguments; `args.force` skips the server

    :return: None when forced to work without the server, otherwise the
        server's `get_next_number` function (after `server.check()`)

    """
    if not args.force:
        server.check()
        return server.get_next_number

    log.warning("creating items without the server...")
    return None
603 1
604
605 1
def _get_tree(args, cwd, request_next_number=None, load=False):
    """Build a tree and optionally load all documents.

    :param args: Namespace of CLI arguments (`args.project` is the root)
    :param cwd: current working directory
    :param request_next_number: server method to get a document's next number
    :param load: force the early loading of all documents

    :return: built :class:`~doorstop.core.tree.Tree`

    """
    utilities.show("building tree...", flush=True)
    built = build(
        cwd=cwd, root=args.project, request_next_number=request_next_number
    )

    if not load:
        return built

    utilities.show("loading documents...", flush=True)
    built.load()
    return built
624 1
625
626
def _iter_items(args, tree, error):
627
    """Iterate through items.
628 1
629
    :param args: Namespace of CLI arguments
630
    :param tree: the document hierarchy tree
631 1
    :param error: function to call for CLI errors
632 1
633 1
    Items are filtered to:
634 1
635 1
    - `args.label` == 'all': all items
636
    - `args.label` == document prefix: the document's items
637 1
    - `args.label` == item UID: a single item
638 1
639 1
    Documents and items are inferred unless flagged by:
640
641 1
    - `args.document`: `args.label` is a prefix
642 1
    - `args.item`: `args.label` is an UID
643
644
    """
645
    # Parse arguments
646
    if args.label == 'all':
647
        if args.item:
648
            error("argument -i/--item: not allowed with 'all'")
649
        if args.document:
650
            error("argument -d/--document: not allowed with 'all'")
651
652
    # Build tree
653
    item = None
654
    document = None
655
656
    # Determine if tree, document, or item was requested
657
    if args.label != 'all':
658
        if not args.item:
659
            try:
660
                document = tree.find_document(args.label)
661
            except common.DoorstopError as exc:
662
                if args.document:
663
                    raise exc from None  # pylint: disable=raising-bad-type
664
        if not document:
665
            item = tree.find_item(args.label)
666
667
    # Yield items from the requested object
668
    if item:
669
        yield item
670
    elif document:
671
        for item in document:
672
            yield item
673
    else:
674
        for document in tree:
675
            for item in document:
676
                yield item
677
678
679
def _export_import(args, cwd, error, document, ext):
    """Edit a document by calling export followed by import.

    The document is exported to a timestamped file, the file is opened in
    the editor and, after confirmation, imported back into the same
    document.  `args` is modified in place (`prefix`, `path` and, on
    import, `attrs` and `map`) to drive the export and import commands.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param document: :class:`~doorstop.core.document.Document` to edit
    :param ext: extension for export format

    """
    # Export the document to a file named after its prefix and the time
    args.prefix = document.prefix
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
    args.path = path
    export_command = get('export')
    export_command(args, cwd, error, catch=False, auto=True, _tree=document.tree)

    # Open the exported file
    editor.edit(path, tool=args.tool)

    # Import the file to the same document
    if utilities.ask("import from '{}'?".format(path)):
        args.attrs = {}
        args.map = {}
        import_command = get('import')
        import_command(args, cwd, error, catch=False, _tree=document.tree)
        common.delete(path)
        return

    # Import declined: delete the file or tell the user how to import it
    utilities.show("import canceled")
    if utilities.ask("delete '{}'?".format(path)):
        common.delete(path)
    else:
        utilities.show("to manually import: doorstop import {0}".format(path))
711