Completed
Push — master ( 669a7b...91f4e1 )
by Vincent
01:13
created

_Execute._triage_pending_dependents()   B

Complexity

Conditions 6

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 6
rs 8
cc 6
1
# coding: utf8
2
3
# Copyright 2013-2017 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
import datetime
8
import multiprocessing
9
import os.path
10
import pickle
11
12
import graphviz
13
import matplotlib
14
import matplotlib.dates
15
import matplotlib.figure
16
import matplotlib.backends.backend_agg
17
import wurlitzer
18
19
20
def execute(action, jobs=1, keep_going=False, do_raise=True, hooks=None):
    """
    Recursively execute an :class:`.Action`'s dependencies then the action.

    :param Action action: the action to execute.
    :param jobs: number of actions to execute in parallel. Pass ``None`` to let ActionTree choose.
    :type jobs: int or None
    :param bool keep_going: if ``True``, then execution does not stop on first failure,
        but executes as many dependencies as possible.
    :param bool do_raise: if ``False``, then exceptions are not re-raised as :exc:`CompoundException`
        but only included in the :class:`.ExecutionReport`.
    :param Hooks hooks: its methods will be called when execution progresses.

    :raises CompoundException: when ``do_raise`` is ``True`` and dependencies raise exceptions.
    :raises ValueError: when ``jobs`` is lower than 1.

    :rtype: ExecutionReport
    """
    if jobs is None:
        jobs = multiprocessing.cpu_count()
    elif jobs < 1:
        # Without at least one execution slot, no action is ever started and
        # the scheduler would wait forever for an event that never comes.
        raise ValueError("jobs must be a positive integer or None")
    _check_picklability(action)
    if hooks is None:
        hooks = Hooks()
    return _Execute(jobs, keep_going, do_raise, hooks).run(action)
43
44
45
class Action(object):
    """
    The main class of ActionTree.
    An action is started once all its dependencies are finished.
    Pass it to :func:`.execute`.

    Derive from this class to create your own actions:
    define a ``do_execute(self, dependency_statuses)`` method that performs the action.
    ``dependency_statuses`` is a dictionary mapping each of ``self.dependencies`` to its
    :class:`.ActionStatus`.
    See :ref:`outputs` for how return values, raised exceptions and printed text are handled.

    Actions, return values and exceptions raised must be picklable.
    """

    def __init__(self, label, weak_dependencies=False):
        """
        :param label: whatever you want to attach to the action.
            ``str(label)`` must succeed and return a string.
            Can be retrieved by :attr:`label`.
        :param bool weak_dependencies:
            if ``True``, then the action will execute even if some of its dependencies failed.
            Note that if you want this behavior for only a subset of the dependencies,
            you can group them in a :class:`.stock.NullAction`.
        """
        # Evaluated only so that a label that cannot be converted to a string
        # is rejected at construction time.
        str(label)
        self.__dependencies = []
        self.__weak_dependencies = weak_dependencies
        self.__label = label

    @property
    def label(self):
        """
        The label given at construction.
        """
        return self.__label

    @property
    def weak_dependencies(self):
        """
        ``True`` if this action executes even when some of its dependencies failed.

        :rtype: bool
        """
        return self.__weak_dependencies

    def add_dependency(self, dependency):
        """
        Register an action that must be executed before this one.
        Dependencies can be added in any order.

        :param Action dependency:

        :raises DependencyCycleException: when adding the new dependency would create a cycle.
        """
        # If this action is reachable from the new dependency, linking them
        # would close a loop.
        creates_cycle = self in dependency.get_possible_execution_order()
        if creates_cycle:
            raise DependencyCycleException()
        self.__dependencies.append(dependency)

    @property
    def dependencies(self):
        """
        A copy of the list of this action's direct dependencies.
        """
        return self.__dependencies[:]

    def get_possible_execution_order(self, seen_actions=None):
        """
        Return this action and all its dependencies (recursively) as a list,
        ordered so that each action comes after its dependencies.
        Such an order is not unique, and which one is returned is not specified.
        """
        if seen_actions is None:
            seen_actions = set()
        if self in seen_actions:
            # Already listed through another path of the graph
            return []
        seen_actions.add(self)
        ordered = []
        for dependency in self.__dependencies:
            ordered.extend(dependency.get_possible_execution_order(seen_actions))
        ordered.append(self)
        return ordered
127
128
129
class Hooks(object):
    """
    Base class for your hooks: derive from it and override the methods you need.
    :func:`.execute` calls them as execution progresses.
    Every method of this base class does nothing.
    """
    def action_pending(self, time, action):
        """
        Called when an action is considered for execution, i.e. at the beginning of :func:`.execute`.

        :param datetime.datetime time: when the action was considered for execution.
        :param Action action: the action.
        """

    def action_ready(self, time, action):
        """
        Called when an action is ready to be executed, i.e. when all its dependencies have succeeded.

        :param datetime.datetime time: when the action became ready.
        :param Action action: the action.
        """

    def action_canceled(self, time, action):
        """
        Called when an action's execution is canceled, i.e. when some of its dependencies has failed.

        :param datetime.datetime time: when the action was canceled.
        :param Action action: the action.
        """

    def action_started(self, time, action):
        """
        Called when the execution of an action starts.

        :param datetime.datetime time: when the action was started.
        :param Action action: the action.
        """

    def action_printed(self, time, action, text):
        """
        Called when an action prints something.

        :param datetime.datetime time: when the action printed the text.
        :param Action action: the action.
        :param str text: the text printed.
        """

    def action_successful(self, time, action, return_value):
        """
        Called when an action completes without error.

        :param datetime.datetime time: when the action completed.
        :param Action action: the action.
        :param return_value: the value returned by the action.
        """

    def action_failed(self, time, action, exception):
        """
        Called when an action completes with an exception.

        :param datetime.datetime time: when the action completed.
        :param Action action: the action.
        :param exception: the exception raised by the action
        """
192
193
194
class DependencyCycleException(Exception):
    """
    Raised by :meth:`.Action.add_dependency` when the new dependency would close a cycle in the graph.
    """

    def __init__(self):
        message = "Dependency cycle"
        super(DependencyCycleException, self).__init__(message)
201
202
203
class CompoundException(Exception):
    """
    Raised by :func:`.execute` when dependencies raise exceptions.
    It gathers these exceptions and the report of the failed execution.
    """

    def __init__(self, exceptions, execution_report):
        super(CompoundException, self).__init__(exceptions)
        self.__execution_report = execution_report
        self.__exceptions = exceptions

    @property
    def exceptions(self):
        """
        The list of exceptions raised during the execution.
        """
        return self.__exceptions

    @property
    def execution_report(self):
        """
        The :class:`.ExecutionReport` describing the failed execution.
        """
        return self.__execution_report
226
227
228
class ExecutionReport(object):
    """
    ExecutionReport()

    Execution report, returned by :func:`.execute`.
    """

    class ActionStatus(object):
        """
        Status of a single :class:`.Action`.

        Timestamps and results are recorded through the ``_set_*`` and ``_add_output`` methods,
        and exposed through read-only properties.
        """

        SUCCESSFUL = "SUCCESSFUL"
        "The :attr:`status` after a successful execution."
        FAILED = "FAILED"
        "The :attr:`status` after a failed execution where this action raised an exception."
        CANCELED = "CANCELED"
        "The :attr:`status` after a failed execution where a dependency raised an exception."

        def __init__(self, pending_time):
            self.__pending_time = pending_time
            self.__ready_time = None
            self.__cancel_time = None
            self.__start_time = None
            self.__success_time = None
            self.__return_value = None
            self.__failure_time = None
            self.__exception = None
            self.__output = None

        def _set_ready_time(self, ready_time):
            self.__ready_time = ready_time

        def _set_cancel_time(self, cancel_time):
            self.__cancel_time = cancel_time

        def _set_start_time(self, start_time):
            self.__start_time = start_time

        def _set_success(self, success_time, return_value):
            self.__success_time = success_time
            self.__return_value = return_value
            # Turns a ``None`` output into "" for an action that printed nothing
            self._add_output("")

        def _set_failure(self, failure_time, exception):
            self.__failure_time = failure_time
            self.__exception = exception
            # Turns a ``None`` output into "" for an action that printed nothing
            self._add_output("")

        def _add_output(self, output):
            self.__output = (self.__output or "") + output

        @property
        def status(self):
            """
            The status of the action:
            :attr:`SUCCESSFUL` if the action succeeded,
            :attr:`FAILED` if the action failed,
            and :attr:`CANCELED` if the action was canceled because some of its dependencies failed.
            """
            if self.start_time:
                if self.success_time:
                    return self.SUCCESSFUL
                else:
                    assert self.failure_time
                    return self.FAILED
            else:
                assert self.cancel_time
                return self.CANCELED

        @property
        def pending_time(self):
            """
            The time when this action was considered for execution.

            :rtype: datetime.datetime
            """
            return self.__pending_time

        @property
        def ready_time(self):
            """
            The time when this action was ready to execute.
            (`None` if it was canceled before being ready).

            :rtype: datetime.datetime or None
            """
            return self.__ready_time

        @property
        def cancel_time(self):
            """
            The time when this action was canceled.
            (`None` if it was started).

            :rtype: datetime.datetime or None
            """
            return self.__cancel_time

        @property
        def start_time(self):
            """
            The time at the beginning of the execution of this action.
            (`None` if it was never started).

            :rtype: datetime.datetime or None
            """
            return self.__start_time

        @property
        def success_time(self):
            """
            The time at the successful end of the execution of this action.
            (`None` if it was never started or if it failed).

            :rtype: datetime.datetime or None
            """
            return self.__success_time

        @property
        def return_value(self):
            """
            The value returned by this action
            (`None` if it failed or was never started).
            """
            return self.__return_value

        @property
        def failure_time(self):
            """
            The time at the failed end of the execution of this action.
            (`None` if it was never started or if it succeeded).

            :rtype: datetime.datetime or None
            """
            return self.__failure_time

        @property
        def exception(self):
            """
            The exception raised by this action
            (`None` if it succeeded or was never started).
            """
            return self.__exception

        @property
        def output(self):
            """
            Everything printed (and flushed in time) by this action.
            (``None`` if it never started, ``""`` it if didn't print anything)

            :rtype: str or None
            """
            return self.__output

    def __init__(self, root_action, actions, now):
        self._root_action = root_action
        # Every action starts with only its pending time set
        self.__action_statuses = {action: self.ActionStatus(now) for action in actions}

    @property
    def is_success(self):
        """
        ``True`` if the execution finished without error.

        :rtype: bool
        """
        # ``values()`` exists on both Python 2 and 3, unlike ``itervalues()``
        return all(
            action_status.status == self.ActionStatus.SUCCESSFUL
            for action_status in self.__action_statuses.values()
        )

    def get_action_status(self, action):
        """
        Get the :class:`ActionStatus` of an action.

        :param Action action:

        :rtype: ActionStatus
        """
        return self.__action_statuses[action]

    def get_actions_and_statuses(self):
        """
        Get a list of actions and their statuses.

        :rtype: list(tuple(Action, ActionStatus))
        """
        # Materialized so that a real list is returned on Python 3 as well
        return list(self.__action_statuses.items())
416
417
418
class DependencyGraph(object):
    """
    A visual representation of the dependency graph, using `Graphviz <http://graphviz.org/>`__.
    """

    def __init__(self, action):
        self.__graphviz_graph = graphviz.Digraph("action", node_attr={"shape": "box"})
        node_names = {}
        execution_order = action.get_possible_execution_order()
        for (index, current) in enumerate(execution_order):
            name = str(index)
            node_names[current] = name
            self.__graphviz_graph.node(name, str(current.label))
            for dependency in current.dependencies:
                # Dependencies come first in a possible execution order,
                # so their node has already been created
                assert dependency in node_names
                self.__graphviz_graph.edge(name, node_names[dependency])

    def write_to_png(self, filename):  # pragma no cover (Untestable? But small.)
        """
        Write the graph as a PNG image to the specified file.

        See also :meth:`get_graphviz_graph` if you want to draw the graph somewhere else.
        """
        directory, basename = os.path.split(filename)
        # The extension is dropped from the name passed to ``render``
        stem = os.path.splitext(basename)[0]
        graph = self.get_graphviz_graph()
        graph.format = "png"
        graph.render(directory=directory, filename=stem, cleanup=True)

    def get_graphviz_graph(self):
        """
        Return a :class:`graphviz.Digraph` of this dependency graph.
        It is a copy: modifying it does not modify this object.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        return self.__graphviz_graph.copy()
454
455
456
class GanttChart(object):  # pragma no cover (Too difficult to unit test)
    """
    A visual representation of the timing of an execution.
    """

    def __init__(self, report):
        # Drawable counterpart of each action, keyed by the action's id()
        self.__actions = {
            id(action): self.__make_action(action, status)
            for (action, status) in report.get_actions_and_statuses()
        }

        # Vertical position of each action, keyed by the action's id()
        self.__ordinates = {}

        # Reverse dependency graph: actions that depend on each action
        dependents = {}
        for (action, _) in report.get_actions_and_statuses():
            dependents.setdefault(action, set())
            for dependency in action.dependencies:
                dependents.setdefault(dependency, set()).add(action)

        def end_time_key(dependency):
            # Dependencies that never ran have no end time: sort them first,
            # without ever comparing None to a datetime (a TypeError on Python 3)
            status = report.get_action_status(dependency)
            end_time = status.success_time or status.failure_time
            return (end_time is not None, end_time)

        def compute(action, ordinate):
            self.__ordinates[id(action)] = len(self.__actions) - ordinate
            for d in sorted(action.dependencies, key=end_time_key):
                if len(dependents[d]) == 1:
                    # `action` is the last dependent of `d` still to be placed
                    ordinate = compute(d, ordinate - 1)
                else:
                    dependents[d].remove(action)
            return ordinate

        last_ordinate = compute(report._root_action, len(self.__actions) - 1)
        assert last_ordinate == 0, last_ordinate

    class _StartedAction(object):
        """
        Drawing logic shared by the actions that were started.
        Sub-classes define the color and provide the time at which the action ended.
        """

        _color = None

        def __init__(self, action, status, end_time):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__start_time = status.start_time
            self.__end_time = end_time

        @property
        def min_time(self):
            return self.__ready_time

        @property
        def max_time(self):
            return self.__end_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            # Thin line from ready to start
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color=self._color, lw=1)
            # Thick line from start to end
            ax.plot(
                [self.__start_time, self.__end_time], [ordinate, ordinate],
                color=self._color, lw=4, solid_capstyle="butt",
            )
            # @todo Make sure the text is not outside the plot on the right
            ax.annotate(
                self.__label,
                xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points",
            )
            # Dotted lines from the end of each dependency
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    class SuccessfulAction(_StartedAction):
        """
        An action that succeeded, drawn in blue until its success time.
        """

        _color = "blue"

        def __init__(self, action, status):
            GanttChart._StartedAction.__init__(self, action, status, status.success_time)

    class FailedAction(_StartedAction):
        """
        An action that failed, drawn in red until its failure time.
        """

        _color = "red"

        def __init__(self, action, status):
            GanttChart._StartedAction.__init__(self, action, status, status.failure_time)

    class CanceledAction(object):
        """
        An action that was canceled, drawn in grey.
        """

        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__cancel_time = status.cancel_time

        @property
        def min_time(self):
            return self.__cancel_time if self.__ready_time is None else self.__ready_time

        @property
        def max_time(self):
            return self.__cancel_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            if self.__ready_time:
                ax.plot([self.__ready_time, self.__cancel_time], [ordinate, ordinate], color="grey", lw=1)
            ax.annotate(
                self.__label,
                xy=(self.__cancel_time, ordinate), xytext=(0, 3), textcoords="offset points",
                color="grey",
            )
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    @classmethod
    def __make_action(cls, action, status):
        # Pick the drawable class matching the final status of the action
        if status.status == ExecutionReport.ActionStatus.SUCCESSFUL:
            return cls.SuccessfulAction(action, status)
        elif status.status == ExecutionReport.ActionStatus.FAILED:
            return cls.FailedAction(action, status)
        elif status.status == ExecutionReport.ActionStatus.CANCELED:
            return cls.CanceledAction(action, status)

    def write_to_png(self, filename):
        """
        Write the Gantt chart as a PNG image to the specified file.

        See also :meth:`get_mpl_figure` and :meth:`plot_on_mpl_axes` if you want to draw the report somewhere else.
        """
        figure = self.get_mpl_figure()
        canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(figure)
        canvas.print_figure(filename)

    def get_mpl_figure(self):
        """
        Return a :class:`matplotlib.figure.Figure` of this Gantt chart.

        See also :meth:`plot_on_mpl_axes` if you want to draw the Gantt chart on your own matplotlib figure.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        fig = matplotlib.figure.Figure()
        ax = fig.add_subplot(1, 1, 1)

        self.plot_on_mpl_axes(ax)

        return fig

    @staticmethod
    def __nearest(v, values):
        # Element of `values` (sorted in increasing order) closest to `v`;
        # on a tie, `min` keeps the first candidate, i.e. the lower one
        return min(values, key=lambda value: abs(value - v))

    # Candidate steps, in seconds, between two ticks of the relative time axis
    __intervals = [
        1, 2, 5, 10, 15, 30, 60,
        2 * 60, 10 * 60, 30 * 60, 3600,
        2 * 3600, 3 * 3600, 6 * 3600, 12 * 3600, 24 * 3600,
    ]

    def plot_on_mpl_axes(self, ax):
        """
        Plot this Gantt chart on the provided :class:`matplotlib.axes.Axes`.

        See also :meth:`write_to_png` and :meth:`get_mpl_figure` for the simpler use-cases.
        """
        # ``values()`` exists on both Python 2 and 3, unlike ``itervalues()``
        drawables = list(self.__actions.values())
        for action in drawables:
            action.draw(ax, self.__ordinates, self.__actions)

        ax.get_yaxis().set_ticklabels([])
        ax.set_ylim(0.5, len(self.__actions) + 1)

        # Horizontal range, widened to whole seconds
        min_time = min(a.min_time for a in drawables).replace(microsecond=0)
        max_time = (
            max(a.max_time for a in drawables).replace(microsecond=0) +
            datetime.timedelta(seconds=1)
        )
        duration = int((max_time - min_time).total_seconds())

        # Bottom axis: wall-clock time
        ax.set_xlabel("Local time")
        ax.set_xlim(min_time, max_time)
        ax.xaxis_date()
        ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M:%S"))
        ax.xaxis.set_major_locator(matplotlib.dates.AutoDateLocator(maxticks=4, minticks=5))

        # Top axis: seconds elapsed since the beginning, about five ticks
        ax2 = ax.twiny()
        ax2.set_xlabel("Relative time")
        ax2.set_xlim(min_time, max_time)
        ticks = list(range(0, duration, self.__nearest(duration // 5, self.__intervals)))
        ax2.xaxis.set_ticks([min_time + datetime.timedelta(seconds=s) for s in ticks])
        ax2.xaxis.set_ticklabels(ticks)
666
667
def _check_picklability(stuff):
668
    # This is a way to fail fast if we see a non-picklable object
669
    # because ProcessPoolExecutor freezes forever if we try to transfer
670
    # a non-picklable object through its queues
671
    pickle.loads(pickle.dumps(stuff))
672
673
674
# Kinds of events put on the events queue; each event is a tuple
# (kind, action id, payload).

# The action returned; payload: (end time, return value)
SUCCESSED = "SUCCESSED"
# The action printed something; payload: (time, text)
PRINTED = "PRINTED"
# The action raised an exception; payload: (end time, exception)
FAILED = "FAILED"
# The return value or exception could not be pickled; empty payload
PICKLING_EXCEPTION = "PICKLING_EXCEPTION"
678
679
680
class WurlitzerToEvents(wurlitzer.Wurlitzer):
    # This is a highly contrived use of Wurlitzer:
    # all we need is to *capture* the standard streams, so Wurlitzer is tricked
    # by passing True instead of writeable file-like objects, and the
    # _handle_xxx methods are redefined to intercept what it would write.
    def __init__(self, events, action_id):
        super(WurlitzerToEvents, self).__init__(stdout=True, stderr=True)
        self.action_id = action_id
        self.events = events

    def _handle_stdout(self, data):
        # Forward the captured text as a timestamped PRINTED event
        payload = (datetime.datetime.now(), self._decode(data))
        self.events.put((PRINTED, self.action_id, payload))

    def _handle_stderr(self, data):
        # Both streams are reported the same way
        self._handle_stdout(data)
695
696
697
class _Worker(multiprocessing.Process):
    """
    Process executing a single action and reporting its outcome on the events queue.

    Events are tuples ``(kind, action_id, payload)``.
    """

    def __init__(self, action_id, action, events, dependency_statuses):
        multiprocessing.Process.__init__(self)
        self.action_id = action_id
        self.action = action
        self.events = events
        self.dependency_statuses = dependency_statuses

    def run(self):
        # What the action prints is captured and sent as PRINTED events
        with WurlitzerToEvents(self.events, self.action_id):
            return_value = exception = None
            try:
                return_value = self.action.do_execute(self.dependency_statuses)
            except Exception as e:
                exception = e
        try:
            # The outcome is about to be sent through the queue: check it can be
            _check_picklability((exception, return_value))
        except Exception:
            self.events.put((PICKLING_EXCEPTION, self.action_id, ()))
        else:
            end_time = datetime.datetime.now()
            # Identity test: an exception object may be falsy (e.g. if its
            # class defines __len__ or __bool__) and is still a failure
            if exception is not None:
                self.events.put((FAILED, self.action_id, (end_time, exception)))
            else:
                self.events.put((SUCCESSED, self.action_id, (end_time, return_value)))
722
723
724
class _Execute(object):
725
    def __init__(self, jobs, keep_going, do_raise, hooks):
726
        self.jobs = jobs
727
        self.keep_going = keep_going
728
        self.do_raise = do_raise
729
        self.hooks = hooks
730
731
    def run(self, root_action):
732
        now = datetime.datetime.now()
733
734
        # Pre-process actions
735
        actions = root_action.get_possible_execution_order()
736
        self.actions_by_id = {id(action): action for action in actions}
737
        self.dependents = {action: set() for action in actions}
738
        for action in actions:
739
            for dependency in action.dependencies:
740
                self.dependents[dependency].add(action)
741
742
        # Misc stuff
743
        self.report = ExecutionReport(root_action, actions, now)
744
        for action in actions:
745
            self.hooks.action_pending(now, action)
746
        self.events = multiprocessing.Queue()
747
        self.exceptions = []
748
749
        # Actions by status
750
        self.pending = set(actions)
751
        self.ready = set()
752
        self.running = set()
753
        self.done = set()
754
        for action in actions:
755
            if not action.dependencies:
756
                self._prepare_action(action, now)
757
758
        # Execute
759
        while self.pending or self.ready or self.running:
760
            self._progress(now)
761
            now = datetime.datetime.now()
762
763
        for w in multiprocessing.active_children():
764
            w.join()
765
766
        if self.do_raise and self.exceptions:
767
            raise CompoundException(self.exceptions, self.report)
768
        else:
769
            return self.report
770
771
    def _cancel_action(self, action, now):
772
        self.report.get_action_status(action)._set_cancel_time(now)
773
        self.hooks.action_canceled(now, action)
774
775
        if action in self.pending:
776
            self._change_status(action, self.pending, self.done)
777
        else:
778
            self._change_status(action, self.ready, self.done)
779
780
        if not self.keep_going:
781
            for d in action.dependencies:
782
                if d in self.pending or d in self.ready:
783
                    self._cancel_action(d, now)
784
        self._triage_pending_dependents(action, True, now)
785
786
    def _triage_pending_dependents(self, action, failed, now):
787
        for dependent in self.pending & self.dependents[action]:
788
            if failed and not dependent.weak_dependencies:
789
                self._cancel_action(dependent, now)
790
            elif all(d in self.done for d in dependent.dependencies):
791
                self._prepare_action(dependent, now)
792
793
    def _prepare_action(self, action, now):
794
        self.report.get_action_status(action)._set_ready_time(now)
795
        self.hooks.action_ready(now, action)
796
797
        self._change_status(action, self.pending, self.ready)
798
799
    def _progress(self, now):
800
        while len(self.running) < self.jobs:
801
            for action in self.ready:
802
                self._start_action(action, now)
803
                break
804
            else:
805
                break
806
        self._handle_next_event()
807
808
    def _start_action(self, action, now):
809
        self.report.get_action_status(action)._set_start_time(now)
810
        self.hooks.action_started(now, action)
811
812
        dependency_statuses = {d: self.report.get_action_status(d) for d in action.dependencies}
813
        _Worker(id(action), action, self.events, dependency_statuses).start()
814
        self._change_status(action, self.ready, self.running)
815
816
    def _handle_next_event(self):
817
        (event_kind, action_id, event_payload) = self.events.get()
818
        handlers = {
819
            SUCCESSED: self._handle_successed_event,
820
            PRINTED: self._handle_printed_event,
821
            FAILED: self._handle_failed_event,
822
            PICKLING_EXCEPTION: self._handle_pickling_exception_event,
823
        }
824
        handlers[event_kind](self.actions_by_id[action_id], *event_payload)
825
826
    def _handle_successed_event(self, action, success_time, return_value):
827
        self.report.get_action_status(action)._set_success(success_time, return_value)
828
        self.hooks.action_successful(success_time, action, return_value)
829
830
        self._change_status(action, self.running, self.done)
831
        self._triage_pending_dependents(action, False, success_time)
832
833
    def _handle_printed_event(self, action, print_time, text):
834
        self.report.get_action_status(action)._add_output(text)
835
        self.hooks.action_printed(print_time, action, text)
836
837
    def _handle_failed_event(self, action, failure_time, exception):
838
        self.report.get_action_status(action)._set_failure(failure_time, exception)
839
        self.hooks.action_failed(failure_time, action, exception)
840
841
        self._change_status(action, self.running, self.done)
842
        self.exceptions.append(exception)
843
        self._triage_pending_dependents(action, True, failure_time)
844
845
    def _handle_pickling_exception_event(self, action):
846
        raise pickle.PicklingError()
847
848
    def _change_status(self, action, orig, dest):
849
        orig.remove(action)
850
        dest.add(action)
851