Completed
Push — master ( a4f2fd...4fbdfd )
by Vincent
01:14
created

WurlitzerToEvents   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 15
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 3
c 2
b 0
f 0
dl 0
loc 15
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A _handle_stdout() 0 2 1
A _handle_stderr() 0 2 1
A __init__() 0 4 1
1
# coding: utf8
2
3
# Copyright 2013-2017 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
import datetime
8
import multiprocessing
9
import os.path
10
import pickle
11
12
import graphviz
13
import matplotlib
14
import matplotlib.dates
15
import matplotlib.figure
16
import matplotlib.backends.backend_agg
17
import wurlitzer
18
19
20
def execute(action, jobs=1, keep_going=False, do_raise=True, hooks=None):
    """
    Execute an :class:`.Action` after having (recursively) executed its dependencies.

    :param Action action: the root action to execute.
    :param jobs: maximum number of actions to run in parallel.
        Pass ``None`` to use :func:`multiprocessing.cpu_count`.
    :type jobs: int or None
    :param bool keep_going: if ``True``, a failure does not stop the execution:
        every action whose dependencies all succeeded is still executed.
    :param bool do_raise: if ``False``, exceptions are not re-raised as :exc:`CompoundException`;
        they are only available through the returned :class:`.ExecutionReport`.
    :param Hooks hooks: object whose methods are called as the execution progresses.
        A do-nothing :class:`Hooks` is used when ``None``.

    :raises CompoundException: when ``do_raise`` is ``True`` and some actions raised exceptions.

    :rtype: ExecutionReport
    """
    # Fail fast: the action is sent to worker processes, so it has to be picklable.
    _check_picklability(action)
    effective_jobs = multiprocessing.cpu_count() if jobs is None else jobs
    effective_hooks = Hooks() if hooks is None else hooks
    execution = _Execution(effective_jobs, keep_going, do_raise, effective_hooks, action)
    return execution.run()
43
44
45
class Action(object):
    """
    The central class of ActionTree:
    a unit of work that is started once all its dependencies are finished.
    Give it to :func:`.execute` to run it.

    Derive from this class to create your own actions,
    and implement a ``do_execute(self)`` method doing the actual work.
    :ref:`outputs` explains what happens to its return value, to the exceptions it raises
    and to what it prints.

    Actions, their return values and the exceptions they raise must be picklable.
    """

    def __init__(self, label):
        """
        :param label: anything you want to associate with the action.
            `str(label)` must succeed and return a string.
            Available afterwards as :attr:`label`.
        """
        # The result is discarded on purpose: this only checks that the conversion works.
        str(label)
        self.__dependencies = []
        self.__label = label

    @property
    def label(self):
        """
        The label given to the constructor.
        """
        return self.__label

    def add_dependency(self, dependency):
        """
        Register an action that must be executed before this one.
        Dependencies may be added in any order.

        :param Action dependency:

        :raises DependencyCycleException: when adding the new dependency would create a cycle.
        """
        # If this action is (recursively) required by the new dependency, the edge closes a cycle.
        would_cycle = self in dependency.get_possible_execution_order()
        if would_cycle:
            raise DependencyCycleException()
        self.__dependencies.append(dependency)

    @property
    def dependencies(self):
        """
        A copy of the list of this action's direct dependencies.
        """
        return self.__dependencies[:]

    def get_possible_execution_order(self, seen_actions=None):
        """
        Return this action preceded by all its (recursive) dependencies,
        ordered so that they can be executed one after the other.
        Several such orders generally exist; which one is returned is not specified.
        """
        seen = set() if seen_actions is None else seen_actions
        if self in seen:
            # Already emitted earlier in the traversal.
            return []
        seen.add(self)
        ordered = []
        for dependency in self.__dependencies:
            ordered.extend(dependency.get_possible_execution_order(seen))
        ordered.append(self)
        return ordered
111
112
113
class Hooks(object):
    """
    Derive from this class to be notified of the progress of an execution:
    :func:`.execute` calls the methods below. Every method does nothing by default.
    """
    def action_pending(self, time, action):
        """
        Called at the beginning of :func:`.execute`, when an action is considered for execution.

        :param datetime.datetime time: when the action was considered for execution.
        :param Action action: the action.
        """

    def action_ready(self, time, action):
        """
        Called when all the dependencies of an action have succeeded, i.e. when it can be executed.

        :param datetime.datetime time: when the action became ready.
        :param Action action: the action.
        """

    def action_canceled(self, time, action):
        """
        Called when an action will not be executed because one of its dependencies has failed.

        :param datetime.datetime time: when the action was canceled.
        :param Action action: the action.
        """

    def action_started(self, time, action):
        """
        Called when the execution of an action begins.

        :param datetime.datetime time: when the action was started.
        :param Action action: the action.
        """

    def action_printed(self, time, action, text):
        """
        Called when an action has printed something.

        :param datetime.datetime time: when the text was printed.
        :param Action action: the action.
        :param str text: what was printed.
        """

    def action_successful(self, time, action):
        """
        Called when an action finishes without raising.

        :param datetime.datetime time: when the action finished.
        :param Action action: the action.
        """

    def action_failed(self, time, action):
        """
        Called when an action finishes by raising an exception.

        :param datetime.datetime time: when the action finished.
        :param Action action: the action.
        """
174
175
176
class DependencyCycleException(Exception):
    """
    Raised by :meth:`.Action.add_dependency` when the new dependency would close a cycle.
    """

    def __init__(self):
        Exception.__init__(self, "Dependency cycle")
183
184
185
class CompoundException(Exception):
    """
    Raised by :func:`.execute` when some of the executed actions raised exceptions.
    """

    def __init__(self, exceptions, execution_report):
        Exception.__init__(self, exceptions)
        self.__execution_report = execution_report
        self.__exceptions = exceptions

    @property
    def exceptions(self):
        """
        The list of the exceptions that were raised.
        """
        return self.__exceptions

    @property
    def execution_report(self):
        """
        The :class:`.ExecutionReport` describing the failed execution.
        """
        return self.__execution_report
208
209
210
class ExecutionReport(object):
    """
    ExecutionReport()

    Execution report, returned by :func:`.execute`.
    """

    class ActionStatus(object):
        """
        Status of a single :class:`.Action`.
        """

        SUCCESSFUL = "SUCCESSFUL"
        "The :attr:`status` after a successful execution."
        FAILED = "FAILED"
        "The :attr:`status` after a failed execution where this action raised an exception."
        CANCELED = "CANCELED"
        "The :attr:`status` after a failed execution where a dependency raised an exception."

        def __init__(self, pending_time):
            self.__pending_time = pending_time
            self.__ready_time = None
            self.__cancel_time = None
            self.__start_time = None
            self.__success_time = None
            self.__return_value = None
            self.__failure_time = None
            self.__exception = None
            self.__output = None

        def _set_ready_time(self, ready_time):
            self.__ready_time = ready_time

        def _set_cancel_time(self, cancel_time):
            self.__cancel_time = cancel_time

        def _set_start_time(self, start_time):
            self.__start_time = start_time

        def _set_success(self, success_time, return_value):
            self.__success_time = success_time
            self.__return_value = return_value
            # Turns a None output into "" for an action that ran without printing anything.
            self._add_output("")

        def _set_failure(self, failure_time, exception):
            self.__failure_time = failure_time
            self.__exception = exception
            # Turns a None output into "" for an action that ran without printing anything.
            self._add_output("")

        def _add_output(self, output):
            self.__output = (self.__output or "") + output

        @property
        def status(self):
            """
            The status of the action:
            :attr:`SUCCESSFUL` if the action succeeded,
            :attr:`FAILED` if the action failed,
            and :attr:`CANCELED` if the action was canceled because some of its dependencies failed.
            """
            if self.start_time:
                if self.success_time:
                    return self.SUCCESSFUL
                else:
                    assert self.failure_time
                    return self.FAILED
            else:
                assert self.cancel_time
                return self.CANCELED

        @property
        def pending_time(self):
            """
            The time when this action was considered for execution.

            :rtype: datetime.datetime
            """
            return self.__pending_time

        @property
        def ready_time(self):
            """
            The time when this action was ready to execute.
            (`None` if it was canceled before being ready).

            :rtype: datetime.datetime or None
            """
            return self.__ready_time

        @property
        def cancel_time(self):
            """
            The time when this action was canceled.
            (`None` if it was started).

            :rtype: datetime.datetime or None
            """
            return self.__cancel_time

        @property
        def start_time(self):
            """
            The time at the beginning of the execution of this action.
            (`None` if it was never started).

            :rtype: datetime.datetime or None
            """
            return self.__start_time

        @property
        def success_time(self):
            """
            The time at the successful end of the execution of this action.
            (`None` if it was never started or if it failed).

            :rtype: datetime.datetime or None
            """
            return self.__success_time

        @property
        def return_value(self):
            """
            The value returned by this action
            (`None` if it failed or was never started).
            """
            return self.__return_value

        @property
        def failure_time(self):
            """
            The time at the failed end of the execution of this action.
            (`None` if it was never started or if it succeeded).

            :rtype: datetime.datetime or None
            """
            return self.__failure_time

        @property
        def exception(self):
            """
            The exception raised by this action
            (`None` if it succeeded or was never started).
            """
            return self.__exception

        @property
        def output(self):
            """
            Everything printed (and flushed in time) by this action.
            (`None` if it never started, `""` if it didn't print anything)

            :rtype: str or None
            """
            return self.__output

    def __init__(self, root_action, actions, now):
        """
        :param root_action: the action passed to :func:`.execute`.
        :param actions: iterable of all the (hashable) actions taking part in the execution.
        :param datetime.datetime now: used as the pending time of every action.
        """
        self._root_action = root_action
        self.__action_statuses = {action: self.ActionStatus(now) for action in actions}

    @property
    def is_success(self):
        """
        ``True`` if the execution finished without error.

        :rtype: bool
        """
        # values() exists on both Python 2 and 3, unlike itervalues().
        return all(
            action_status.status == self.ActionStatus.SUCCESSFUL
            for action_status in self.__action_statuses.values()
        )

    def get_action_status(self, action):
        """
        Get the :class:`ActionStatus` of an action.

        :param Action action:

        :rtype: ActionStatus
        """
        return self.__action_statuses[action]

    def get_actions_and_statuses(self):
        """
        Get a list of actions and their statuses.

        :rtype: list(tuple(Action, ActionStatus))
        """
        # Materialized so that a real list is returned on Python 3 as well.
        return list(self.__action_statuses.items())
398
399
400
class DependencyGraph(object):
    """
    A picture of the dependency graph of an action, drawn with `Graphviz <http://graphviz.org/>`__.
    """

    def __init__(self, action):
        graph = graphviz.Digraph("action", node_attr={"shape": "box"})
        node_names = {}
        for (index, current) in enumerate(action.get_possible_execution_order()):
            name = str(index)
            node_names[current] = name
            graph.node(name, str(current.label))
            for dependency in current.dependencies:
                # In an execution order, dependencies come before their dependents,
                # so their nodes have already been created.
                assert dependency in node_names
                graph.edge(name, node_names[dependency])
        self.__graphviz_graph = graph

    def write_to_png(self, filename):  # pragma no cover (Untestable? But small.)
        """
        Render the graph as a PNG image in the specified file.
        The extension of ``filename`` is dropped; the rendering format is always PNG.

        See also :meth:`get_graphviz_graph` if you want to draw the graph somewhere else.
        """
        directory, basename = os.path.split(filename)
        stem = os.path.splitext(basename)[0]
        graph = self.get_graphviz_graph()
        graph.format = "png"
        graph.render(directory=directory, filename=stem, cleanup=True)

    def get_graphviz_graph(self):
        """
        Return a copy of the :class:`graphviz.Digraph` of this dependency graph.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        return self.__graphviz_graph.copy()
436
437
438
class GanttChart(object):  # pragma no cover (Too difficult to unit test)
    """
    A visual representation of the timing of an execution.
    """

    def __init__(self, report):
        """
        :param ExecutionReport report: the report of the execution to draw.
        """
        # Drawable objects, keyed by the id() of the action they represent.
        self.__actions = {
            id(action): self.__make_action(action, status)
            for (action, status) in report.get_actions_and_statuses()
        }

        # Vertical position of each action, keyed by id(action); filled by compute() below.
        self.__ordinates = {}

        # Reverse dependency map: action -> set of actions that directly depend on it.
        dependents = {}
        for (action, _) in report.get_actions_and_statuses():
            dependents.setdefault(action, set())
            for dependency in action.dependencies:
                dependents.setdefault(dependency, set()).add(action)

        def end_time(action):
            # Canceled actions have neither a success nor a failure time: fall back on their
            # cancel time so that the sort key is never None (None is not orderable on Python 3).
            status = report.get_action_status(action)
            return status.success_time or status.failure_time or status.cancel_time

        def compute(action, ordinate):
            self.__ordinates[id(action)] = len(self.__actions) - ordinate
            for d in sorted(action.dependencies, key=end_time):
                if len(dependents[d]) == 1:
                    # This is the last dependent of d still to be placed: place d now.
                    ordinate = compute(d, ordinate - 1)
                else:
                    dependents[d].remove(action)
            return ordinate

        last_ordinate = compute(report._root_action, len(self.__actions) - 1)
        assert last_ordinate == 0, last_ordinate

    class SuccessfulAction(object):
        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__start_time = status.start_time
            self.__success_time = status.success_time

        @property
        def min_time(self):
            return self.__ready_time

        @property
        def max_time(self):
            return self.__success_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            # Thin line while waiting to be started, thick line while executing.
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="blue", lw=1)
            # @todo Use an other end-style to avoid pixels before/after min/max_time
            ax.plot([self.__start_time, self.__success_time], [ordinate, ordinate], color="blue", lw=4)
            # @todo Make sure the text is not outside the plot on the right
            ax.annotate(self.__label, xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points")
            # Dotted links from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    class FailedAction(object):
        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__start_time = status.start_time
            self.__failure_time = status.failure_time

        @property
        def min_time(self):
            return self.__ready_time

        @property
        def max_time(self):
            return self.__failure_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            # Thin line while waiting to be started, thick line while executing.
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="red", lw=1)
            ax.plot([self.__start_time, self.__failure_time], [ordinate, ordinate], color="red", lw=4)
            ax.annotate(self.__label, xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points")
            # Dotted links from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    class CanceledAction(object):
        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__cancel_time = status.cancel_time

        @property
        def min_time(self):
            # A canceled action may never have been ready.
            return self.__cancel_time if self.__ready_time is None else self.__ready_time

        @property
        def max_time(self):
            return self.__cancel_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            if self.__ready_time:
                ax.plot([self.__ready_time, self.__cancel_time], [ordinate, ordinate], color="grey", lw=1)
            ax.annotate(
                "(Canceled) {}".format(self.__label),
                xy=(self.__cancel_time, ordinate),
                xytext=(0, 3),
                textcoords="offset points"
            )
            # Dotted links from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    @classmethod
    def __make_action(cls, action, status):
        # Pick the drawable class matching the final status of the action.
        if status.status == ExecutionReport.ActionStatus.SUCCESSFUL:
            return cls.SuccessfulAction(action, status)
        elif status.status == ExecutionReport.ActionStatus.FAILED:
            return cls.FailedAction(action, status)
        elif status.status == ExecutionReport.ActionStatus.CANCELED:
            return cls.CanceledAction(action, status)

    def write_to_png(self, filename):
        """
        Write the Gantt chart as a PNG image to the specified file.

        See also :meth:`get_mpl_figure` and :meth:`plot_on_mpl_axes` if you want to draw the report somewhere else.
        """
        figure = self.get_mpl_figure()
        canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(figure)
        canvas.print_figure(filename)

    def get_mpl_figure(self):
        """
        Return a :class:`matplotlib.figure.Figure` of this Gantt chart.

        See also :meth:`plot_on_mpl_axes` if you want to draw the Gantt chart on your own matplotlib figure.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        fig = matplotlib.figure.Figure()
        ax = fig.add_subplot(1, 1, 1)

        self.plot_on_mpl_axes(ax)

        return fig

    @staticmethod
    def __nearest(v, values):
        # Return the element of the non-empty, ascending sequence `values` closest to `v`.
        # On a tie, the smaller element wins.
        return min(values, key=lambda value: (abs(value - v), value))

    # Candidate steps, in seconds, between two ticks of the "Relative time" axis.
    __intervals = [
        1, 2, 5, 10, 15, 30, 60,
        2 * 60, 10 * 60, 30 * 60, 3600,
        2 * 3600, 3 * 3600, 6 * 3600, 12 * 3600, 24 * 3600,
    ]

    def plot_on_mpl_axes(self, ax):
        """
        Plot this Gantt chart on the provided :class:`matplotlib.axes.Axes`.

        See also :meth:`write_to_png` and :meth:`get_mpl_figure` for the simpler use-cases.
        """
        # values() exists on both Python 2 and 3, unlike itervalues().
        drawables = list(self.__actions.values())

        for action in drawables:
            action.draw(ax, self.__ordinates, self.__actions)

        ax.get_yaxis().set_ticklabels([])
        ax.set_ylim(0.5, len(self.__actions) + 1)

        # Horizontal range: whole seconds enclosing every action.
        min_time = min(a.min_time for a in drawables).replace(microsecond=0)
        max_time = (
            max(a.max_time for a in drawables).replace(microsecond=0) +
            datetime.timedelta(seconds=1)
        )
        duration = int((max_time - min_time).total_seconds())

        ax.set_xlabel("Local time")
        ax.set_xlim(min_time, max_time)
        ax.xaxis_date()
        ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M:%S"))
        # NOTE(review): maxticks is lower than minticks here -- confirm this is intended.
        ax.xaxis.set_major_locator(matplotlib.dates.AutoDateLocator(maxticks=4, minticks=5))

        # Second horizontal axis, on top, graduated in seconds since min_time.
        ax2 = ax.twiny()
        ax2.set_xlabel("Relative time")
        ax2.set_xlim(min_time, max_time)
        ticks = list(range(0, duration, self.__nearest(duration // 5, self.__intervals)))
        ax2.xaxis.set_ticks([min_time + datetime.timedelta(seconds=s) for s in ticks])
        ax2.xaxis.set_ticklabels(ticks)
637
638
639
def _check_picklability(stuff):
640
    # This is a way to fail fast if we see a non-picklable object
641
    # because ProcessPoolExecutor freezes forever if we try to transfer
642
    # a non-picklable object through its queues
643
    pickle.loads(pickle.dumps(stuff))
644
645
646
class WurlitzerToEvents(wurlitzer.Wurlitzer):
    # Wurlitzer is used here in a rather contrived way:
    # all we want is to *capture* the standard streams, so Wurlitzer is given True
    # where it expects writeable file-like objects, and its _handle_xxx methods
    # are overridden to intercept what it would have written.
    def __init__(self, events, action_id):
        wurlitzer.Wurlitzer.__init__(self, stdout=True, stderr=True)
        self.events = events
        self.action_id = action_id

    def _handle_stdout(self, data):
        # Forward the captured data as a timestamped event on the queue.
        printed_at = datetime.datetime.now()
        text = self._decode(data)
        self.events.put(_PrintedEvent(self.action_id, printed_at, text))

    def _handle_stderr(self, data):
        # Standard error is reported exactly like standard output.
        self._handle_stdout(data)
661
662
663
class _Worker(multiprocessing.Process):
    """
    Process executing a single action and reporting what happens through the ``events`` queue.
    """

    def __init__(self, action_id, action, events):
        """
        :param action_id: identifier attached to every event posted for this action.
        :param action: the action whose ``do_execute`` method is called.
        :param events: queue on which events are ``put``.
        """
        multiprocessing.Process.__init__(self)
        self.action_id = action_id
        self.action = action
        self.events = events

    def run(self):
        """
        Execute the action while capturing what it prints, then post exactly one final event:
        a success, a failure, or a pickling problem.
        """
        with WurlitzerToEvents(self.events, self.action_id):
            return_value = exception = None
            try:
                self.events.put(_StartedEvent(self.action_id, datetime.datetime.now()))
                return_value = self.action.do_execute()
            except Exception as e:
                exception = e
        try:
            # The outcome is sent through a queue, so check it can be pickled before posting it.
            _check_picklability((exception, return_value))
        except Exception:
            self.events.put(_PicklingExceptionEvent(self.action_id))
        else:
            end_time = datetime.datetime.now()
            # Compare with None: an exception object may be falsy (e.g. if it defines __len__),
            # and must still be reported as a failure.
            if exception is not None:
                self.events.put(_FailedEvent(self.action_id, end_time, exception))
            else:
                self.events.put(_SuccessedEvent(self.action_id, end_time, return_value))
688
689
690
class _Event(object):
691
    def __init__(self, action_id):
692
        self.action_id = action_id
693
694
695
class _StartedEvent(_Event):
    """
    Event posted when the execution of an action begins.
    """

    def __init__(self, action_id, start_time):
        _Event.__init__(self, action_id)
        self.start_time = start_time

    def apply(self, execution, action):
        # Record the start time in the report, then notify the hooks.
        status = execution.report.get_action_status(action)
        status._set_start_time(self.start_time)
        execution.hooks.action_started(self.start_time, action)
703
704
705
class _SuccessedEvent(_Event):
    """
    Event posted when an action has completed without raising.
    """

    def __init__(self, action_id, success_time, return_value):
        _Event.__init__(self, action_id)
        self.success_time = success_time
        self.return_value = return_value

    def apply(self, execution, action):
        # Move the action from the "submitted" set to the "successful" one...
        execution.submitted.remove(action)
        execution.successful.add(action)
        # ... record the outcome in the report, then notify the hooks.
        status = execution.report.get_action_status(action)
        status._set_success(self.success_time, self.return_value)
        execution.hooks.action_successful(self.success_time, action)
716
717
718
class _PrintedEvent(_Event):
    """
    Event posted when an action has printed some text.
    """

    def __init__(self, action_id, print_time, text):
        _Event.__init__(self, action_id)
        self.print_time = print_time
        self.text = text

    def apply(self, execution, action):
        # Append the text to the output recorded in the report, then notify the hooks.
        status = execution.report.get_action_status(action)
        status._add_output(self.text)
        execution.hooks.action_printed(self.print_time, action, self.text)
727
728
729
class _FailedEvent(_Event):
    """
    Event posted when an action has completed by raising an exception.
    """

    def __init__(self, action_id, failure_time, exception):
        _Event.__init__(self, action_id)
        self.failure_time = failure_time
        self.exception = exception

    def apply(self, execution, action):
        # Move the action from the "submitted" set to the "failed" one...
        execution.submitted.remove(action)
        execution.failed.add(action)
        # ... keep the exception, record the outcome in the report, then notify the hooks.
        execution.exceptions.append(self.exception)
        status = execution.report.get_action_status(action)
        status._set_failure(self.failure_time, self.exception)
        execution.hooks.action_failed(self.failure_time, action)
741
742
743
class _PicklingExceptionEvent(_Event):
    """
    Event posted when the outcome of an action could not be pickled.
    """

    def apply(self, execution, action):
        # Applying this event raises, in the process that reads the events queue.
        error = pickle.PicklingError()
        raise error
746
747
748
class _Execution(object):
    """
    State and main loop of a single call to :func:`execute`.

    Each action moves from ``pending`` to ``submitted`` (a worker process was started for it),
    then to ``successful`` or ``failed``. Canceled actions go straight from ``pending`` to ``failed``.
    """

    def __init__(self, jobs, keep_going, do_raise, hooks, action):
        """
        :param int jobs: maximum number of actions executed in parallel.
        :param bool keep_going: if ``True``, keep submitting actions after a failure.
        :param bool do_raise: if ``True``, :meth:`run` raises when some actions failed.
        :param hooks: object notified of the progress.
        :param action: the root action.
        """
        self.root_action = action
        # Workers put their events on this queue; _wait() applies them.
        self.events = multiprocessing.Queue()
        self.jobs = jobs
        self.keep_going = keep_going
        self.do_raise = do_raise
        self.hooks = hooks
        # Events carry id(action) rather than the action: this maps them back.
        self.actions_by_id = {id(a): a for a in action.get_possible_execution_order()}
        # values() exists on both Python 2 and 3, unlike itervalues().
        self.pending = set(self.actions_by_id.values())
        self.submitted = set()
        self.successful = set()
        self.failed = set()
        self.exceptions = []
        self.report = None

    def run(self):
        """
        Execute all the actions and return the report.

        :raises CompoundException: if ``do_raise`` is true and some actions raised exceptions.
        """
        now = datetime.datetime.now()
        self.report = ExecutionReport(self.root_action, self.pending, now)
        for action in self.pending:
            self.hooks.action_pending(now, action)
        while self.pending or self.submitted:
            self._progress()

        # Every event has been processed: wait for the worker processes to terminate.
        for w in multiprocessing.active_children():
            w.join()

        if self.do_raise and self.exceptions:
            raise CompoundException(self.exceptions, self.report)
        else:
            return self.report

    def _progress(self):
        # One step of the main loop: start (or cancel) what can be, then process one event.
        now = datetime.datetime.now()
        if self.keep_going or not self.exceptions:
            self._submit_or_cancel(now)
        else:
            # Something failed and we must not keep going: nothing new is started.
            self._cancel(now)
        self._wait()

    def _submit_or_cancel(self, now):
        # At least one worker is always allowed, else a non-positive `jobs` would never
        # submit anything and the main loop would spin forever.
        max_parallel = max(self.jobs, 1)
        # Canceling an action can make its dependents cancelable in turn, hence the outer loop.
        go_on = True
        while go_on:
            go_on = False
            for action in set(self.pending):
                done = self.successful | self.failed
                if all(d in done for d in action.dependencies):
                    if any(d in self.failed for d in action.dependencies):
                        self._mark_action_canceled(action, cancel_time=now)
                        self.pending.remove(action)
                        go_on = True
                    else:
                        status = self.report.get_action_status(action)
                        if status.ready_time is None:
                            status._set_ready_time(now)
                            self.hooks.action_ready(now, action)
                        # Strict comparison: never more than `max_parallel` workers at a time.
                        if len(self.submitted) < max_parallel:
                            self._submit_action(action, ready_time=now)

    def _cancel(self, now):
        # Cancel everything that has not been submitted yet.
        for action in self.pending:
            self._mark_action_canceled(action, cancel_time=now)
        self.pending.clear()

    def _wait(self):
        # Block until a worker posts an event, then apply it to this execution.
        if self.submitted:
            event = self.events.get()
            event.apply(self, self.actions_by_id[event.action_id])

    def _submit_action(self, action, ready_time):
        # Start a worker process for the action. `ready_time` is currently unused.
        _Worker(id(action), action, self.events).start()
        self.submitted.add(action)
        self.pending.remove(action)

    def _mark_action_canceled(self, action, cancel_time):
        # Canceled actions are put in `failed` so that their dependents get canceled as well.
        self.failed.add(action)
        self.report.get_action_status(action)._set_cancel_time(cancel_time)
        self.hooks.action_canceled(cancel_time, action)
826