Passed
Push — master ( b66520...374a48 )
by Humberto
02:29 queued 11s
created

build.tests.unit.test_main   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 247
Duplicated Lines 16.6 %

Importance

Changes 0
Metric Value
eloc 198
dl 41
loc 247
rs 10
c 0
b 0
f 0
wmc 18

17 Methods

Rating   Name   Duplication   Size   Complexity  
A TestMain.test_handle_new_switch() 0 11 1
A TestMain.test_notify_port_created() 0 9 1
A TestMain.test_notify_topology_update() 0 8 1
A TestMain.test_add_links() 0 9 1
A TestMain.test_notify_link_status_change() 0 9 1
A TestMain.test_handle_interface_down() 0 11 1
A TestMain.test_verify_storehouse() 0 9 1
A TestMain.test_interface_link_up() 0 19 1
A TestMain.test_handle_interface_up() 0 11 1
B TestMain.test_verify_api_urls() 41 41 1
A TestMain.test_get_event_listeners() 0 15 1
A TestMain.test_interface_deleted() 0 6 1
A TestMain.test_interface_link_down() 0 17 1
A TestMain.test_handle_connection_lost() 0 9 1
A TestMain.test_notify_metadata_changes() 0 13 1
A TestMain.setUp() 0 12 2
A TestMain.test_handle_interface_created() 0 6 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""Module to test the main napp file."""
2
from unittest import TestCase
3
from unittest.mock import MagicMock, create_autospec, patch
4
5
from kytos.core.switch import Switch
6
from kytos.core.interface import Interface
7
from kytos.core.link import Link
8
9
10
from tests.unit.helpers import (get_controller_mock, get_napp_urls)
11
12
13
class TestMain(TestCase):
14
    """Test the Main class."""
15
16
    def setUp(self):
        """Execute steps before each test.

        Store the kytos/topology base URL in ``self.server_name_url`` and
        build ``self.napp`` from ``Main`` using the object returned by
        ``get_controller_mock()``.
        """
        self.server_name_url = 'http://localhost:8181/api/kytos/topology'

        # Register the cleanup before starting any patch: cleanups still run
        # when setUp itself fails, so a failing import below can no longer
        # leak the patched ``run_on_thread`` into other tests.
        self.addCleanup(patch.stopall)
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
        # Kept after the patch is started, as in the original ordering --
        # presumably so the NApp module is loaded with the pass-through
        # decorator in place; confirm before moving this import.
        from napps.kytos.topology.main import Main

        self.napp = Main(get_controller_mock())
28
29
    def test_get_event_listeners(self):
        """Check that the NApp reports exactly the expected listeners."""
        expected = [
            'kytos/core.shutdown',
            'kytos/core.shutdown.kytos/topology',
            '.*.interface.is.nni',
            '.*.connection.lost',
            '.*.switch.interface.created',
            '.*.switch.interface.deleted',
            '.*.switch.interface.link_down',
            '.*.switch.interface.link_up',
            '.*.switch.(new|reconnected)',
            '.*.switch.port.created',
            'kytos/topology.*.metadata.*',
        ]
        # List equality: both the content and the order must match.
        self.assertEqual(expected, self.napp.listeners())
44
45 View Code Duplication
    def test_verify_api_urls(self):
        """Check that the NApp registers exactly the expected API routes."""
        # HTTP method sets shared by several routes.
        read = {'GET', 'OPTIONS', 'HEAD'}
        write = {'POST', 'OPTIONS'}
        remove = {'OPTIONS', 'DELETE'}

        # Each entry is (url arguments, allowed methods, url rule).
        expected = [
         ({}, read, '/api/kytos/topology/v3/interfaces'),
         ({}, read, '/api/kytos/topology/v3/switches'),
         ({}, read, '/api/kytos/topology/v3/links'),
         ({}, read, '/api/kytos/topology/v3/'),
         ({'key': '[key]', 'interface_id': '[interface_id]'}, remove,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata/<key>'),
         ({'interface_id': '[interface_id]'}, write,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
         ({'interface_id': '[interface_id]'}, read,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
         ({'interface_id': '[interface_id]'}, write,
          '/api/kytos/topology/v3/interfaces/<interface_id>/disable'),
         ({'interface_id': '[interface_id]'}, write,
          '/api/kytos/topology/v3/interfaces/<interface_id>/enable'),
         ({'dpid': '[dpid]', 'key': '[key]'}, remove,
          '/api/kytos/topology/v3/switches/<dpid>/metadata/<key>'),
         ({'dpid': '[dpid]'}, write,
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
         ({'dpid': '[dpid]'}, read,
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
         ({'dpid': '[dpid]'}, write,
          '/api/kytos/topology/v3/switches/<dpid>/disable'),
         ({'dpid': '[dpid]'}, write,
          '/api/kytos/topology/v3/switches/<dpid>/enable'),
         ({'link_id': '[link_id]', 'key': '[key]'}, remove,
          '/api/kytos/topology/v3/links/<link_id>/metadata/<key>'),
         ({'link_id': '[link_id]'}, write,
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
         ({'link_id': '[link_id]'}, read,
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
         ({'link_id': '[link_id]'}, write,
          '/api/kytos/topology/v3/links/<link_id>/disable'),
         ({'link_id': '[link_id]'}, write,
          '/api/kytos/topology/v3/links/<link_id>/enable')]

        self.assertEqual(expected, get_napp_urls(self.napp))
86
87
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    def test_handle_new_switch(self, *args):
        """Check handle_new_switch updates metadata and notifies topology."""
        # Stacked patches are injected bottom-up: the decorator closest to
        # the function supplies the first mock.
        update_metadata, notify_update = args
        event = MagicMock()
        event.content['switch'] = create_autospec(Switch)

        self.napp.handle_new_switch(event)

        notify_update.assert_called()
        update_metadata.assert_called()
98
99
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    def test_handle_connection_lost(self, mock_notify_topology_update):
        """Check handle_connection_lost triggers a topology notification."""
        switch = create_autospec(Switch)
        switch.return_value = True
        event = MagicMock()
        # The handler is fed the switch through the event's 'source' entry.
        event.content['source'] = switch

        self.napp.handle_connection_lost(event)

        mock_notify_topology_update.assert_called()
108
109
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    def test_handle_interface_up(self, *args):
        """Check handle_interface_up updates metadata and notifies topology."""
        # Mocks arrive bottom-up relative to the decorator stack.
        update_metadata, notify_update = args
        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)

        self.napp.handle_interface_up(event)

        notify_update.assert_called()
        update_metadata.assert_called()
120
121
    @patch('napps.kytos.topology.main.Main.handle_interface_up')
    def test_handle_interface_created(self, mock_handle_interface_up):
        """Check handle_interface_created calls handle_interface_up."""
        event = MagicMock()

        self.napp.handle_interface_created(event)

        mock_handle_interface_up.assert_called()
127
128
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.handle_interface_link_down')
    def test_handle_interface_down(self, *args):
        """Check handle_interface_down handles link down and notifies."""
        # Mocks arrive bottom-up relative to the decorator stack.
        link_down_handler, notify_update = args
        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)

        self.napp.handle_interface_down(event)

        link_down_handler.assert_called()
        notify_update.assert_called()
139
140
    @patch('napps.kytos.topology.main.Main.handle_interface_down')
    def test_interface_deleted(self, mock_handle_interface_down):
        """Test that handle_interface_deleted calls handle_interface_down."""
        # The mock is named after the attribute that is actually patched
        # (handle_interface_down); patch injects it positionally.
        mock_event = MagicMock()
        self.napp.handle_interface_deleted(mock_event)
        mock_handle_interface_down.assert_called()
146
147
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interface_link_up(self, *args):
        """Check handle_interface_link_up with a link reported as inactive."""
        # Mocks arrive bottom-up relative to the decorator stack.
        (status_change, update_metadata, topology_update,
         link_from_interface) = args

        # The link lookup returns a link whose is_active() yields False.
        link = create_autospec(Link)
        link.is_active.return_value = False
        link_from_interface.return_value = link

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)

        self.napp.handle_interface_link_up(event)

        topology_update.assert_called()
        update_metadata.assert_called()
        status_change.assert_called()
166
167
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interface_link_down(self, *args):
        """Check handle_interface_link_down with a link reported as active."""
        # Mocks arrive bottom-up relative to the decorator stack.
        status_change, topology_update, link_from_interface = args

        # The link lookup returns a link whose is_active() yields True.
        link = create_autospec(Link)
        link.is_active.return_value = True
        link_from_interface.return_value = link

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)

        self.napp.handle_interface_link_down(event)

        topology_update.assert_called()
        status_change.assert_called()
184
185
    @patch('napps.kytos.topology.main.Main._get_link_or_create')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    def test_add_links(self, *args):
        """Check add_links fetches/creates a link and notifies topology."""
        # Mocks arrive bottom-up relative to the decorator stack.
        notify_update, get_link_or_create = args
        event = MagicMock()

        self.napp.add_links(event)

        get_link_or_create.assert_called()
        notify_update.assert_called()
194
195
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_topology_update(self, *args):
        """Check notify_topology_update builds an event and enqueues it."""
        # Mocks arrive bottom-up relative to the decorator stack.
        buffer_put, kytos_event = args

        self.napp.notify_topology_update()

        kytos_event.assert_called()
        buffer_put.assert_called()
203
204
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_link_status_change(self, *args):
        """Check notify_link_status_change builds an event and enqueues it."""
        # Mocks arrive bottom-up relative to the decorator stack.
        buffer_put, kytos_event = args
        link = create_autospec(Link)

        self.napp.notify_link_status_change(link)

        kytos_event.assert_called()
        buffer_put.assert_called()
213
214
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    @patch('napps.kytos.topology.main.isinstance')
    def test_notify_metadata_changes(self, *args):
        """Check notify_metadata_changes builds an event and enqueues it."""
        # Mocks arrive bottom-up relative to the decorator stack.
        isinstance_mock, buffer_put, kytos_event = args
        # Every isinstance() check made inside the NApp module will succeed.
        isinstance_mock.return_value = True

        # NOTE(review): the first argument is a plain MagicMock and the
        # second an autospec'd Switch -- confirm these roles are not swapped.
        obj = MagicMock()
        action = create_autospec(Switch)

        self.napp.notify_metadata_changes(obj, action)

        kytos_event.assert_called()
        isinstance_mock.assert_called()
        buffer_put.assert_called()
227
228
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_port_created(self, *args):
        """Check notify_port_created builds an event and enqueues it."""
        # Mocks arrive bottom-up relative to the decorator stack.
        buffer_put, kytos_event = args
        incoming_event = MagicMock()

        self.napp.notify_port_created(incoming_event)

        kytos_event.assert_called()
        buffer_put.assert_called()
237
238
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_verify_storehouse(self, *args):
        """Check verify_storehouse builds an event and enqueues it."""
        # Mocks arrive bottom-up relative to the decorator stack.
        buffer_put, kytos_event = args
        entities = MagicMock()

        self.napp.verify_storehouse(entities)

        buffer_put.assert_called()
        kytos_event.assert_called()
247