Completed
Push — master ( 2ae69e...39840a )
by Vincent
01:14
created

ExceptionsHandlingTestCase   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 113
Duplicated Lines 13.27 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
wmc 15
c 5
b 0
f 0
dl 15
loc 113
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
B test_exceptions_in_long_branch_dependencies_with_keep_going() 0 29 2
A test_exception_in_dependency() 15 15 2
A test_simple_failure_without_raise() 0 8 1
B test_exceptions_in_dependencies_without_keep_going() 0 23 5
A test_exceptions_in_dependencies_with_keep_going() 0 20 3
A test_simple_failure() 0 12 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# coding: utf8
2
3
# Copyright 2013-2015 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
from ActionTree import *
8
from . import *
9
10
11
class ExceptionsHandlingTestCase(ActionTreeTestCase):
    """Check what ``execute`` reports when actions are created with an exception.

    The tests cover a single failing action, failing dependencies, and the
    ``keep_going`` and ``do_raise`` keyword arguments of ``execute``.
    """

    # Test methods are described with comments rather than docstrings so that
    # unittest's verbose output keeps printing the method names.

    def _assert_execution_fails(self, action, **execute_kwargs):
        """Run ``execute(action, jobs=1, ...)`` and return the raised exception.

        The test fails if ``execute`` does not raise ``CompoundException``.
        Extra keyword arguments are forwarded to ``execute`` unchanged.
        """
        with self.assertRaises(CompoundException) as catcher:
            execute(action, jobs=1, **execute_kwargs)
        return catcher.exception

    def _assert_status(self, report, action, expected_status):
        """Assert that *report* records *expected_status* for *action*."""
        self.assertEqual(report.get_action_status(action).status, expected_status)

    def test_simple_failure(self):
        # One action created with an exception: execute raises a
        # CompoundException carrying that single exception.
        a = self._action("a", exception=Exception("foobar"))

        failure = self._assert_execution_fails(a)
        report = failure.execution_report

        self.assertEqual(len(failure.exceptions), 1)
        self.assertEqual(failure.exceptions[0].args, ("foobar",))

        self.assertFalse(report.is_success)
        self._assert_status(report, a, ExecutionReport.ActionStatus.Failed)

    def test_simple_failure_without_raise(self):
        # With do_raise=False the report is returned instead of raising, and
        # the exception is read from the action's status.
        a = self._action("a", exception=Exception("foobar"))

        report = execute(a, jobs=1, do_raise=False)

        self.assertFalse(report.is_success)
        self._assert_status(report, a, ExecutionReport.ActionStatus.Failed)
        self.assertEqual(report.get_action_status(a).exception.args, ("foobar",))

    def test_exception_in_dependency(self):
        # a depends on b; b is created with an exception, so b is expected to
        # be Failed and a Canceled.
        a = self._action("a")
        b = self._action("b", exception=Exception("foobar"))
        a.add_dependency(b)

        failure = self._assert_execution_fails(a)
        report = failure.execution_report

        self.assertEqual(len(failure.exceptions), 1)
        self.assertEqual(failure.exceptions[0].args, ("foobar",))

        self.assertFalse(report.is_success)
        self._assert_status(report, a, ExecutionReport.ActionStatus.Canceled)
        self._assert_status(report, b, ExecutionReport.ActionStatus.Failed)

    def test_exceptions_in_dependencies_with_keep_going(self):
        # a depends on b, c and d; b and c are created with exceptions.
        # With keep_going=True both exceptions are collected and d still
        # ends up Successful.
        a = self._action("a")
        b = self._action("b", exception=Exception("eb"))
        c = self._action("c", exception=Exception("ec"))
        d = self._action("d")
        for dependency in (b, c, d):
            a.add_dependency(dependency)

        failure = self._assert_execution_fails(a, keep_going=True)
        report = failure.execution_report

        self.assertEqual(len(failure.exceptions), 2)
        # Sorted because the order of the collected exceptions is not asserted.
        self.assertEqual(sorted(ex.args for ex in failure.exceptions), [("eb",), ("ec",)])

        self._assert_status(report, a, ExecutionReport.ActionStatus.Canceled)
        self._assert_status(report, b, ExecutionReport.ActionStatus.Failed)
        self._assert_status(report, c, ExecutionReport.ActionStatus.Failed)
        self._assert_status(report, d, ExecutionReport.ActionStatus.Successful)

    def test_exceptions_in_long_branch_dependencies_with_keep_going(self):
        # Three two-level branches under a: b -> c, d -> e, f -> g.
        # Only e is created with an exception; its branch (d, then a) is
        # expected to be Canceled while the other branches are Successful.
        a = self._action("a")
        b = self._action("b")
        c = self._action("c")
        d = self._action("d")
        e = self._action("e", exception=Exception("foobar"))
        f = self._action("f")
        g = self._action("g")
        a.add_dependency(b)
        b.add_dependency(c)
        a.add_dependency(d)
        d.add_dependency(e)
        a.add_dependency(f)
        f.add_dependency(g)

        failure = self._assert_execution_fails(a, keep_going=True)
        report = failure.execution_report

        self.assertEqual(len(failure.exceptions), 1)
        self.assertEqual(failure.exceptions[0].args, ("foobar",))

        self._assert_status(report, a, ExecutionReport.ActionStatus.Canceled)
        self._assert_status(report, b, ExecutionReport.ActionStatus.Successful)
        self._assert_status(report, c, ExecutionReport.ActionStatus.Successful)
        self._assert_status(report, d, ExecutionReport.ActionStatus.Canceled)
        self._assert_status(report, e, ExecutionReport.ActionStatus.Failed)
        self._assert_status(report, f, ExecutionReport.ActionStatus.Successful)
        self._assert_status(report, g, ExecutionReport.ActionStatus.Successful)

    def test_exceptions_in_dependencies_without_keep_going(self):
        # The scenario is repeated 10 times and the test only requires that a
        # sibling dependency was Canceled in at least one of the runs.
        # NOTE(review): presumably repeated because which siblings get
        # canceled depends on scheduling order -- confirm.
        some_dependency_was_canceled_at_least_once = False
        for _ in range(10):
            a = self._action("a")
            d0 = self._action("0", exception=Exception())
            a.add_dependency(d0)
            # Distinct name for the comprehension variable: under Python 2 it
            # would leak into the method scope and clobber the loop variable.
            deps = [self._action(str(n)) for n in range(10)]
            for dep in deps:
                a.add_dependency(dep)

            report = execute(a, jobs=1, keep_going=False, do_raise=False)

            self._assert_status(report, a, ExecutionReport.ActionStatus.Canceled)
            self._assert_status(report, d0, ExecutionReport.ActionStatus.Failed)

            some_dependency_was_canceled_this_time = any(
                report.get_action_status(dep).status == ExecutionReport.ActionStatus.Canceled
                for dep in deps
            )
            some_dependency_was_canceled_at_least_once |= (
                some_dependency_was_canceled_this_time
            )
        self.assertTrue(some_dependency_was_canceled_at_least_once)
124