Passed
Pull Request — master (#99)
by
unknown
02:23
created

build.tests.unit.test_main   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 465
Duplicated Lines 41.29 %

Importance

Changes 0
Metric Value
eloc 374
dl 192
loc 465
rs 10
c 0
b 0
f 0
wmc 25

24 Methods

Rating   Name   Duplication   Size   Complexity  
A TestMain.test_disable_switch() 23 23 1
A TestMain.test_add_switch_metadata() 0 21 1
A TestMain.test_get_switch_metadata() 0 17 1
A TestMain.test_enable_switch() 25 24 1
B TestMain.test_verify_api_urls() 47 47 1
A TestMain.test_get_event_listeners() 0 15 1
A TestMain.setUp() 0 12 2
A TestMain.test_handle_new_switch() 0 11 1
A TestMain.test_notify_port_created() 0 9 1
A TestMain.test_notify_topology_update() 0 8 1
A TestMain.test_add_links() 0 9 1
A TestMain.test_notify_link_status_change() 0 9 1
A TestMain.test_verify_storehouse() 0 9 1
A TestMain.test_handle_interface_down() 0 11 1
A TestMain.test_interface_link_up() 0 19 1
A TestMain.test_handle_interface_up() 0 11 1
B TestMain.test_enable_interfaces() 48 48 1
A TestMain.test_delete_switch_metadata() 0 21 1
A TestMain.test_interface_deleted() 0 6 1
B TestMain.test_disable_interfaces() 48 48 1
A TestMain.test_interface_link_down() 0 17 1
A TestMain.test_notify_metadata_changes() 0 13 1
A TestMain.test_handle_connection_lost() 0 9 1
A TestMain.test_handle_interface_created() 0 6 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""Module to test the main napp file."""
2
import json
3
from unittest import TestCase
4
from unittest.mock import MagicMock, create_autospec, patch
5
6
from kytos.core.switch import Switch
7
from kytos.core.interface import Interface
8
from kytos.core.link import Link
9
from kytos.lib.helpers import get_switch_mock, get_test_client
10
11
12
from tests.unit.helpers import get_controller_mock, get_napp_urls
13
14
15
# pylint: disable=too-many-public-methods
16
class TestMain(TestCase):
17
    """Test the Main class."""
18
19
    def setUp(self):
        """Execute steps before each test.

        Set ``server_name_url`` to the kytos/topology API base URL, replace
        ``kytos.core.helpers.run_on_thread`` with a pass-through callable and
        create the ``Main`` instance under test with a mocked controller.
        """
        self.server_name_url = 'http://localhost:8181/api/kytos/topology'

        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
        # Register the cleanup right after starting the patch so it is undone
        # even when the import or the Main construction below raises.
        self.addCleanup(patch.stopall)

        # Imported only once the patch is active; presumably so the NApp
        # module picks up the pass-through run_on_thread -- TODO confirm.
        from napps.kytos.topology.main import Main

        self.napp = Main(get_controller_mock())
31
32
    def test_get_event_listeners(self):
        """Check that the NApp reports exactly the expected listeners."""
        registered = self.napp.listeners()
        wanted = [
            'kytos/core.shutdown',
            'kytos/core.shutdown.kytos/topology',
            '.*.interface.is.nni',
            '.*.connection.lost',
            '.*.switch.interface.created',
            '.*.switch.interface.deleted',
            '.*.switch.interface.link_down',
            '.*.switch.interface.link_up',
            '.*.switch.(new|reconnected)',
            '.*.switch.port.created',
            'kytos/topology.*.metadata.*',
        ]
        self.assertEqual(wanted, registered)
47
48 View Code Duplication
    def test_verify_api_urls(self):
        """Check that the NApp registers exactly the expected REST routes.

        Each expected entry is a tuple of (URL arguments, HTTP methods, rule),
        compared against what ``get_napp_urls`` returns for the NApp.
        """
        # The three method sets that the expected routes use.
        read_methods = {'GET', 'OPTIONS', 'HEAD'}
        post_methods = {'POST', 'OPTIONS'}
        delete_methods = {'OPTIONS', 'DELETE'}

        expected_urls = [
         ({}, read_methods, '/api/kytos/topology/v3/interfaces'),
         ({}, read_methods, '/api/kytos/topology/v3/switches'),
         ({}, read_methods, '/api/kytos/topology/v3/links'),
         ({}, read_methods, '/api/kytos/topology/v3/'),
         ({'dpid': '[dpid]'}, post_methods,
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/disable'),
         ({'dpid': '[dpid]'}, post_methods,
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/enable'),
         ({'key': '[key]', 'interface_id': '[interface_id]'},
          delete_methods,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata/<key>'),
         ({'interface_id': '[interface_id]'}, post_methods,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
         ({'interface_id': '[interface_id]'}, read_methods,
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
         ({'interface_disable_id': '[interface_disable_id]'},
          post_methods,
          '/api/kytos/topology/v3/interfaces/<interface_disable_id>/disable'),
         ({'interface_enable_id': '[interface_enable_id]'},
          post_methods,
          '/api/kytos/topology/v3/interfaces/<interface_enable_id>/enable'),
         ({'dpid': '[dpid]', 'key': '[key]'}, delete_methods,
          '/api/kytos/topology/v3/switches/<dpid>/metadata/<key>'),
         ({'dpid': '[dpid]'}, post_methods,
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
         ({'dpid': '[dpid]'}, read_methods,
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
         ({'dpid': '[dpid]'}, post_methods,
          '/api/kytos/topology/v3/switches/<dpid>/disable'),
         ({'dpid': '[dpid]'}, post_methods,
          '/api/kytos/topology/v3/switches/<dpid>/enable'),
         ({'link_id': '[link_id]', 'key': '[key]'}, delete_methods,
          '/api/kytos/topology/v3/links/<link_id>/metadata/<key>'),
         ({'link_id': '[link_id]'}, post_methods,
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
         ({'link_id': '[link_id]'}, read_methods,
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
         ({'link_id': '[link_id]'}, post_methods,
          '/api/kytos/topology/v3/links/<link_id>/disable'),
         ({'link_id': '[link_id]'}, post_methods,
          '/api/kytos/topology/v3/links/<link_id>/enable')]

        self.assertEqual(expected_urls, get_napp_urls(self.napp))
95
96 View Code Duplication
    def test_enable_switch(self):
        """Test enable_switch.

        POST to the enable endpoint of a registered switch and expect 201,
        the success message and one ``enable`` call. Then POST for a dpid
        that is not registered and expect 404, the failure message and no
        ``enable`` call.
        """
        dpid = "00:00:00:00:00:00:00:01"
        mock_switch = get_switch_mock(dpid)
        msg_success = "Operation successful"
        msg_fail = "Switch not found"
        self.napp.controller.switches = {dpid: mock_switch}
        api = get_test_client(self.napp.controller, self.napp)

        url = f'{self.server_name_url}/v3/switches/{dpid}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 201, response.data)
        self.assertEqual(msg_success, json.loads(response.data))
        self.assertEqual(mock_switch.enable.call_count, 1)

        # fail case
        # Forget the call recorded above so the count below only reflects
        # the request against the unknown switch.
        mock_switch.enable.reset_mock()
        dpid = "00:00:00:00:00:00:00:02"
        url = f'{self.server_name_url}/v3/switches/{dpid}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 404, response.data)
        self.assertEqual(msg_fail, json.loads(response.data))
        self.assertEqual(mock_switch.enable.call_count, 0)
120
121 View Code Duplication
    def test_disable_switch(self):
        """Test disable_switch.

        POST to the disable endpoint of a registered switch and expect 201,
        the success message and one ``disable`` call. Then POST for a dpid
        that is not registered and expect 404, the failure message and no
        ``disable`` call.
        """
        dpid = "00:00:00:00:00:00:00:01"
        mock_switch = get_switch_mock(dpid)
        msg_success = "Operation successful"
        msg_fail = "Switch not found"
        self.napp.controller.switches = {dpid: mock_switch}
        api = get_test_client(self.napp.controller, self.napp)

        url = f'{self.server_name_url}/v3/switches/{dpid}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 201, response.data)
        self.assertEqual(msg_success, json.loads(response.data))
        self.assertEqual(mock_switch.disable.call_count, 1)

        # fail case
        # Forget the call recorded above so the count below only reflects
        # the request against the unknown switch.
        mock_switch.disable.reset_mock()
        dpid = "00:00:00:00:00:00:00:02"
        url = f'{self.server_name_url}/v3/switches/{dpid}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 404, response.data)
        self.assertEqual(msg_fail, json.loads(response.data))
        self.assertEqual(mock_switch.disable.call_count, 0)
144
145
    def test_get_switch_metadata(self):
        """Check the switch metadata GET endpoint.

        Expect 200 for a registered switch and 404 for an unknown dpid.
        """
        known_dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(known_dpid)
        switch.metadata = "A"
        self.napp.controller.switches = {known_dpid: switch}
        client = get_test_client(self.napp.controller, self.napp)

        response = client.get(
            f'{self.server_name_url}/v3/switches/{known_dpid}/metadata')
        self.assertEqual(response.status_code, 200, response.data)

        # fail case
        unknown_dpid = "00:00:00:00:00:00:00:02"
        response = client.get(
            f'{self.server_name_url}/v3/switches/{unknown_dpid}/metadata')
        self.assertEqual(response.status_code, 404, response.data)
162
163
    @patch('napps.kytos.topology.main.Main.notify_metadata_changes')
    def test_add_switch_metadata(self, mock_metadata_changes):
        """Check the switch metadata POST endpoint.

        Expect 201 and a metadata-change notification for a registered
        switch, and 404 for an unknown dpid.
        """
        known_dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(known_dpid)
        self.napp.controller.switches = {known_dpid: switch}
        client = get_test_client(self.napp.controller, self.napp)
        body = json.dumps({"data": "A"})

        response = client.post(
            f'{self.server_name_url}/v3/switches/{known_dpid}/metadata',
            data=body, content_type='application/json')
        self.assertEqual(response.status_code, 201, response.data)
        mock_metadata_changes.assert_called()

        # fail case
        unknown_dpid = "00:00:00:00:00:00:00:02"
        response = client.post(
            f'{self.server_name_url}/v3/switches/{unknown_dpid}/metadata',
            data=body, content_type='application/json')
        self.assertEqual(response.status_code, 404, response.data)
184
185
    @patch('napps.kytos.topology.main.Main.notify_metadata_changes')
    def test_delete_switch_metadata(self, mock_metadata_changes):
        """Test delete_switch_metadata.

        DELETE a metadata key of a registered switch and expect 200 plus a
        metadata-change notification; then DELETE the same key on an unknown
        dpid and expect 404.
        """
        dpid = "00:00:00:00:00:00:00:01"
        key = "A"
        mock_switch = get_switch_mock(dpid)
        self.napp.controller.switches = {dpid: mock_switch}
        api = get_test_client(self.napp.controller, self.napp)

        url = f'{self.server_name_url}/v3/switches/{dpid}/metadata/{key}'
        response = api.delete(url)
        mock_metadata_changes.assert_called()
        self.assertEqual(response.status_code, 200, response.data)

        # fail case
        dpid = "00:00:00:00:00:00:00:02"
        url = f'{self.server_name_url}/v3/switches/{dpid}/metadata/{key}'
        response = api.delete(url)
        # NOTE(review): this assertion cannot fail here, because the mock was
        # already called in the success case above and has not been reset.
        # Consider resetting it and asserting the expected behaviour for an
        # unknown switch instead.
        mock_metadata_changes.assert_called()
        self.assertEqual(response.status_code, 404, response.data)
206
207 View Code Duplication
    def test_enable_interfaces(self):
        """Test enable_interfaces.

        Covers four requests: enabling a single interface, enabling every
        interface of a switch, an interface id that is not on the switch
        (409) and a dpid that is not registered (404).
        """
        dpid = '00:00:00:00:00:00:00:01'
        mock_switch = create_autospec(Switch)
        mock_interface_1 = create_autospec(Interface)
        mock_interface_2 = create_autospec(Interface)
        mock_switch.interfaces = {1: mock_interface_1, 2: mock_interface_2}
        self.napp.controller.switches = {dpid: mock_switch}
        api = get_test_client(self.napp.controller, self.napp)
        expected_success = 'Operation successful'

        # single interface: only interface 1 must be enabled
        interface_id = '00:00:00:00:00:00:00:01:1'
        url = f'{self.server_name_url}/v3/interfaces/{interface_id}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 200, response.data)
        self.assertEqual(expected_success, json.loads(response.data))
        self.assertEqual(mock_interface_1.enable.call_count, 1)
        self.assertEqual(mock_interface_2.enable.call_count, 0)

        # whole switch: both interfaces must be enabled
        mock_interface_1.enable.reset_mock()
        mock_interface_2.enable.reset_mock()
        url = f'{self.server_name_url}/v3/interfaces/switch/{dpid}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 200, response.data)
        self.assertEqual(expected_success, json.loads(response.data))
        self.assertEqual(mock_interface_1.enable.call_count, 1)
        self.assertEqual(mock_interface_2.enable.call_count, 1)

        # test interface not found
        interface_id = '00:00:00:00:00:00:00:01:3'
        mock_interface_1.enable.reset_mock()
        mock_interface_2.enable.reset_mock()
        url = f'{self.server_name_url}/v3/interfaces/{interface_id}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 409, response.data)
        self.assertEqual(mock_interface_1.enable.call_count, 0)
        self.assertEqual(mock_interface_2.enable.call_count, 0)

        # test switch not found
        dpid = '00:00:00:00:00:00:00:02'
        expected_fail = f"Switch not found: '{dpid}'"
        url = f'{self.server_name_url}/v3/interfaces/switch/{dpid}/enable'
        response = api.post(url)
        self.assertEqual(response.status_code, 404, response.data)
        self.assertEqual(expected_fail, json.loads(response.data))
        self.assertEqual(mock_interface_1.enable.call_count, 0)
        self.assertEqual(mock_interface_2.enable.call_count, 0)
255
256 View Code Duplication
    def test_disable_interfaces(self):
        """Test disable_interfaces.

        Covers four requests: disabling a single interface, disabling every
        interface of a switch, an interface id that is not on the switch
        (409) and a dpid that is not registered (404).
        """
        interface_id = '00:00:00:00:00:00:00:01:1'
        dpid = '00:00:00:00:00:00:00:01'
        expected = 'Operation successful'
        mock_switch = create_autospec(Switch)
        mock_interface_1 = create_autospec(Interface)
        mock_interface_2 = create_autospec(Interface)
        mock_switch.interfaces = {1: mock_interface_1, 2: mock_interface_2}
        self.napp.controller.switches = {dpid: mock_switch}
        api = get_test_client(self.napp.controller, self.napp)

        # single interface: only interface 1 must be disabled
        url = f'{self.server_name_url}/v3/interfaces/{interface_id}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 200, response.data)
        self.assertEqual(expected, json.loads(response.data))
        self.assertEqual(mock_interface_1.disable.call_count, 1)
        self.assertEqual(mock_interface_2.disable.call_count, 0)

        # whole switch: both interfaces must be disabled
        mock_interface_1.disable.reset_mock()
        mock_interface_2.disable.reset_mock()
        url = f'{self.server_name_url}/v3/interfaces/switch/{dpid}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 200, response.data)
        self.assertEqual(expected, json.loads(response.data))
        self.assertEqual(mock_interface_1.disable.call_count, 1)
        self.assertEqual(mock_interface_2.disable.call_count, 1)

        # test interface not found
        interface_id = '00:00:00:00:00:00:00:01:3'
        mock_interface_1.disable.reset_mock()
        mock_interface_2.disable.reset_mock()
        url = f'{self.server_name_url}/v3/interfaces/{interface_id}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 409, response.data)
        self.assertEqual(mock_interface_1.disable.call_count, 0)
        self.assertEqual(mock_interface_2.disable.call_count, 0)

        # test switch not found
        dpid = '00:00:00:00:00:00:00:02'
        expected_fail = f"Switch not found: '{dpid}'"
        url = f'{self.server_name_url}/v3/interfaces/switch/{dpid}/disable'
        response = api.post(url)
        self.assertEqual(response.status_code, 404, response.data)
        self.assertEqual(expected_fail, json.loads(response.data))
        self.assertEqual(mock_interface_1.disable.call_count, 0)
        self.assertEqual(mock_interface_2.disable.call_count, 0)
304
305
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    def test_handle_new_switch(self, *args):
        """Check handle_new_switch updates metadata and notifies topology."""
        # Mocks arrive bottom-up relative to the decorator order.
        instance_metadata, topology_update = args

        event = MagicMock()
        event.content['switch'] = create_autospec(Switch)
        self.napp.handle_new_switch(event)

        topology_update.assert_called()
        instance_metadata.assert_called()
316
317
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    def test_handle_connection_lost(self, mock_notify_topology_update):
        """Check handle_connection_lost triggers a topology notification."""
        switch = create_autospec(Switch)
        switch.return_value = True

        event = MagicMock()
        event.content['source'] = switch
        self.napp.handle_connection_lost(event)

        mock_notify_topology_update.assert_called()
326
327
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    def test_handle_interface_up(self, *args):
        """Check handle_interface_up updates metadata and notifies topology."""
        # Mocks arrive bottom-up relative to the decorator order.
        instance_metadata, topology_update = args

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)
        self.napp.handle_interface_up(event)

        topology_update.assert_called()
        instance_metadata.assert_called()
338
339
    @patch('napps.kytos.topology.main.Main.handle_interface_up')
    def test_handle_interface_created(self, mock_handle_interface_up):
        """Check handle_interface_created calls handle_interface_up."""
        event = MagicMock()
        self.napp.handle_interface_created(event)
        mock_handle_interface_up.assert_called()
345
346
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.handle_interface_link_down')
    def test_handle_interface_down(self, *args):
        """Check handle_interface_down handles link down and notifies."""
        # Mocks arrive bottom-up relative to the decorator order.
        link_down_handler, topology_update = args

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)
        self.napp.handle_interface_down(event)

        link_down_handler.assert_called()
        topology_update.assert_called()
357
358
    @patch('napps.kytos.topology.main.Main.handle_interface_down')
    def test_interface_deleted(self, mock_handle_interface_down):
        """Test interface deleted.

        ``handle_interface_deleted`` is expected to call
        ``handle_interface_down``, which is the method patched here.
        """
        mock_event = MagicMock()
        self.napp.handle_interface_deleted(mock_event)
        mock_handle_interface_down.assert_called()
364
365
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interface_link_up(self, *args):
        """Check handle_interface_link_up with a link reported as inactive.

        Expect the topology update, the instance metadata update and the
        link status change notification to be called.
        """
        # Mocks arrive bottom-up relative to the decorator order.
        status_change, instance_metadata, topology_update, link_lookup = args

        link = create_autospec(Link)
        link.is_active.return_value = False
        link_lookup.return_value = link

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)
        self.napp.handle_interface_link_up(event)

        topology_update.assert_called()
        instance_metadata.assert_called()
        status_change.assert_called()
384
385
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interface_link_down(self, *args):
        """Check handle_interface_link_down with a link reported as active.

        Expect the topology update and the link status change notification
        to be called.
        """
        # Mocks arrive bottom-up relative to the decorator order.
        status_change, topology_update, link_lookup = args

        link = create_autospec(Link)
        link.is_active.return_value = True
        link_lookup.return_value = link

        event = MagicMock()
        event.content['interface'] = create_autospec(Interface)
        self.napp.handle_interface_link_down(event)

        topology_update.assert_called()
        status_change.assert_called()
402
403
    @patch('napps.kytos.topology.main.Main._get_link_or_create')
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    def test_add_links(self, *args):
        """Check add_links gets or creates a link and notifies topology."""
        # Mocks arrive bottom-up relative to the decorator order.
        topology_update, get_link_or_create = args

        event = MagicMock()
        self.napp.add_links(event)

        get_link_or_create.assert_called()
        topology_update.assert_called()
412
413
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_topology_update(self, *args):
        """Check notify_topology_update builds an event and buffers it."""
        # Mocks arrive bottom-up relative to the decorator order.
        buffer_put, kytos_event = args

        self.napp.notify_topology_update()

        kytos_event.assert_called()
        buffer_put.assert_called()
421
422
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_link_status_change(self, *args):
        """Check notify_link_status_change builds an event and buffers it."""
        # Mocks arrive bottom-up relative to the decorator order.
        buffer_put, kytos_event = args

        link = create_autospec(Link)
        self.napp.notify_link_status_change(link)

        kytos_event.assert_called()
        buffer_put.assert_called()
431
432
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    @patch('napps.kytos.topology.main.isinstance')
    def test_notify_metadata_changes(self, *args):
        """Check notify_metadata_changes builds an event and buffers it.

        ``isinstance`` is patched in the NApp module to always return True.
        """
        # Mocks arrive bottom-up relative to the decorator order.
        isinstance_mock, buffer_put, kytos_event = args
        isinstance_mock.return_value = True

        target = MagicMock()
        action = create_autospec(Switch)
        self.napp.notify_metadata_changes(target, action)

        kytos_event.assert_called()
        isinstance_mock.assert_called()
        buffer_put.assert_called()
445
446
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_notify_port_created(self, *args):
        """Check notify_port_created builds an event and buffers it."""
        # Mocks arrive bottom-up relative to the decorator order.
        buffer_put, kytos_event = args

        incoming_event = MagicMock()
        self.napp.notify_port_created(incoming_event)

        kytos_event.assert_called()
        buffer_put.assert_called()
455
456
    @patch('napps.kytos.topology.main.KytosEvent')
    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_verify_storehouse(self, *args):
        """Check verify_storehouse builds an event and buffers it."""
        # Mocks arrive bottom-up relative to the decorator order.
        buffer_put, kytos_event = args

        entities = MagicMock()
        self.napp.verify_storehouse(entities)

        buffer_put.assert_called()
        kytos_event.assert_called()
465