Completed
Push — master ( 91f4e1...31c7ae )
by Vincent
01:16
created

Action   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 82
Duplicated Lines 0 %

Importance

Changes 10
Bugs 0 Features 0
Metric Value
wmc 10
c 10
b 0
f 0
dl 0
loc 82
rs 10
1
# coding: utf8
2
3
# Copyright 2013-2017 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
import datetime
8
import multiprocessing
9
import os.path
10
import pickle
11
12
import graphviz
13
import matplotlib
14
import matplotlib.dates
15
import matplotlib.figure
16
import matplotlib.backends.backend_agg
17
import wurlitzer
18
19
20
def execute(action, cpu_cores=None, keep_going=False, do_raise=True, hooks=None):
    """
    Run an :class:`.Action` after recursively running everything it depends on.

    :param Action action: the action to execute.
    :param cpu_cores: number of CPU cores to use in parallel.
        Pass ``None`` (the default value) to let ActionTree choose
        (it then uses :func:`multiprocessing.cpu_count`).
        Pass :attr:`UNLIMITED` to execute an unlimited number of actions in parallel
        (make sure your system has the necessary resources).
        Note: CPU cores are managed like any other :class:`Resource`, and this parameter sets the availability
        of :obj:`CPU_CORE` for this execution.
    :type cpu_cores: int or None or UNLIMITED
    :param bool keep_going: if ``True``, then execution does not stop on first failure,
        but executes as many dependencies as possible.
    :param bool do_raise: if ``False``, then exceptions are not re-raised as :exc:`CompoundException`
        but only included in the :class:`.ExecutionReport`.
    :param Hooks hooks: its methods will be called when execution progresses.
        When ``None``, a default :class:`Hooks` instance is used.

    :raises CompoundException: when ``do_raise`` is ``True`` and dependencies raise exceptions.

    :rtype: ExecutionReport
    """
    effective_cores = multiprocessing.cpu_count() if cpu_cores is None else cpu_cores
    effective_hooks = Hooks() if hooks is None else hooks
    executor = _Execute(effective_cores, keep_going, do_raise, effective_hooks)
    return executor.run(action)
47
48
49
UNLIMITED = object()
50
"""The availability of an infinite :class:`Resource`."""
51
52
53
class Action(object):
    """
    The main class of ActionTree.
    An action to be started after all its dependencies are finished.
    Pass it to :func:`.execute`.

    This is a base class for your custom actions.
    You must define a ``do_execute(self, dependency_statuses)`` method that performs the action.
    The ``dependency_statuses`` argument is a dictionary whose keys are ``self.dependencies`` and values are their
    :class:`.ActionStatus`.
    :ref:`outputs` describes how its return values, the exceptions it may raise and what it may print are handled.

    Actions, return values and exceptions raised must be picklable.
    """

    def __init__(self, label, weak_dependencies=False):
        """
        :param label: whatever you want to attach to the action.
            ``str(label)`` must succeed and return a string.
            Can be retrieved by :attr:`label`.
        :param bool weak_dependencies:
            if ``True``, then the action will execute even if some of its dependencies failed.
        """
        # Called only for its side effect: fail at construction time if the label cannot be converted.
        str(label)
        self.__label = label
        self.__weak_dependencies = weak_dependencies
        self.__dependencies = []
        # Every action requires one CPU core until told otherwise through require_resource.
        self.__resources = {CPU_CORE: 1}

    @property
    def label(self):
        """
        The label passed to the constructor.
        """
        return self.__label

    @property
    def weak_dependencies(self):
        """
        ``True`` if the action will execute even if some of its dependencies failed.

        :rtype: bool
        """
        return self.__weak_dependencies

    def add_dependency(self, dependency):
        """
        Add a dependency to be executed before this action.
        Order of insertion of dependencies is not important.

        :param Action dependency:

        :raises DependencyCycleException: when adding the new dependency would create a cycle.
        """
        # If this action is already reachable from the dependency, adding the edge would close a cycle.
        if self in dependency.get_possible_execution_order():  # Not in user guide: implementation detail
            raise DependencyCycleException()
        self.__dependencies.append(dependency)

    @property
    def dependencies(self):
        """
        The list of this action's direct dependencies.

        A new list is returned on each access, so mutating it does not affect the action.
        """
        return list(self.__dependencies)

    def require_resource(self, resource, quantity=1):
        """
        Set the quantity of a certain :class:`.Resource` required to run this action.

        Note that an action that requires more than a resource's availability *will* be executed anyway.
        It will just not be executed in parallel with any other action that requires the same resource.

        :param Resource resource:
        :param int quantity:
        """
        self.__resources[resource] = quantity

    @property
    def resources_required(self):
        """
        The list of this action's required resources and quantities required.

        :rtype: list(tuple(Resource, int))
        """
        # ``items()`` exists on both Python 2 and 3 (``iteritems()`` is Python 2 only);
        # wrapping it in ``list`` returns a real list on either version.
        return list(self.__resources.items())

    def get_possible_execution_order(self, seen_actions=None):
        """
        Return the list of all this action's dependencies (recursively), followed by this action,
        in an order that is suitable for linear execution.
        Note that this order is not unique.
        The order chosen is not specified.

        :param set seen_actions: actions already visited by an enclosing call; they are not listed again.
            Leave it to ``None`` when calling from outside.
        """
        if seen_actions is None:
            seen_actions = set()
        actions = []
        if self not in seen_actions:
            seen_actions.add(self)
            # Depth-first: each dependency's own sub-tree is listed before this action.
            for dependency in self.__dependencies:
                actions += dependency.get_possible_execution_order(seen_actions)
            actions.append(self)
        return actions
155
156
157
class Resource(object):
    """
    Something an :class:`Action` can require in order to execute.
    Use resources to protect things that must not be used by more than N actions at the same time,
    in the manner of a `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`_.
    Like semaphores, with an availability of 1,
    they become `mutexes <https://en.wikipedia.org/wiki/Lock_(computer_science)>`_.

    :ref:`resources` describes how to use this class.
    """

    def __init__(self, availability):
        """
        :param availability: the number of instances available for this resource
        :type availability: int or UNLIMITED
        """
        self.__availability = availability

    def _availability(self, cpu_cores):
        # The base implementation ignores ``cpu_cores`` and reports the fixed value given to the constructor.
        return self.__availability
177
178
179
class CpuCoreResource(Resource):
    """
    A :class:`Resource` whose availability is the ``cpu_cores`` value passed in,
    instead of the value given to the constructor.
    """

    def _availability(self, cpu_cores):
        return cpu_cores
182
183
184
CPU_CORE = CpuCoreResource(0)
185
"""
186
A special :class:`.Resource` representing a processing unit.
187
You can pass it to :meth:`.Action.require_resource` if your action will execute on more than one core.
188
189
:type: Resource
190
"""
191
192
193
class Hooks(object):
    """
    Base class to derive from when defining your hooks.
    :func:`.execute` will call its methods when execution progresses.

    Every method here does nothing; override the ones you are interested in.
    """

    def action_pending(self, time, action):
        """
        Called when an action is considered for execution, i.e. at the beginning of :func:`.execute`.

        :param datetime.datetime time: the time at which the action was considered for execution.
        :param Action action: the action.
        """

    def action_ready(self, time, action):
        """
        Called when an action is ready to be executed, i.e. when all its dependencies have succeeded.

        :param datetime.datetime time: the time at which the action was ready.
        :param Action action: the action.
        """

    def action_canceled(self, time, action):
        """
        Called when an action's execution is canceled, i.e. when some of its dependencies have failed.

        :param datetime.datetime time: the time at which the action was canceled.
        :param Action action: the action.
        """

    def action_started(self, time, action):
        """
        Called when an action's execution starts.

        :param datetime.datetime time: the time at which the action was started.
        :param Action action: the action.
        """

    def action_printed(self, time, action, text):
        """
        Called when an action prints something.

        :param datetime.datetime time: the time at which the action printed the text.
        :param Action action: the action.
        :param str text: the text printed.
        """

    def action_successful(self, time, action, return_value):
        """
        Called when an action completes without error.

        :param datetime.datetime time: the time at which the action completed.
        :param Action action: the action.
        :param return_value: the value returned by the action.
        """

    def action_failed(self, time, action, exception):
        """
        Called when an action completes with an exception.

        :param datetime.datetime time: the time at which the action completed.
        :param Action action: the action.
        :param exception: the exception raised by the action.
        """
256
257
258
class DependencyCycleException(Exception):  # Not in user guide: implementation detail
    """
    Raised by :meth:`.Action.add_dependency` when the new dependency would close a cycle.
    """

    def __init__(self):
        message = "Dependency cycle"
        super(DependencyCycleException, self).__init__(message)
265
266
267
class CompoundException(Exception):
    """
    Raised by :func:`.execute` when dependencies raise exceptions.
    """

    def __init__(self, exceptions, execution_report):
        # Only the exceptions are forwarded to ``Exception``; the report is kept as an attribute.
        super(CompoundException, self).__init__(exceptions)
        self.__execution_report = execution_report
        self.__exceptions = exceptions

    @property
    def exceptions(self):
        """
        The list of exceptions raised.
        """
        return self.__exceptions

    @property
    def execution_report(self):
        """
        The :class:`.ExecutionReport` of the failed execution.
        """
        return self.__execution_report
290
291
292
class ExecutionReport(object):
    """
    ExecutionReport()

    Execution report, returned by :func:`.execute`.
    """

    class ActionStatus(object):
        """
        Status of a single :class:`.Action`.
        """

        def __init__(self, pending_time):
            self.__pending_time = pending_time
            self.__ready_time = None
            self.__cancel_time = None
            self.__start_time = None
            self.__success_time = None
            self.__return_value = None
            self.__failure_time = None
            self.__exception = None
            self.__output = None

        def _set_ready_time(self, ready_time):
            self.__ready_time = ready_time

        def _set_cancel_time(self, cancel_time):
            self.__cancel_time = cancel_time

        def _set_start_time(self, start_time):
            self.__start_time = start_time

        def _set_success(self, success_time, return_value):
            self.__success_time = success_time
            self.__return_value = return_value
            # Adding an empty string turns a ``None`` output into ``""`` once the action has completed.
            self._add_output("")

        def _set_failure(self, failure_time, exception):
            self.__failure_time = failure_time
            self.__exception = exception
            # Same as in _set_success: a completed action always has a string output.
            self._add_output("")

        def _add_output(self, output):
            self.__output = (self.__output or "") + output

        @property
        def status(self):
            """
            The status of the action:
            :attr:`SUCCESSFUL` if the action succeeded,
            :attr:`FAILED` if the action failed,
            and :attr:`CANCELED` if the action was canceled because some of its dependencies failed.
            """
            if self.start_time:
                if self.success_time:
                    return SUCCESSFUL
                else:
                    assert self.failure_time
                    return FAILED
            else:
                assert self.cancel_time
                return CANCELED

        @property
        def pending_time(self):
            """
            The time when this action was considered for execution.

            :rtype: datetime.datetime
            """
            return self.__pending_time

        @property
        def ready_time(self):
            """
            The time when this action was ready to execute.
            (``None`` if it was canceled before being ready).

            :rtype: datetime.datetime or None
            """
            return self.__ready_time

        @property
        def cancel_time(self):
            """
            The time when this action was canceled.
            (``None`` if it was started).

            :rtype: datetime.datetime or None
            """
            return self.__cancel_time

        @property
        def start_time(self):
            """
            The time at the beginning of the execution of this action.
            (``None`` if it was never started).

            :rtype: datetime.datetime or None
            """
            return self.__start_time

        @property
        def success_time(self):
            """
            The time at the successful end of the execution of this action.
            (``None`` if it was never started or if it failed).

            :rtype: datetime.datetime or None
            """
            return self.__success_time

        @property
        def return_value(self):
            """
            The value returned by this action
            (``None`` if it failed or was never started).
            """
            return self.__return_value

        @property
        def failure_time(self):
            """
            The time at the failed end of the execution of this action.
            (``None`` if it was never started or if it succeeded).

            :rtype: datetime.datetime or None
            """
            return self.__failure_time

        @property
        def exception(self):
            """
            The exception raised by this action
            (``None`` if it succeeded or was never started).
            """
            return self.__exception

        @property
        def output(self):
            """
            Everything printed (and flushed in time) by this action.
            (``None`` if it never started, ``""`` if it didn't print anything)

            :rtype: str or None
            """
            return self.__output

    def __init__(self, root_action, actions, now):
        # Single leading underscore: this attribute is read from outside the class (by GanttChart).
        self._root_action = root_action
        # Every action starts with a status whose pending time is ``now``.
        self.__action_statuses = {action: self.ActionStatus(now) for action in actions}

    @property
    def is_success(self):
        """
        ``True`` if the execution finished without error.

        :rtype: bool
        """
        # ``values()`` exists on both Python 2 and 3 (``itervalues()`` is Python 2 only).
        return all(
            action_status.status == SUCCESSFUL
            for action_status in self.__action_statuses.values()
        )

    def get_action_status(self, action):
        """
        Get the :class:`ActionStatus` of an action.

        :param Action action:

        :rtype: ActionStatus
        """
        return self.__action_statuses[action]

    def get_actions_and_statuses(self):
        """
        Get a list of actions and their statuses.

        :rtype: list(tuple(Action, ActionStatus))
        """
        # Wrapped in ``list`` so a real list is returned on Python 3 as well, as documented.
        return list(self.__action_statuses.items())
473
474
475
SUCCESSFUL = "SUCCESSFUL"
476
"The :attr:`.ActionStatus.status` after a successful execution."
477
478
FAILED = "FAILED"
479
"The :attr:`.ActionStatus.status` after a failed execution where this action raised an exception."
480
481
CANCELED = "CANCELED"
482
"The :attr:`.ActionStatus.status` after a failed execution where a dependency raised an exception."
483
484
PRINTED = "PRINTED"
485
486
PICKLING_EXCEPTION = "PICKLING_EXCEPTION"
487
488
489
class DependencyGraph(object):
    """
    A visual representation of the dependency graph, using `Graphviz <http://graphviz.org/>`__.
    """

    def __init__(self, action):
        graph = graphviz.Digraph("action", node_attr={"shape": "box"})
        self.__graphviz_graph = graph
        node_names = {}
        # Nodes are named after their position in a possible execution order;
        # the action's label is only used as the displayed text.
        for index, current in enumerate(action.get_possible_execution_order()):
            name = str(index)
            node_names[current] = name
            graph.node(name, str(current.label))
            for dependency in current.dependencies:
                assert dependency in node_names  # Because we are iterating a possible execution order
                graph.edge(name, node_names[dependency])

    def write_to_png(self, filename):  # Not unittested: too difficult
        """
        Write the graph as a PNG image to the specified file.

        See also :meth:`get_graphviz_graph` if you want to draw the graph somewhere else.
        """
        directory, basename = os.path.split(filename)
        # The extension is dropped: only the stem is handed to the renderer, with the format set to "png".
        stem = os.path.splitext(basename)[0]
        g = self.get_graphviz_graph()
        g.format = "png"
        g.render(directory=directory, filename=stem, cleanup=True)

    def get_graphviz_graph(self):
        """
        Return a :class:`graphviz.Digraph` of this dependency graph.

        A copy is returned, so changes made by the caller do not affect this object.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        return self.__graphviz_graph.copy()
525
526
527
class GanttChart(object):  # Not unittested: too difficult
    """
    A visual representation of the timing of an execution.
    """

    def __init__(self, report):
        """
        :param ExecutionReport report: the report of the execution to draw.
        """
        # Drawable wrappers, keyed by the ``id`` of the action they represent.
        self.__actions = {
            id(action): self.__make_action(action, status)
            for (action, status) in report.get_actions_and_statuses()
        }

        # Vertical position of each action on the chart, keyed by ``id(action)``.
        self.__ordinates = {}

        # For each action, the set of actions that directly depend on it.
        dependents = {}
        for (action, _) in report.get_actions_and_statuses():
            dependents.setdefault(action, set())
            for dependency in action.dependencies:
                dependents.setdefault(dependency, set()).add(action)

        def compute(action, ordinate):
            # Place ``action``, then recurse into the dependencies for which it is the last remaining dependent,
            # so that each action is placed exactly once.
            self.__ordinates[id(action)] = len(self.__actions) - ordinate
            # NOTE(review): the sort key is ``None`` for a dependency that has neither a success nor a failure
            # time (e.g. a canceled one); comparing ``None`` with a datetime raises on Python 3 -- confirm
            # the intended ordering before changing it.
            for d in sorted(
                action.dependencies,
                key=lambda d: report.get_action_status(d).success_time or report.get_action_status(d).failure_time
            ):
                if len(dependents[d]) == 1:
                    ordinate = compute(d, ordinate - 1)
                else:
                    dependents[d].remove(action)
            return ordinate

        last_ordinate = compute(report._root_action, len(self.__actions) - 1)
        assert last_ordinate == 0, last_ordinate

    class SuccessfulAction(object):
        """Drawable for an action that succeeded (drawn in blue)."""

        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__start_time = status.start_time
            self.__success_time = status.success_time

        @property
        def min_time(self):
            return self.__ready_time

        @property
        def max_time(self):
            return self.__success_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            # Thin line while waiting (ready -> start), thick line while running (start -> success).
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="blue", lw=1)
            ax.plot(
                [self.__start_time, self.__success_time], [ordinate, ordinate],
                color="blue", lw=4, solid_capstyle="butt",
            )
            # @todo Make sure the text is not outside the plot on the right
            ax.annotate(
                self.__label,
                xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points",
            )
            # Dotted line from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    class FailedAction(object):
        """Drawable for an action that failed (drawn in red)."""

        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__start_time = status.start_time
            self.__failure_time = status.failure_time

        @property
        def min_time(self):
            return self.__ready_time

        @property
        def max_time(self):
            return self.__failure_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            # Thin line while waiting (ready -> start), thick line while running (start -> failure).
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="red", lw=1)
            ax.plot(
                [self.__start_time, self.__failure_time], [ordinate, ordinate],
                color="red", lw=4, solid_capstyle="butt",
            )
            ax.annotate(
                self.__label,
                xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points",
            )
            # Dotted line from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    class CanceledAction(object):
        """Drawable for an action that was canceled (drawn in grey)."""

        def __init__(self, action, status):
            self.__label = str(action.label)
            self.__id = id(action)
            self.__dependencies = set(id(d) for d in action.dependencies)
            self.__ready_time = status.ready_time
            self.__cancel_time = status.cancel_time

        @property
        def min_time(self):
            # An action canceled before being ready has no ready time: fall back to its cancel time.
            return self.__cancel_time if self.__ready_time is None else self.__ready_time

        @property
        def max_time(self):
            return self.__cancel_time

        def draw(self, ax, ordinates, actions):
            ordinate = ordinates[self.__id]
            if self.__ready_time:  # Not in user guide: implementation detail
                ax.plot([self.__ready_time, self.__cancel_time], [ordinate, ordinate], color="grey", lw=1)
            ax.annotate(
                self.__label,
                xy=(self.__cancel_time, ordinate), xytext=(0, 3), textcoords="offset points",
                color="grey",
            )
            # Dotted line from the end of each dependency to the beginning of this action.
            for d in self.__dependencies:
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)

    @classmethod
    def __make_action(cls, action, status):
        # Pick the drawable class matching the action's final status.
        if status.status == SUCCESSFUL:
            return cls.SuccessfulAction(action, status)
        elif status.status == FAILED:
            return cls.FailedAction(action, status)
        elif status.status == CANCELED:
            return cls.CanceledAction(action, status)

    def write_to_png(self, filename):
        """
        Write the Gantt chart as a PNG image to the specified file.

        See also :meth:`get_mpl_figure` and :meth:`plot_on_mpl_axes` if you want to draw the report somewhere else.
        """
        figure = self.get_mpl_figure()
        canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(figure)
        canvas.print_figure(filename)

    def get_mpl_figure(self):
        """
        Return a :class:`matplotlib.figure.Figure` of this Gantt chart.

        See also :meth:`plot_on_mpl_axes` if you want to draw the Gantt chart on your own matplotlib figure.

        See also :meth:`write_to_png` for the simplest use-case.
        """
        fig = matplotlib.figure.Figure()
        ax = fig.add_subplot(1, 1, 1)

        self.plot_on_mpl_axes(ax)

        return fig

    @staticmethod
    def __nearest(v, values):  # Not in user guide: implementation detail
        # Return the element of ``values`` (assumed sorted in increasing order and non-empty) closest to ``v``.
        # Ties go to the smaller element.
        for i, value in enumerate(values):
            if v < value:
                break
        # Here ``i`` is the index of the first element greater than ``v``,
        # or the last index when no element is greater.
        if i == 0:
            return values[0]
        else:
            if v - values[i - 1] <= values[i] - v:
                return values[i - 1]
            else:
                return values[i]

    # Candidate spacings, in seconds, between two ticks of the "Relative time" axis.
    __intervals = [
        1, 2, 5, 10, 15, 30, 60,
        2 * 60, 10 * 60, 30 * 60, 3600,
        2 * 3600, 3 * 3600, 6 * 3600, 12 * 3600, 24 * 3600,
    ]

    def plot_on_mpl_axes(self, ax):
        """
        Plot this Gantt chart on the provided :class:`matplotlib.axes.Axes`.

        See also :meth:`write_to_png` and :meth:`get_mpl_figure` for the simpler use-cases.
        """
        # ``values()`` exists on both Python 2 and 3 (``itervalues()`` is Python 2 only).
        for action in self.__actions.values():
            action.draw(ax, self.__ordinates, self.__actions)

        ax.get_yaxis().set_ticklabels([])
        ax.set_ylim(0.5, len(self.__actions) + 1)

        # Bounds are truncated to whole seconds; one second is added to the upper bound
        # so that the last action is not cut.
        min_time = min(a.min_time for a in self.__actions.values()).replace(microsecond=0)
        max_time = (
            max(a.max_time for a in self.__actions.values()).replace(microsecond=0) +
            datetime.timedelta(seconds=1)
        )
        duration = int((max_time - min_time).total_seconds())

        ax.set_xlabel("Local time")
        ax.set_xlim(min_time, max_time)
        ax.xaxis_date()
        ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M:%S"))
        # NOTE(review): ``minticks`` is greater than ``maxticks`` here -- confirm this is intended.
        ax.xaxis.set_major_locator(matplotlib.dates.AutoDateLocator(maxticks=4, minticks=5))

        # Second horizontal axis, sharing the same range, labelled in seconds since the beginning.
        ax2 = ax.twiny()
        ax2.set_xlabel("Relative time")
        ax2.set_xlim(min_time, max_time)
        ticks = range(0, duration, self.__nearest(duration // 5, self.__intervals))
        ax2.xaxis.set_ticks([min_time + datetime.timedelta(seconds=s) for s in ticks])
        ax2.xaxis.set_ticklabels(ticks)
736
737
738
class WurlitzerToEvents(wurlitzer.Wurlitzer):
    # This is a highly contrived use of Wurlitzer:
    # we only need to *capture* the standard streams, so we trick Wurlitzer
    # by passing True instead of writeable file-like objects, and we override
    # the _handle_xxx methods to intercept what it would have written.
    def __init__(self, events, action_id):
        super(WurlitzerToEvents, self).__init__(stdout=True, stderr=True)
        self.events = events
        self.action_id = action_id

    def _handle_stdout(self, data):
        # Each captured chunk becomes a PRINTED event carrying its capture time and decoded text.
        timestamp = datetime.datetime.now()
        text = self._decode(data)
        self.events.put((PRINTED, self.action_id, (timestamp, text)))

    def _handle_stderr(self, data):
        # Standard error is reported exactly like standard output.
        self._handle_stdout(data)
753
754
755
class _Execute(object):
756
    def __init__(self, cpu_cores, keep_going, do_raise, hooks):
757
        self.cpu_cores = cpu_cores
758
        self.keep_going = keep_going
759
        self.do_raise = do_raise
760
        self.hooks = hooks
761
762
    def run(self, root_action):
763
        now = datetime.datetime.now()
764
765
        # Pre-process actions
766
        self._check_picklability(root_action)
767
        actions = root_action.get_possible_execution_order()
768
        self.actions_by_id = {id(action): action for action in actions}
769
        self.dependents = {action: set() for action in actions}
770
        for action in actions:
771
            for dependency in action.dependencies:
772
                self.dependents[dependency].add(action)
773
774
        # Misc stuff
775
        self.report = ExecutionReport(root_action, actions, now)
776
        for action in actions:
777
            self.hooks.action_pending(now, action)
778
        self.events = multiprocessing.Queue()
779
        self.exceptions = []
780
        self.resources_used = {}
781
782
        # Actions by status
783
        self.pending = set(actions)
784
        self.ready = set()
785
        self.running = set()
786
        self.done = set()
787
        for action in actions:
788
            if not action.dependencies:
789
                self._prepare_action(action, now)
790
791
        # Execute
792
        while self.pending or self.ready or self.running:
793
            self._progress(now)
794
            now = datetime.datetime.now()
795
796
        for w in multiprocessing.active_children():
797
            w.join()
798
799
        if self.do_raise and self.exceptions:
800
            raise CompoundException(self.exceptions, self.report)
801
        else:
802
            return self.report
803
804
    def _cancel_action(self, action, now):
805
        self.report.get_action_status(action)._set_cancel_time(now)
806
        self.hooks.action_canceled(now, action)
807
808
        if action in self.pending:
809
            self._change_status(action, self.pending, self.done)
810
        else:  # Not in user guide: implementation detail
811
            self._change_status(action, self.ready, self.done)
812
813
        if not self.keep_going:
814
            for d in action.dependencies:
815
                if d in self.pending or d in self.ready:
816
                    self._cancel_action(d, now)
817
        self._triage_pending_dependents(action, True, now)
818
819
    def _triage_pending_dependents(self, action, failed, now):
820
        for dependent in self.pending & self.dependents[action]:
821
            if failed and not dependent.weak_dependencies:
822
                self._cancel_action(dependent, now)
823
            elif all(d in self.done for d in dependent.dependencies):
824
                self._prepare_action(dependent, now)
825
826
    def _prepare_action(self, action, now):
827
        self.report.get_action_status(action)._set_ready_time(now)
828
        self.hooks.action_ready(now, action)
829
830
        self._change_status(action, self.pending, self.ready)
831
832
    def _progress(self, now):
833
        # @todo Should we tweak the scheduling?
834
        # We could prioritize the actions that use many resources,
835
        # hoping that this would avoid idle CPU cores at the end of the execution.
836
        # Scheduling is a hard problem, we may just want to keep the current, random, behavior.
837
        for action in set(self.ready):
838
            if self._allocate_resources(action):
839
                self._start_action(action, now)
840
        self._handle_next_event()
841
842
    def _allocate_resources(self, action):
843
        for (resource, quantity) in action.resources_required:
844
            used = self.resources_used.setdefault(resource, 0)
845
            if used == 0:
846
                # Allow actions requiring more than available to run when they are alone requiring this resource
847
                continue
848
            availability = resource._availability(self.cpu_cores)
849
            if availability is UNLIMITED:  # Not in user guide: implementation detail
850
                # Don't check usage of unlimited resources
851
                continue
852
            if used + quantity > availability:
853
                return False
854
        for (resource, quantity) in action.resources_required:
855
            self.resources_used[resource] += quantity
856
        return True
857
858
    def _start_action(self, action, now):
859
        self.report.get_action_status(action)._set_start_time(now)
860
        self.hooks.action_started(now, action)
861
862
        dependency_statuses = {d: self.report.get_action_status(d) for d in action.dependencies}
863
        p = multiprocessing.Process(
864
            target=self._run_action,
865
            kwargs=dict(action=action, action_id=id(action), dependency_statuses=dependency_statuses)
866
        )
867
        p.start()
868
        self._change_status(action, self.ready, self.running)
869
870
    def _run_action(self, action, action_id, dependency_statuses):
871
        with WurlitzerToEvents(self.events, action_id):
872
            return_value = exception = None
873
            try:
874
                return_value = action.do_execute(dependency_statuses)
875
            except Exception as e:
876
                exception = e
877
        try:
878
            self._check_picklability((exception, return_value))
879
        except:  # Not in user guide: mandatory picklability is more an issue than a feature
880
            self.events.put((PICKLING_EXCEPTION, action_id, ()))
881
        else:
882
            end_time = datetime.datetime.now()
883
            if exception:
884
                self.events.put((FAILED, action_id, (end_time, exception)))
885
            else:
886
                self.events.put((SUCCESSFUL, action_id, (end_time, return_value)))
887
888
    def _check_picklability(self, stuff):
889
        # This is a way to fail fast if we see a non-picklable object
890
        # because ProcessPoolExecutor freezes forever if we try to transfer
891
        # a non-picklable object through its queues
892
        pickle.loads(pickle.dumps(stuff))
893
894
    def _handle_next_event(self):
895
        (event_kind, action_id, event_payload) = self.events.get()
896
        handlers = {
897
            SUCCESSFUL: self._handle_successful_event,
898
            PRINTED: self._handle_printed_event,
899
            FAILED: self._handle_failed_event,
900
            PICKLING_EXCEPTION: self._handle_pickling_exception_event,
901
        }
902
        handlers[event_kind](self.actions_by_id[action_id], *event_payload)
903
904
    def _handle_successful_event(self, action, success_time, return_value):
905
        self.report.get_action_status(action)._set_success(success_time, return_value)
906
        self.hooks.action_successful(success_time, action, return_value)
907
908
        self._change_status(action, self.running, self.done)
909
        self._triage_pending_dependents(action, False, success_time)
910
        self._deallocate_resources(action)
911
912
    def _handle_printed_event(self, action, print_time, text):
913
        self.report.get_action_status(action)._add_output(text)
914
        self.hooks.action_printed(print_time, action, text)
915
916
    def _handle_failed_event(self, action, failure_time, exception):
917
        self.report.get_action_status(action)._set_failure(failure_time, exception)
918
        self.hooks.action_failed(failure_time, action, exception)
919
920
        self._change_status(action, self.running, self.done)
921
        self.exceptions.append(exception)
922
        self._triage_pending_dependents(action, True, failure_time)
923
        self._deallocate_resources(action)
924
925
    def _handle_pickling_exception_event(self, action):  # Not in user guide: mandatory picklability
926
        raise pickle.PicklingError()
927
928
    def _change_status(self, action, orig, dest):
929
        orig.remove(action)
930
        dest.add(action)
931
932
    def _deallocate_resources(self, action):
933
        for (resource, quantity) in action.resources_required:
934
            self.resources_used[resource] = max(0, self.resources_used[resource] - quantity)
935