Completed
Push — master ( 5a7b91...f6113a )
by Vincent
01:20
created

  A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 118
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 19
dl 0
loc 118
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A xecutionReport.is_success() 0 8 1
A xecutionReport.__init__() 0 3 1
A xecutionReport.set_success() 0 2 1
A ctionStatus.cancel_time() 0 6 1
F ctionStatus.__init__() 0 24 10
A xecutionReport.get_actions_and_statuses() 0 5 1
A xecutionReport.get_action_status() 0 5 1
A ctionStatus.status() 0 6 1
A ctionStatus.ready_time() 0 6 1
A xecutionReport.set_action_status() 0 2 1
A ctionStatus.start_time() 0 6 1
A ctionStatus.success_time() 0 6 1
A ctionStatus.failure_time() 0 6 1
1
# coding: utf8
2
3
# Copyright 2013-2015 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
import concurrent.futures as futures
8
import datetime
9
import multiprocessing
10
import os.path
11
12
import graphviz
13
import matplotlib
14
import matplotlib.dates
15
import matplotlib.figure
16
import matplotlib.backends.backend_agg
17
18
19
def execute(action, jobs=1, keep_going=False):
20
    """
21
    Recursively execute an action's dependencies then the action.
22
23
    :param Action action: the action to execute.
24
    :param int jobs: number of actions to execute in parallel. Pass ``None`` to let ActionTree choose.
25
    :param bool keep_going: if ``True``, then execution does not stop on first failure,
26
        but executes as many dependencies as possible.
27
28
    :raises CompoundException: when dependencies raise exceptions.
29
30
    :rtype: ExecutionReport
31
    """
32
    if jobs <= 0 or jobs is None:
33
        jobs = multiprocessing.cpu_count() + 1
34
    return Executor(jobs, keep_going).execute(action)
35
36
37
class ExecutionReport(object):
38
    """
39
    ExecutionReport()
40
41
    Execution report, returned by :func:`.execute`.
42
    """
43
44
    class ActionStatus(object):
45
        """
46
        ActionStatus()
47
48
        Status of a single :class:`.Action`.
49
        """
50
51
        Successful = "Successful"
52
        "The :attr:`status` after a successful execution."
53
        Canceled = "Canceled"
54
        "The :attr:`status` after a failed execution where a dependency raised an exception."
55
        Failed = "Failed"
56
        "The :attr:`status` after a failed execution where this action raised an exception."
57
58
        def __init__(self, ready_time=None, start_time=None, cancel_time=None, failure_time=None, success_time=None):
59
            ready = bool(ready_time)
60
            start = bool(start_time)
61
            cancel = bool(cancel_time)
62
            failure = bool(failure_time)
63
            success = bool(success_time)
64
            if ready:
65
                if start:
66
                    assert not cancel
67
                    assert (failure or success)
68
                    self.__status = self.Successful if success else self.Failed
69
                else:
70
                    assert cancel
71
                    assert not (failure or success)
72
                    self.__status = self.Canceled
73
            else:
74
                assert cancel
75
                assert not (start or failure or success)
76
                self.__status = self.Canceled
77
            self.__ready_time = ready_time
78
            self.__cancel_time = cancel_time
79
            self.__start_time = start_time
80
            self.__success_time = success_time
81
            self.__failure_time = failure_time
82
83
        @property
84
        def status(self):
85
            """
86
            @todo Document
87
            """
88
            return self.__status
89
90
        @property
91
        def ready_time(self):
92
            """
93
            The local :class:`~datetime.datetime` when this action was ready to execute.
94
            """
95
            return self.__ready_time
96
97
        @property
98
        def start_time(self):
99
            """
100
            The local :class:`~datetime.datetime` at the begining of the execution of this action.
101
            """
102
            return self.__start_time
103
104
        @property
105
        def success_time(self):
106
            """
107
            The local :class:`~datetime.datetime` at the successful end of the execution of this action.
108
            """
109
            return self.__success_time
110
111
        @property
112
        def failure_time(self):
113
            """
114
            The local :class:`~datetime.datetime` at the successful end of the execution of this action.
115
            """
116
            return self.__failure_time
117
118
        @property
119
        def cancel_time(self):
120
            """
121
            The local :class:`~datetime.datetime` when this action was canceled.
122
            """
123
            return self.__cancel_time
124
125
    def __init__(self):
126
        self.__is_success = True
127
        self.__action_statuses = dict()
128
129
    def set_success(self, is_success):
130
        self.__is_success = is_success
131
132
    @property
133
    def is_success(self):
134
        """
135
        ``True`` if the execution finished without error.
136
137
        :rtype: bool
138
        """
139
        return self.__is_success
140
141
    def set_action_status(self, action, status):
142
        self.__action_statuses[action] = status
143
144
    def get_action_status(self, action):
145
        """
146
        @todo Document
147
        """
148
        return self.__action_statuses[action]
149
150
    def get_actions_and_statuses(self):
151
        """
152
        @todo Document
153
        """
154
        return self.__action_statuses.items()
155
156
157
class Executor(object):
158
    def __init__(self, jobs, keep_going):
159
        self.__jobs = jobs
160
        self.__keep_going = keep_going
161
162
    class Execution:
163
        # An aggregate for things that cannot be stored in Executor
164
        # (to allow several parallel calls to Executor.execute)
165
        def __init__(self, executor, pending):
166
            self.executor = executor
167
            self.pending = set(pending)  # Action
168
            self.submitted = dict()  # Future -> Action
169
            self.submitted_at = dict()  # Action -> datetime.datetime
170
            self.succeeded = set()  # Action
171
            self.failed = set()  # Action
172
            self.exceptions = []
173
            self.report = ExecutionReport()
174
175
    def execute(self, action):
176
        # Threads in pool just call self.__time_execute, which has no side effects.
177
        # To avoid races, only the thread calling Executor.execute is allowed to modify anything.
178
179
        with futures.ThreadPoolExecutor(max_workers=self.__jobs) as executor:
180
            execution = Executor.Execution(executor, action.get_all_dependencies())
181
            while execution.pending or execution.submitted:
182
                self.__progress(execution)
183
184
        if execution.exceptions:
185
            raise CompoundException(execution.exceptions, execution.report)
186
        else:
187
            return execution.report
188
189
    def __progress(self, execution):
190
        now = datetime.datetime.now()
191
        if self.__keep_going or not execution.exceptions:
192
            self.__submit_or_cancel(execution, now)
193
        else:
194
            self.__cancel(execution, now)
195
        self.__wait(execution)
196
197
    def __submit_or_cancel(self, execution, now):
198
        go_on = True
199
        while go_on:
200
            go_on = False
201
            for action in set(execution.pending):
202
                done = execution.succeeded | execution.failed
203
                if all(d in done for d in action.dependencies):
204
                    if any(d in execution.failed for d in action.dependencies):
205
                        self.__mark_action_canceled(execution, action, now)
206
                        execution.pending.remove(action)
207
                        go_on = True
208
                    else:
209
                        execution.submitted[execution.executor.submit(self.__time_execute, action)] = action
210
                        execution.submitted_at[action] = now
211
                        execution.pending.remove(action)
212
                        go_on = True
213
214
    @staticmethod
215
    def __time_execute(action):
216
        exception = None
217
        try:
218
            begin_time = datetime.datetime.now()
219
            action.do_execute()
220
        except Exception as e:
221
            exception = e
222
        end_time = datetime.datetime.now()
223
        return (exception, begin_time, end_time)
224
225
    def __cancel(self, execution, now):
226
        for (f, action) in execution.submitted.items():
227
            if f.cancel():
228
                self.__mark_action_canceled(execution, action, now)
229
                del execution.submitted[f]
230
        for action in execution.pending:
231
            self.__mark_action_canceled(execution, action, now)
232
        execution.pending.clear()
233
234
    def __wait(self, execution):
235
        waited = futures.wait(execution.submitted.keys(), return_when=futures.FIRST_COMPLETED)
236
        for f in waited.done:
237
            action = execution.submitted[f]
238
            del execution.submitted[f]
239
            (exception, begin_time, end_time) = f.result()
240
            if exception:
241
                self.__mark_action_failed(execution, action, begin_time, end_time)
242
                execution.exceptions.append(exception)
243
                execution.report.set_success(False)
244
            else:
245
                self.__mark_action_successful(execution, action, begin_time, end_time)
246
247
    @staticmethod
248
    def __mark_action_canceled(execution, action, cancel_time):
249
        execution.failed.add(action)
250
        execution.report.set_action_status(
251
            action,
252
            ExecutionReport.ActionStatus(
253
                ready_time=execution.submitted_at.get(action),
254
                cancel_time=cancel_time,
255
            )
256
        )
257
258
    @staticmethod
259
    def __mark_action_successful(execution, action, start_time, success_time):
260
        execution.succeeded.add(action)
261
        execution.report.set_action_status(
262
            action,
263
            ExecutionReport.ActionStatus(
264
                ready_time=execution.submitted_at[action],
265
                start_time=start_time,
266
                success_time=success_time,
267
            ),
268
        )
269
270
    @staticmethod
271
    def __mark_action_failed(execution, action, start_time, failure_time):
272
        execution.failed.add(action)
273
        execution.report.set_action_status(
274
            action,
275
            ExecutionReport.ActionStatus(
276
                ready_time=execution.submitted_at[action],
277
                start_time=start_time,
278
                failure_time=failure_time,
279
            )
280
        )
281
282
283
class Action(object):
284
    """
285
    The main class of ActionTree.
286
    An action to be started after all its dependencies are finished.
287
    Pass it to :func:`.execute`.
288
289
    This is a base class for your custom actions.
290
    You must define a ``def do_execute(self):`` method that performs the action.
291
    Its return value is ignored.
292
    If it raises and exception, it is captured and re-raised in a :exc:`CompoundException`.
293
294
    See also :class:`.ActionFromCallable` if you just want to create an action from a simple callable.
295
    """
296
    # @todo Add a note about printing anything in do_execute
297
    # @todo Add a note saying
298
299
    def __init__(self, label):
300
        """
301
        :param label: whatever you want to attach to the action.
302
            Can be retrieved by :attr:`label` and :meth:`get_preview`.
303
        """
304
        self.__label = label
305
        self.__dependencies = set()
306
307
    @property
308
    def label(self):
309
        """
310
        The label passed to the constructor.
311
        """
312
        return self.__label
313
314
    def add_dependency(self, dependency):
315
        """
316
        Add a dependency to be executed before this action.
317
        Order of insertion of dependencies is not important.
318
319
        :param Action dependency:
320
321
        :raises DependencyCycleException: when adding the new dependency would create a cycle.
322
        """
323
        if self in dependency.get_all_dependencies():
324
            raise DependencyCycleException()
325
        self.__dependencies.add(dependency)
326
327
    @property
328
    def dependencies(self):
329
        """
330
        Return the list of this action's dependencies.
331
        """
332
        return list(self.__dependencies)
333
334
    def get_all_dependencies(self):
335
        """
336
        Return the set of this action's recursive dependencies, including itself.
337
        """
338
        dependencies = set([self])
339
        for dependency in self.__dependencies:
340
            dependencies |= dependency.get_all_dependencies()
341
        return dependencies
342
343
    def get_preview(self):
344
        """
345
        Return the labels of this action and its dependencies, in an order that could be the execution order.
346
        """
347
        return [action.__label for action in self.get_possible_execution_order()]
348
349
    def get_possible_execution_order(self, seen_actions=None):
350
        if seen_actions is None:
351
            seen_actions = set()
352
        actions = []
353
        if self not in seen_actions:
354
            seen_actions.add(self)
355
            for dependency in self.__dependencies:
356
                actions += dependency.get_possible_execution_order(seen_actions)
357
            actions.append(self)
358
        return actions
359
360
361
class ActionFromCallable(Action):
362
    """
363
    An :class:`.Action` sub-class for the simple use-case of using a plain callable as an action.
364
    """
365
366
    def __init__(self, do_execute, label):
367
        """
368
        :param label: see :class:`.Action`.
369
        :param callable do_execute: the function to execute the action.
370
        """
371
        super(ActionFromCallable, self).__init__(label)
372
        self.__do_execute = do_execute
373
374
    def do_execute(self):
375
        self.__do_execute()
376
377
378
class CompoundException(Exception):
379
    """
380
    Exception thrown by :func:`.execute` when a dependencies raise exceptions.
381
    """
382
383
    def __init__(self, exceptions, execution_report):
384
        super(CompoundException, self).__init__(exceptions)
385
        self.__exceptions = exceptions
386
        self.__execution_report = execution_report
387
388
    @property
389
    def exceptions(self):
390
        """
391
        The list of the encapsulated exceptions.
392
        """
393
        return self.__exceptions
394
395
    @property
396
    def execution_report(self):
397
        """
398
        The :class:`.ExecutionReport` of the failed execution.
399
        """
400
        return self.__execution_report
401
402
403
class DependencyCycleException(Exception):
404
    """
405
    Exception thrown by :meth:`.Action.add_dependency` when adding the new dependency would create a cycle.
406
    """
407
408
    def __init__(self):
409
        super(DependencyCycleException, self).__init__("Dependency cycle")
410
411
412
class DependencyGraph(object):
413
    """
414
    @todo Document
415
    """
416
417
    def __init__(self, action):
418
        self.__graphviz_graph = graphviz.Digraph("action", node_attr={"shape": "box"})
419
        nodes = {}
420
        for (i, action) in enumerate(action.get_possible_execution_order()):
421
            node = str(i)
422
            nodes[action] = node
423
            self.__graphviz_graph.node(node, str(action.label))
424
            for dependency in action.dependencies:
425
                assert dependency in nodes  # Because we are iterating a possible execution order
426
                self.__graphviz_graph.edge(node, nodes[dependency])
427
428
    def write_to_png(self, filename):  # pragma no cover (Untestable? But small.)
429
        """
430
        Write the graph as a PNG image to the specified file.
431
432
        See also :meth:`get_graphviz_graph` if you want to draw the graph somewhere else.
433
        """
434
        directory = os.path.dirname(filename)
435
        filename = os.path.basename(filename)
436
        filename, ext = os.path.splitext(filename)
437
        g = self.get_graphviz_graph()
438
        g.format = "png"
439
        g.render(directory=directory, filename=filename, cleanup=True)
440
441
    def get_graphviz_graph(self):
442
        """
443
        Return a :class:`graphviz.Digraph` of this dependency graph.
444
445
        See also :meth:`write_to_png` for the simplest use-case.
446
        """
447
        return self.__graphviz_graph.copy()
448
449
450
class GanttChart(object):  # pragma no cover (Too difficult to unit test)
451
    """
452
    @todo Document
453
    """
454
455
    def __init__(self, report):
456
        # @todo Sort actions somehow to improve readability (top-left to bottom-right)
457
        self.__actions = {
458
            id(action): self.__make_action(action, status)
459
            for (action, status) in report.get_actions_and_statuses()
460
        }
461
462
    # @todo Factorize Actions
463 View Code Duplication
    class SuccessfulAction(object):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
464
        def __init__(self, action, status):
465
            self.__label = str(action.label)
466
            self.__id = id(action)
467
            self.__dependencies = set(id(d) for d in action.dependencies)
468
            self.__ready_time = status.ready_time
469
            self.__start_time = status.start_time
470
            self.__success_time = status.success_time
471
472
        @property
473
        def min_time(self):
474
            return self.__ready_time
475
476
        @property
477
        def max_time(self):
478
            return self.__success_time
479
480
        def draw(self, ax, ordinates, actions):
481
            ordinate = ordinates[self.__id]
482
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="blue", lw=1)
483
            # @todo Use an other end-style to avoid pixels before/after min/max_time
484
            ax.plot([self.__start_time, self.__success_time], [ordinate, ordinate], color="blue", lw=4)
485
            # @todo Make sure the text is not outside the plot on the right
486
            ax.annotate(self.__label, xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points")
487
            for d in self.__dependencies:
488
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)
489
490 View Code Duplication
    class FailedAction(object):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
491
        def __init__(self, action, status):
492
            self.__label = str(action.label)
493
            self.__id = id(action)
494
            self.__dependencies = set(id(d) for d in action.dependencies)
495
            self.__ready_time = status.ready_time
496
            self.__start_time = status.start_time
497
            self.__failure_time = status.failure_time
498
499
        @property
500
        def min_time(self):
501
            return self.__ready_time
502
503
        @property
504
        def max_time(self):
505
            return self.__failure_time
506
507
        def draw(self, ax, ordinates, actions):
508
            ordinate = ordinates[self.__id]
509
            ax.plot([self.__ready_time, self.__start_time], [ordinate, ordinate], color="red", lw=1)
510
            ax.plot([self.__start_time, self.__failure_time], [ordinate, ordinate], color="red", lw=4)
511
            ax.annotate(self.__label, xy=(self.__start_time, ordinate), xytext=(0, 3), textcoords="offset points")
512
            for d in self.__dependencies:
513
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)
514
515
    class CanceledAction(object):
516
        def __init__(self, action, status):
517
            self.__label = str(action.label)
518
            self.__id = id(action)
519
            self.__dependencies = set(id(d) for d in action.dependencies)
520
            self.__ready_time = status.ready_time
521
            self.__cancel_time = status.cancel_time
522
523
        @property
524
        def min_time(self):
525
            return self.__cancel_time if self.__ready_time is None else self.__ready_time
526
527
        @property
528
        def max_time(self):
529
            return self.__cancel_time
530
531
        def draw(self, ax, ordinates, actions):
532
            ordinate = ordinates[self.__id]
533
            if self.__ready_time:
534
                ax.plot([self.__ready_time, self.__cancel_time], [ordinate, ordinate], color="grey", lw=1)
535
            ax.annotate(
536
                "(Canceled) {}".format(self.__label),
537
                xy=(self.__cancel_time, ordinate),
538
                xytext=(0, 3),
539
                textcoords="offset points"
540
            )
541
            for d in self.__dependencies:
542
                ax.plot([actions[d].max_time, self.min_time], [ordinates[d], ordinate], "k:", lw=1)
543
544
    @classmethod
545
    def __make_action(cls, action, status):
546
        if status.status == ExecutionReport.ActionStatus.Successful:
547
            return cls.SuccessfulAction(action, status)
548
        elif status.status == ExecutionReport.ActionStatus.Failed:
549
            return cls.FailedAction(action, status)
550
        elif status.status == ExecutionReport.ActionStatus.Canceled:
551
            return cls.CanceledAction(action, status)
552
553
    def write_to_png(self, filename):
554
        """
555
        Write the Gantt chart as a PNG image to the specified file.
556
557
        See also :meth:`get_mpl_figure` and :meth:`plot_on_mpl_axes` if you want to draw the report somewhere else.
558
        """
559
        figure = self.get_mpl_figure()
560
        canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(figure)
561
        canvas.print_figure(filename)
562
563
    def get_mpl_figure(self):
564
        """
565
        Return a :class:`matplotlib.figure.Figure` of this Gantt chart.
566
567
        See also :meth:`plot_on_mpl_axes` if you want to draw the Gantt chart on your own matplotlib figure.
568
569
        See also :meth:`write_to_png` for the simplest use-case.
570
        """
571
        fig = matplotlib.figure.Figure()
572
        ax = fig.add_subplot(1, 1, 1)
573
574
        self.plot_on_mpl_axes(ax)
575
576
        return fig
577
578
    @staticmethod
579
    def __nearest(v, values):
580
        for i, value in enumerate(values):
581
            if v < value:
582
                break
583
        if i == 0:
584
            return values[0]
585
        else:
586
            if v - values[i - 1] <= values[i] - v:
587
                return values[i - 1]
588
            else:
589
                return values[i]
590
591
    __intervals = [
592
        1, 2, 5, 10, 15, 30, 60,
593
        2 * 60, 10 * 60, 30 * 60, 3600,
594
        2 * 3600, 3 * 3600, 6 * 3600, 12 * 3600, 24 * 3600,
595
    ]
596
597
    def plot_on_mpl_axes(self, ax):
598
        """
599
        Plot this Gantt chart on the provided :class:`matplotlib.axes.Axes`.
600
601
        See also :meth:`write_to_png` and :meth:`get_mpl_figure` for the simpler use-cases.
602
        """
603
        ordinates = {ident: len(self.__actions) - i for (i, ident) in enumerate(self.__actions.iterkeys())}
604
605
        for action in self.__actions.itervalues():
606
            action.draw(ax, ordinates, self.__actions)
607
608
        ax.get_yaxis().set_ticklabels([])
609
        ax.set_ylim(0.5, len(self.__actions) + 1)
610
611
        min_time = min(a.min_time for a in self.__actions.itervalues()).replace(microsecond=0)
612
        max_time = (
613
            max(a.max_time for a in self.__actions.itervalues()).replace(microsecond=0) +
614
            datetime.timedelta(seconds=1)
615
        )
616
        duration = int((max_time - min_time).total_seconds())
617
618
        ax.set_xlabel("Local time")
619
        ax.set_xlim(min_time, max_time)
620
        ax.xaxis_date()
621
        ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M:%S"))
622
        ax.xaxis.set_major_locator(matplotlib.dates.AutoDateLocator(maxticks=4, minticks=5))
623
624
        ax2 = ax.twiny()
625
        ax2.set_xlabel("Relative time")
626
        ax2.set_xlim(min_time, max_time)
627
        ticks = range(0, duration, self.__nearest(duration // 5, self.__intervals))
628
        ax2.xaxis.set_ticks([min_time + datetime.timedelta(seconds=s) for s in ticks])
629
        ax2.xaxis.set_ticklabels(ticks)
630