Test Failed
Pull Request — master (#70)
by macartur
02:36 queued 15s
created

build.tests.models.test_evc_deploy   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 373
Duplicated Lines 90.08 %

Importance

Changes 0
Metric Value
eloc 273
dl 336
loc 373
rs 10
c 0
b 0
f 0
wmc 13

13 Methods

Rating   Name   Duplication   Size   Complexity  
A TestEVC.test_primary_links_zipped() 0 3 1
A TestEVC.test_deploy_fail() 39 39 1
A TestEVC.test_prepare_pop_flow() 22 22 1
A TestEVC.test_should_deploy_case3() 14 14 1
A TestEVC.test_deploy_successfully() 39 39 1
A TestEVC.test_should_deploy_case4() 14 14 1
B TestEVC.test_install_uni_flows() 76 76 1
A TestEVC.test_send_flow_mods() 11 11 1
A TestEVC.test_should_deploy_case1() 14 14 1
A TestEVC.test_should_deploy_case2() 14 14 1
A TestEVC.test_install_nni_flows() 51 51 1
A TestEVC.test_prepare_push_flow() 29 29 1
A TestEVC.test_prepare_flow_mod() 13 13 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""Module to test the EVCDeploy class."""
2
import sys
3
from unittest import TestCase
4
from unittest.mock import Mock, patch
5
6
from kytos.core.interface import UNI, Interface
7
from kytos.core.switch import Switch
8
from kytos.core.common import EntityStatus
9
10
# pylint: disable=wrong-import-position
11
sys.path.insert(0, '/var/lib/kytos/napps/..')
12
# pylint: enable=wrong-import-position
13
14
from napps.kytos.mef_eline.models import EVC, Path  # NOQA
15
from napps.kytos.mef_eline.settings import MANAGER_URL  # NOQA
16
from napps.kytos.mef_eline.tests.helpers import get_link_mocked, get_uni_mocked  # NOQA
17
18
19
class TestEVC(TestCase):  # pylint: disable=too-many-public-methods
20
    """Tests to verify EVC class."""
21
22
    def test_primary_links_zipped(self):
23
        """Test primary links zipped method."""
24
        pass
25
26 View Code Duplication
    @staticmethod
    @patch('napps.kytos.mef_eline.models.log')
    def test_should_deploy_case1(log_mock):
        """Check should_deploy logs an empty path when called without links.

        The EVC is created with only a name and two valid mocked UNIs, and
        should_deploy is called with no argument.
        """
        log_mock.debug.return_value = True
        evc = EVC(name="custom_name",
                  uni_a=get_uni_mocked(is_valid=True),
                  uni_z=get_uni_mocked(is_valid=True))

        evc.should_deploy()

        log_mock.debug.assert_called_with('Path is empty.')
40
41 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
    def test_should_deploy_case2(self, log_mock):
        """Check should_deploy is falsy and logs that the EVC is disabled.

        The EVC is created with two mocked primary links and without the
        ``enabled`` attribute.
        """
        log_mock.debug.return_value = True
        uni_a = get_uni_mocked(is_valid=True)
        uni_z = get_uni_mocked(is_valid=True)
        links = [get_link_mocked(), get_link_mocked()]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links)

        result = evc.should_deploy(links)

        self.assertFalse(result)
        log_mock.debug.assert_called_with(f'{evc} is disabled.')
55
56 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
    def test_should_deploy_case3(self, log_mock):
        """Check should_deploy is truthy for an enabled, not active EVC.

        The EVC is created with two mocked primary links and
        ``enabled=True``; the last debug log must announce the deployment.
        """
        log_mock.debug.return_value = True
        uni_a = get_uni_mocked(is_valid=True)
        uni_z = get_uni_mocked(is_valid=True)
        links = [get_link_mocked(), get_link_mocked()]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links, enabled=True)

        result = evc.should_deploy(links)

        self.assertTrue(result)
        log_mock.debug.assert_called_with(f'{evc} will be deployed.')
70
71 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
    def test_should_deploy_case4(self, log_mock):
        """Check should_deploy is falsy for an enabled and active EVC.

        The EVC is created with two mocked primary links, ``enabled=True``
        and ``active=True``.
        """
        log_mock.debug.return_value = True
        uni_a = get_uni_mocked(is_valid=True)
        uni_z = get_uni_mocked(is_valid=True)
        links = [get_link_mocked(), get_link_mocked()]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links, enabled=True, active=True)

        result = evc.should_deploy(links)

        self.assertFalse(result)
85
86 View Code Duplication
    @patch('napps.kytos.mef_eline.models.requests')
    def test_send_flow_mods(self, requests_mock):
        """Check send_flow_mods posts the flows once to the switch endpoint."""
        flow_mods = {"id": 20}
        switch = Mock(spec=Switch, id=1)

        EVC.send_flow_mods(switch, flow_mods)

        post_mock = requests_mock.post
        self.assertEqual(post_mock.call_count, 1)
        post_mock.assert_called_once_with(
            f"{MANAGER_URL}/flows/{switch.id}",
            json={"flows": flow_mods},
        )
97
98 View Code Duplication
    def test_prepare_flow_mod(self):
        """Check the flow mod built from an in/out interface pair."""
        in_interface = Interface('eth0', 1, Mock(spec=Switch))
        out_interface = Interface('eth1', 3, Mock(spec=Switch))

        flow_mod = EVC.prepare_flow_mod(in_interface, out_interface)

        output_action = {'action_type': 'output',
                         'port': out_interface.port_number}
        expected_flow_mod = {
            'match': {'in_port': in_interface.port_number},
            'actions': [output_action],
        }
        self.assertEqual(expected_flow_mod, flow_mod)
111
112 View Code Duplication
    def test_prepare_pop_flow(self):
        """Check the flow mod returned by prepare_pop_flow.

        The expected flow matches the in port and VLAN, pops the VLAN and
        outputs to the port of the second interface.
        """
        evc = EVC(name="custom_name",
                  uni_a=get_uni_mocked(interface_port=1, is_valid=True),
                  uni_z=get_uni_mocked(interface_port=2, is_valid=True))
        port_in = evc.uni_a.interface
        port_out = evc.uni_z.interface
        in_vlan = 10

        flow_mod = evc.prepare_pop_flow(port_in, port_out, in_vlan)

        expected_match = {'in_port': port_in.port_number, 'dl_vlan': in_vlan}
        expected_actions = [
            {'action_type': 'pop_vlan'},
            {'action_type': 'output', 'port': port_out.port_number},
        ]
        expected_flow_mod = {'match': expected_match,
                             'actions': expected_actions}
        self.assertEqual(expected_flow_mod, flow_mod)
134
135 View Code Duplication
    def test_prepare_push_flow(self):
        """Check the flow mod returned by prepare_push_flow.

        The expected flow matches the in port and VLAN, sets the Z-side
        VLAN, pushes an 's' tag, sets the outer VLAN and outputs to the
        port of the second interface.
        """
        evc = EVC(name="custom_name",
                  uni_a=get_uni_mocked(interface_port=1, is_valid=True),
                  uni_z=get_uni_mocked(interface_port=2, is_valid=True))
        port_in = evc.uni_a.interface
        port_out = evc.uni_z.interface
        in_vlan_a, out_vlan_a, in_vlan_z = 10, 20, 3

        flow_mod = evc.prepare_push_flow(port_in, port_out,
                                         in_vlan_a, out_vlan_a, in_vlan_z)

        expected_match = {'in_port': port_in.port_number,
                          'dl_vlan': in_vlan_a}
        expected_actions = [
            {'action_type': 'set_vlan', 'vlan_id': in_vlan_z},
            {'action_type': 'push_vlan', 'tag_type': 's'},
            {'action_type': 'set_vlan', 'vlan_id': out_vlan_a},
            {'action_type': 'output', 'port': port_out.port_number},
        ]
        expected_flow_mod = {'match': expected_match,
                             'actions': expected_actions}
        self.assertEqual(expected_flow_mod, flow_mod)
164
165 View Code Duplication
    @staticmethod
    @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
    def test_install_uni_flows(send_flow_mods_mock):
        """Check the flow mods install_uni_flows hands to send_flow_mods.

        For each UNI the expected call carries the UNI switch and a pair of
        flows: one pushing the link s_vlan towards the path port, and one
        popping it back towards the UNI port.
        """
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
                               switch_id="switch_uni_a", is_valid=True)
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
                               switch_id="switch_uni_z", is_valid=True)
        links = [
            get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
                            metadata={"s_vlan": 5}),
            get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
                            metadata={"s_vlan": 6}),
        ]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links)
        evc.install_uni_flows(links)

        def expected_flows(uni, remote_uni, s_vlan, path_port):
            """Build the push/pop flow pair expected on one UNI switch."""
            uni_port = uni.interface.port_number
            push_flow = {
                'match': {'in_port': uni_port,
                          'dl_vlan': uni.user_tag.value},
                'actions': [
                    {'action_type': 'set_vlan',
                     'vlan_id': remote_uni.user_tag.value},
                    {'action_type': 'push_vlan', 'tag_type': 's'},
                    {'action_type': 'set_vlan', 'vlan_id': s_vlan},
                    {'action_type': 'output', 'port': path_port},
                ],
            }
            pop_flow = {
                'match': {'in_port': path_port, 'dl_vlan': s_vlan},
                'actions': [
                    {'action_type': 'pop_vlan'},
                    {'action_type': 'output', 'port': uni_port},
                ],
            }
            return [push_flow, pop_flow]

        # The A side uses the first link (endpoint_a); the Z side uses the
        # last link (endpoint_b).
        first_link = evc.primary_links[0]
        expected_flow_mod_a = expected_flows(
            uni_a, uni_z,
            first_link.get_metadata('s_vlan').value,
            first_link.endpoint_a.port_number)
        send_flow_mods_mock.assert_any_call(uni_a.interface.switch,
                                            expected_flow_mod_a)

        last_link = evc.primary_links[-1]
        expected_flow_mod_z = expected_flows(
            uni_z, uni_a,
            last_link.get_metadata('s_vlan').value,
            last_link.endpoint_b.port_number)
        send_flow_mods_mock.assert_any_call(uni_z.interface.switch,
                                            expected_flow_mod_z)
241
242 View Code Duplication
    @staticmethod
    @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
    def test_install_nni_flows(send_flow_mods_mock):
        """Check the flow mods install_nni_flows hands to send_flow_mods.

        The single expected call carries the switch of the first link's
        endpoint_b and two mirrored flows that swap the s_vlan between the
        first and last link.
        """
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
                               switch_id="switch_uni_a", is_valid=True)
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
                               switch_id="switch_uni_z", is_valid=True)
        links = [
            get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
                            metadata={"s_vlan": 5}),
            get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
                            metadata={"s_vlan": 6}),
        ]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links)
        evc.install_nni_flows(links)

        first_link = evc.primary_links[0]
        last_link = evc.primary_links[-1]
        in_vlan = first_link.get_metadata('s_vlan').value
        out_vlan = last_link.get_metadata('s_vlan').value
        in_port = first_link.endpoint_b.port_number
        out_port = last_link.endpoint_a.port_number

        def swap_flow(match_port, match_vlan, new_vlan, to_port):
            """Build one flow that rewrites the VLAN and forwards it."""
            return {
                'match': {'in_port': match_port, 'dl_vlan': match_vlan},
                'actions': [
                    {'action_type': 'set_vlan', 'vlan_id': new_vlan},
                    {'action_type': 'output', 'port': to_port},
                ],
            }

        expected_flow_mods = [
            swap_flow(in_port, in_vlan, out_vlan, out_port),
            swap_flow(out_port, out_vlan, in_vlan, in_port),
        ]

        switch = first_link.endpoint_b.switch
        send_flow_mods_mock.assert_called_once_with(switch, expected_flow_mods)
293
294 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
    @patch('napps.kytos.mef_eline.models.EVC.choose_vlans')
    @patch('napps.kytos.mef_eline.models.EVC.install_nni_flows')
    @patch('napps.kytos.mef_eline.models.EVC.install_uni_flows')
    @patch('napps.kytos.mef_eline.models.EVC.activate')
    @patch('napps.kytos.mef_eline.models.EVC.should_deploy')
    def test_deploy_successfully(self, *args):
        """Check deploy runs every patched step once when allowed to deploy.

        should_deploy is forced to return True; each patched EVC method
        must be called exactly once, the deployment must be logged at info
        level, and deploy must return a truthy value.
        """
        # Mocks arrive bottom-up: should_deploy, activate, install_uni_flows,
        # install_nni_flows, choose_vlans and finally log.
        *step_mocks, log_mock = args
        should_deploy_mock = step_mocks[0]
        should_deploy_mock.return_value = True

        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
                               switch_id="switch_uni_a", is_valid=True)
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
                               switch_id="switch_uni_z", is_valid=True)
        links = [
            get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
                            metadata={"s_vlan": 5}),
            get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
                            metadata={"s_vlan": 6}),
        ]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links)

        deployed = evc.deploy(links)

        for step_mock in step_mocks:
            self.assertEqual(step_mock.call_count, 1)
        log_mock.info.assert_called_once_with(f"{evc} was deployed.")
        self.assertTrue(deployed)
333
334 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
    @patch('napps.kytos.mef_eline.models.EVC.choose_vlans')
    @patch('napps.kytos.mef_eline.models.EVC.install_nni_flows')
    @patch('napps.kytos.mef_eline.models.EVC.install_uni_flows')
    @patch('napps.kytos.mef_eline.models.EVC.activate')
    @patch('napps.kytos.mef_eline.models.EVC.should_deploy')
    def test_deploy_fail(self, *args):
        """Check deploy skips every patched step when should_deploy is False.

        should_deploy is forced to return False; it must be consulted once,
        no other patched EVC method may be called, nothing may be logged at
        info level, and deploy must return a falsy value.
        """
        # Mocks arrive bottom-up: should_deploy first, log last; the ones in
        # between are the deployment steps that must stay untouched.
        should_deploy_mock, *skipped_mocks, log_mock = args
        should_deploy_mock.return_value = False

        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
                               switch_id="switch_uni_a", is_valid=True)
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
                               switch_id="switch_uni_z", is_valid=True)
        links = [
            get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
                            metadata={"s_vlan": 5}),
            get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
                            metadata={"s_vlan": 6}),
        ]
        evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
                  primary_links=links)

        deployed = evc.deploy()

        self.assertEqual(should_deploy_mock.call_count, 1)
        for skipped_mock in skipped_mocks:
            self.assertEqual(skipped_mock.call_count, 0)
        self.assertEqual(log_mock.info.call_count, 0)
        self.assertFalse(deployed)
373