Completed
Push — develop ( e53f27...975b1f )
by Jace
28s queued 11s
created

doorstop.cli.commands.CycleTracker.__call__()   A

Complexity

Conditions 3

Size

Total Lines 12
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 12
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 3
nop 4
crap 3
1
"""Command functions."""
2
3 1
import os
4 1
import time
5
6 1
from doorstop import common
7 1
from doorstop.cli import utilities
8 1
from doorstop.core.builder import build
9 1
from doorstop.core import editor, importer, exporter, publisher
10 1
from doorstop import server
11
12 1
log = common.logger(__name__)
13
14
15 1
class CycleTracker:
    """Detect cyclic references between items.

    The items are the vertices of a directed graph (not necessarily
    connected) and the links between items are its edges.  Cycles are found
    with a depth first search combined with a bit of graph colouring: an
    item is either "discovered" (currently on the search path) or "finished"
    (completely explored).  The time complexity is O(|V| + |E|).

    """

    def __init__(self):
        """Initialize a cycle tracker with no visited items."""
        # UIDs on the current depth first search path
        self.discovered = set()
        # UIDs whose links have been explored completely
        self.finished = set()

    def _dfs_visit(self, uid, tree):
        """Do a depth first search visit of the specified item.

        :param uid: the UID of the item to visit
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        self.discovered.add(uid)
        visited = tree.find_item(uid)

        for pid in visited.links:
            if pid in self.discovered:
                # A link to an item still on the search path is a back edge
                template = "detected a cycle with a back edge from {} to {}"
                yield common.DoorstopWarning(template.format(pid, uid))
                continue

            if pid in self.finished:
                # Already explored completely, nothing new to find there
                continue

            # Recurse, since this is a fresh item
            yield from self._dfs_visit(pid, tree)

        # The item leaves the search path and is marked as fully explored
        self.discovered.remove(uid)
        self.finished.add(uid)

    def __call__(self, item, document, tree):
        """Get cycles which include the specified item.

        :param item: the UID of the item to get the cycles for
        :param document: unused
        :param tree: the document hierarchy tree

        :return: generator of :class:`~doorstop.common.DoorstopWarning`

        """
        already_seen = item in self.discovered or item in self.finished
        if not already_seen:
            yield from self._dfs_visit(item, tree)
68
69
70 1
def get(name):
    """Get a command function by name.

    :param name: subcommand name, or a false value for the main command

    :return: the module-level ``run_<name>`` function, or :func:`run` when
        no name is given

    """
    if not name:
        log.debug("launching main command...")
        return run

    log.debug("running command '{}'...".format(name))
    function_name = 'run_' + name
    return globals()[function_name]
78 1
79
80
def run(args, cwd, error, catch=True):  # pylint: disable=W0613
    """Process arguments and run the `doorstop` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: False if the captured block did not succeed, otherwise the
        result of validating the tree

    """
    with utilities.capture(catch=catch) as success:

        # build the tree with all documents loaded
        tree = _get_tree(args, cwd, load=True)

        # validate it, checking every item for cyclic links along the way
        utilities.show("validating items...", flush=True)
        valid = tree.validate(skip=args.skip, item_hook=CycleTracker())

    if not success:
        return False

    # only draw the hierarchy when there is more than one document
    show_drawing = len(tree) > 1 and valid
    if show_drawing:
        drawing = tree.draw()
        utilities.show('\n' + drawing + '\n')

    return valid
106
107
108
def run_create(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop create` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was created, False otherwise

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and create the new document inside it
        document = _get_tree(args, cwd).create_document(
            args.path, args.prefix,
            parent=args.parent, digits=args.digits,
        )

    if not success:
        return False

    summary = "created document: {} ({})".format(document.prefix,
                                                 document.relpath)
    utilities.show(summary)

    return True
132
133 1
134
def run_delete(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop delete` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the document was deleted, False otherwise

    """
    with utilities.capture(catch=catch) as success:

        # locate the document
        document = _get_tree(args, cwd).find_document(args.prefix)

        # remember how to describe it, then delete it
        prefix = document.prefix
        relpath = document.relpath
        document.delete()

    if not success:
        return False

    summary = "deleted document: {} ({})".format(prefix, relpath)
    utilities.show(summary)

    return True
159 1
160
161
def run_add(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop add` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were added, False otherwise

    """
    # Last item added; stays None when `args.count` is zero so that the
    # optional edit step below cannot hit an unbound variable.
    item = None

    with utilities.capture(catch=catch) as success:

        # get the document
        request_next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=request_next_number)
        document = tree.find_document(args.prefix)

        # add items to it
        for _index in range(args.count):
            item = document.add_item(level=args.level)
            utilities.show("added item: {} ({})".format(item.uid,
                                                        item.relpath))

        # edit the last added item if requested (and if one was added)
        if args.edit and item is not None:
            item.edit(tool=args.tool)

    if not success:
        return False

    return True
191 1
192
193
def run_remove(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop remove` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the item was removed, False otherwise

    """
    with utilities.capture(catch=catch) as success:

        # locate the item, then delete it
        item = _get_tree(args, cwd).find_item(args.uid)
        item.delete()

    if not success:
        return False

    summary = "removed item: {} ({})".format(item.uid, item.relpath)
    utilities.show(summary)

    return True
217 1
218 1
219
def run_edit(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop edit` subcommand.

    The label is looked up as an item first (unless a document was
    explicitly requested) and as a document otherwise.  Items are opened
    directly; documents are edited through an export/import round trip.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the edit completed, False otherwise

    """
    item = None
    document = None
    ext = utilities.get_ext(args, error, '.yml', '.yml', whole_tree=False)

    with utilities.capture(catch=catch) as success:

        # build the tree
        next_number = _request_next_number(args)
        tree = _get_tree(args, cwd, request_next_number=next_number)

        # try the label as an item unless a document was demanded
        if not args.document:
            try:
                item = tree.find_item(args.label)
            except common.DoorstopError as exc:
                # a missing item is only fatal when an item was demanded
                if args.item:
                    raise exc from None  # pylint: disable=raising-bad-type

        if item:
            # edit the item directly
            item.edit(tool=args.tool, edit_all=args.all)
        else:
            # fall back to the label being a document prefix
            document = tree.find_document(args.label)
            _export_import(args, cwd, error, document, ext)

    if not success:
        return False

    if item:
        summary = "opened item: {} ({})".format(item.uid, item.relpath)
        utilities.show(summary)

    return True
258 1
259
260
def run_reorder(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop reorder` subcommand.

    Depending on the arguments and the document's state the document is
    reordered automatically, reordered from an existing index, or a new
    index is created and opened for editing before reordering again.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: previously built tree to reuse instead of building one

    :return: True on success, False otherwise

    """
    reordered = False

    def announce():
        """Tell the user that the document is about to be reordered."""
        utilities.show("reordering document {}...".format(document),
                       flush=True)

    # first stage: locate the document
    with utilities.capture(catch=catch) as success:
        tree = _tree or _get_tree(args, cwd)
        document = tree.find_document(args.prefix)

    if not success:
        return False

    # second stage: reorder it
    with utilities.capture(catch=catch) as success:

        if args.auto:
            # automatically order
            announce()
            document.reorder(manual=False)
            reordered = True

        elif document.index:
            # or, reorder from a previously updated index
            relpath = os.path.relpath(document.index, cwd)
            if utilities.ask("reorder from '{}'?".format(relpath)):
                announce()
                document.reorder(automatic=not args.manual)
                reordered = True
            else:
                del document.index

        else:
            # or, create a new index, let the user edit it, and run the
            # command again on the same tree
            document.index = True  # create index
            relpath = os.path.relpath(document.index, cwd)
            editor.edit(relpath, tool=args.tool)
            get('reorder')(args, cwd, error, catch=False, _tree=tree)

    if not success:
        hint = "after fixing the error: doorstop reorder {}".format(document)
        utilities.show(hint)
        return False

    if reordered:
        utilities.show("reordered document: {}".format(document))

    return True
316 1
317
318
def run_link(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop link` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were linked, False otherwise

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and link the two items
        tree = _get_tree(args, cwd)
        linked = tree.link_items(args.child, args.parent)
        child, parent = linked

    if not success:
        return False

    template = "linked items: {} ({}) -> {} ({})"
    utilities.show(template.format(child.uid, child.relpath,
                                   parent.uid, parent.relpath))

    return True
345
346
347 1
def run_unlink(args, cwd, _, catch=True):
    """Process arguments and run the `doorstop unlink` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param _: function to call for CLI errors (unused)
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if the items were unlinked, False otherwise

    """
    with utilities.capture(catch=catch) as success:

        # build the tree and unlink the two items
        tree = _get_tree(args, cwd)
        unlinked = tree.unlink_items(args.child, args.parent)
        child, parent = unlinked

    if not success:
        return False

    template = "unlinked items: {} ({}) -> {} ({})"
    utilities.show(template.format(child.uid, child.relpath,
                                   parent.uid, parent.relpath))

    return True
374 1
375 1
376 1
def run_clear(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop clear` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if every selected item was cleared, False otherwise

    """
    template = "clearing item {}'s suspect links..."

    with utilities.capture(catch=catch) as success:

        # announce and clear each selected item in turn
        for item in _iter_items(args, cwd, error):
            utilities.show(template.format(item.uid))
            item.clear()

    if success:
        return True

    return False
396 1
397
398 1
def run_review(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop review` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if every selected item was reviewed, False otherwise

    """
    template = "marking item {} as reviewed..."

    with utilities.capture(catch=catch) as success:

        # announce and review each selected item in turn
        for item in _iter_items(args, cwd, error):
            utilities.show(template.format(item.uid))
            item.review()

    if success:
        return True

    return False
417 1
418
419
def run_import(args, cwd, error, catch=True, _tree=None):
    """Process arguments and run the `doorstop import` subcommand.

    Three modes are supported: importing a file into an existing document
    ([path] [prefix]), creating a document (``--document``), or adding a
    single item (``--item``).

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param _tree: previously built tree to reuse instead of building one

    :return: True if the import completed, False otherwise

    """
    document = None
    item = None
    attrs = utilities.literal_eval(args.attrs, error)
    mapping = utilities.literal_eval(args.map, error)

    # Check that the combination of arguments makes sense
    if args.path:
        if not args.prefix:
            error("when [path] specified, [prefix] is also required")
        elif args.document:
            error("'--document' cannot be used with [path] [prefix]")
        elif args.item:
            error("'--item' cannot be used with [path] [prefix]")
        ext = utilities.get_ext(args, error, None, None)
    elif not args.document and not args.item:
        error("specify [path], '--document', or '--item' to import")

    with utilities.capture(catch=catch) as success:

        if args.path:

            # get the document
            next_number = _request_next_number(args)
            if _tree:
                tree = _tree
            else:
                tree = _get_tree(args, cwd, request_next_number=next_number)
            document = tree.find_document(args.prefix)

            # import items into it
            template = "importing '{}' into document {}..."
            utilities.show(template.format(args.path, document), flush=True)
            importer.import_file(args.path, document, ext, mapping=mapping)

        elif args.document:

            # create a new document
            prefix, path = args.document
            document = importer.create_document(prefix, path,
                                                parent=args.parent)

        elif args.item:

            # add a single item
            prefix, uid = args.item
            next_number = _request_next_number(args)
            item = importer.add_item(prefix, uid, attrs=attrs,
                                     request_next_number=next_number)

    if not success:
        return False

    if document:
        summary = "imported document: {} ({})".format(document.prefix,
                                                      document.relpath)
    else:
        assert item
        summary = "imported item: {} ({})".format(item.uid, item.relpath)
    utilities.show(summary)

    return True
478 1
479 1
480
def run_export(args, cwd, error, catch=True, auto=False, _tree=None):
    """Process arguments and run the `doorstop export` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`
    :param auto: include placeholders for new items on import
    :param _tree: previously built tree to reuse instead of building one

    :return: True if the export completed, False otherwise

    """
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.yml', '.csv', whole_tree=whole_tree)

    # Only bound to a real document when a single document is exported;
    # initialised here so that it is defined on every execution path.
    document = None

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        exporter.check(ext)
        tree = _tree or _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Write to output file(s)
    if args.path:
        if whole_tree:
            msg = "exporting tree to '{}'...".format(args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(tree, args.path, ext, auto=auto)
        else:
            msg = "exporting document {} to '{}'...".format(document,
                                                            args.path)
            utilities.show(msg, flush=True)
            path = exporter.export(document, args.path, ext, auto=auto)
        if path:
            utilities.show("exported: {}".format(path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # `error` may return instead of raising; there is no document
            # to display in that case, so report failure.
            return False
        for line in exporter.export_lines(document, ext):
            utilities.show(line)

    return True
527 1
528 1
529
def run_publish(args, cwd, error, catch=True):
    """Process arguments and run the `doorstop publish` subcommand.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param catch: catch and log :class:`~doorstop.common.DoorstopError`

    :return: True if publishing completed, False otherwise

    """
    whole_tree = args.prefix == 'all'
    ext = utilities.get_ext(args, error, '.txt', '.html', whole_tree)

    # Only bound to a real document when a single document is published;
    # initialised here so that it is defined on every execution path.
    document = None

    # Get the tree or document
    with utilities.capture(catch=catch) as success:

        publisher.check(ext)
        tree = _get_tree(args, cwd, load=whole_tree)
        if not whole_tree:
            document = tree.find_document(args.prefix)

    if not success:
        return False

    # Set publishing arguments
    kwargs = {}
    if args.width:
        kwargs['width'] = args.width

    # Write to output file(s)
    if args.path:
        path = os.path.abspath(os.path.join(cwd, args.path))
        if whole_tree:
            msg = "publishing tree to '{}'...".format(path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(tree, path, ext,
                                               template=args.template,
                                               **kwargs)
        else:
            msg = "publishing document {} to '{}'...".format(document,
                                                             path)
            utilities.show(msg, flush=True)
            published_path = publisher.publish(document, path, ext,
                                               template=args.template,
                                               **kwargs)
        if published_path:
            utilities.show("published: {}".format(published_path))

    # Or, display to standard output
    else:
        if whole_tree:
            error("only single documents can be displayed")
            # `error` may return instead of raising; there is no document
            # to display in that case, so report failure.
            return False
        for line in publisher.publish_lines(document, ext, **kwargs):
            utilities.show(line)

    return True
582
583 1
584 1
def _request_next_number(args):
    """Get the server's "next number" method if a server exists.

    :param args: Namespace of CLI arguments

    :return: `server.get_next_number` after checking the server, or None
        when `args.force` is set

    """
    if not args.force:
        server.check()
        return server.get_next_number

    # the server is bypassed on request
    log.warning("creating items without the server...")
    return None
592 1
593 1
594 1
def _get_tree(args, cwd, request_next_number=None, load=False):
    """Build a tree and optionally load all documents.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param request_next_number: server method to get a document's next number
    :param load: force the early loading of all documents

    :return: built :class:`~doorstop.core.tree.Tree`

    """
    utilities.show("building tree...", flush=True)
    built = build(
        cwd=cwd,
        root=args.project,
        request_next_number=request_next_number,
    )

    if not load:
        return built

    utilities.show("loading documents...", flush=True)
    built.load()
    return built
614
615
616
def _iter_items(args, cwd, error):
    """Build a tree and iterate through items.

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors

    Items are filtered to:

    - `args.label` == 'all': all items
    - `args.label` == document prefix: the document's items
    - `args.label` == item UID: a single item

    Documents and items are inferred unless flagged by:

    - `args.document`: `args.label` is a prefix
    - `args.item`: `args.label` is an UID

    :return: generator of the selected items

    """
    # Parse arguments
    if args.label == 'all':
        if args.item:
            error("argument -i/--item: not allowed with 'all'")
        if args.document:
            error("argument -d/--document: not allowed with 'all'")

    # Build tree
    item = None
    document = None
    tree = _get_tree(args, cwd)

    # Determine if tree, document, or item was requested
    if args.label != 'all':
        if not args.item:
            try:
                document = tree.find_document(args.label)
            except common.DoorstopError as exc:
                # a missing document is only fatal when one was demanded
                if args.document:
                    raise exc from None  # pylint: disable=raising-bad-type
        if not document:
            item = tree.find_item(args.label)

    # Yield items from the requested object
    if item:
        yield item
    elif document:
        yield from document
    else:
        for tree_document in tree:
            yield from tree_document
668
669
670
def _export_import(args, cwd, error, document, ext):
    """Edit a document by calling export followed by import.

    The document is exported to a timestamped file, the file is opened in
    an editor and, after confirmation, imported back into the same
    document.  `args` is updated in place with the values the export and
    import commands read (`prefix`, `path`, `attrs`, `map`).

    :param args: Namespace of CLI arguments
    :param cwd: current working directory
    :param error: function to call for CLI errors
    :param document: :class:`~doorstop.core.document.Document` to edit
    :param ext: extension for export format

    """
    # Export the document to file
    args.prefix = document.prefix
    path = "{}-{}{}".format(args.prefix, int(time.time()), ext)
    args.path = path
    export_command = get('export')
    export_command(args, cwd, error, catch=False, auto=True,
                   _tree=document.tree)

    # Open the exported file
    editor.edit(path, tool=args.tool)

    # Handle a declined import first
    if not utilities.ask("import from '{}'?".format(path)):
        utilities.show("import canceled")
        if utilities.ask("delete '{}'?".format(path)):
            common.delete(path)
        else:
            hint = "to manually import: doorstop import {0}".format(path)
            utilities.show(hint)
        return

    # Import the file to the same document, then clean up
    args.attrs = {}
    args.map = {}
    import_command = get('import')
    import_command(args, cwd, error, catch=False, _tree=document.tree)
    common.delete(path)
703