MultiThreadedExecutionTestCase   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 129
Duplicated Lines 43.41 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
dl 56
loc 129
rs 10
c 6
b 0
f 0
wmc 6

6 Methods

Rating   Name   Duplication   Size   Complexity  
B test_deep_dependencies() 0 28 1
A test_many_dependencies_with_default_cpu_cores() 18 18 1
A test_many_dependencies() 18 18 1
A test_two_deep_branches() 0 22 1
A test_diamond_dependencies() 20 20 1
A test_half_diamond_dependency() 0 17 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
# coding: utf8
2
3
# Copyright 2013-2018 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
from ActionTree import *
8
from . import *
9
10
11
class MultiThreadedExecutionTestCase(ActionTreeTestCase):
12 View Code Duplication
    def test_many_dependencies(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
13
        #     a
14
        #    /|\
15
        #   / | \
16
        #  b  c  d
17
18
        a = self._action("a")
19
        barrier = self._barrier(3)
20
        b = self._action("b", barrier=barrier, end_event=True)
21
        c = self._action("c", barrier=barrier, end_event=True)
22
        d = self._action("d", barrier=barrier, end_event=True)
23
        a.add_dependency(b)
24
        a.add_dependency(c)
25
        a.add_dependency(d)
26
27
        execute(a, cpu_cores=3)
28
29
        self.assertEventsEqual("bcd BCD a")
30
31 View Code Duplication
    def test_many_dependencies_with_default_cpu_cores(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
32
        #     a
33
        #    /|\
34
        #   / | \
35
        #  b  c  d
36
37
        a = self._action("a")
38
        barrier = self._barrier(3)
39
        b = self._action("b", barrier=barrier, end_event=True)
40
        c = self._action("c", barrier=barrier, end_event=True)
41
        d = self._action("d", barrier=barrier, end_event=True)
42
        a.add_dependency(b)
43
        a.add_dependency(c)
44
        a.add_dependency(d)
45
46
        execute(a, cpu_cores=None)
47
48
        self.assertEventsEqual("bcd BCD a")
49
50
    def test_deep_dependencies(self):
51
        #  a
52
        #  |
53
        #  b
54
        #  |
55
        #  c
56
        #  |
57
        #  d
58
        #  |
59
        #  e
60
        #  |
61
        #  f
62
63
        a = self._action("a")
64
        b = self._action("b", end_event=True)
65
        c = self._action("c", end_event=True)
66
        d = self._action("d", end_event=True)
67
        e = self._action("e", end_event=True)
68
        f = self._action("f", end_event=True)
69
        a.add_dependency(b)
70
        b.add_dependency(c)
71
        c.add_dependency(d)
72
        d.add_dependency(e)
73
        e.add_dependency(f)
74
75
        execute(a, cpu_cores=3)
76
77
        self.assertEventsEqual("f F e E d D c C b B a")
78
79 View Code Duplication
    def test_diamond_dependencies(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
80
        #     a
81
        #    / \
82
        #   b   c
83
        #    \ /
84
        #     d
85
86
        a = self._action("a")
87
        barrier = self._barrier(2)
88
        b = self._action("b", barrier=barrier, end_event=True)
89
        c = self._action("c", barrier=barrier, end_event=True)
90
        d = self._action("d", end_event=True)
91
        a.add_dependency(b)
92
        a.add_dependency(c)
93
        b.add_dependency(d)
94
        c.add_dependency(d)
95
96
        execute(a, cpu_cores=3)
97
98
        self.assertEventsEqual("d D bc BC a")
99
100
    def test_half_diamond_dependency(self):
101
        #     a
102
        #    /|
103
        #   b |
104
        #    \|
105
        #     d
106
107
        a = self._action("a")
108
        b = self._action("b", end_event=True)
109
        d = self._action("d", end_event=True)
110
        a.add_dependency(b)
111
        a.add_dependency(d)
112
        b.add_dependency(d)
113
114
        execute(a, cpu_cores=3)
115
116
        self.assertEventsEqual("d D b B a")
117
118
    def test_two_deep_branches(self):
119
        #     a
120
        #    / \
121
        #   b   c
122
        #   |   |
123
        #   d   e
124
125
        a = self._action("a")
126
        barrier1 = self._barrier(2)
127
        b = self._action("b", barrier=barrier1, end_event=True)
128
        c = self._action("c", barrier=barrier1, end_event=True)
129
        barrier2 = self._barrier(2)
130
        d = self._action("d", barrier=barrier2, end_event=True)
131
        e = self._action("e", barrier=barrier2, end_event=True)
132
        a.add_dependency(b)
133
        a.add_dependency(c)
134
        b.add_dependency(d)
135
        c.add_dependency(e)
136
137
        execute(a, cpu_cores=3)
138
139
        self.assertEventsEqual("de DEbc BC a")
140