Passed
Push — master ( 8c0a82...c7049e )
by Vinicius
04:18 queued 02:48
created

TestMain.test_handle_interface_deleted()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 1
dl 0
loc 13
rs 9.8
c 0
b 0
f 0
1
"""Module to test the main napp file."""
2
from unittest import TestCase
3
from unittest.mock import Mock, patch
4
5
# pylint: disable=import-error,no-name-in-module
6
from kytos.core.events import KytosEvent
7
from tests.integration.helpers import (get_controller_mock, get_interface_mock,
8
                                       get_switch_mock)
9
10
# Sample link payload used as the content of the fake "links" box.
# Both endpoints intentionally carry the same interface description.
LINK_DATA = {
    "active": False,
    "enabled": True,
    "endpoint_a": {
        "id": "00:00:00:00:00:00:00:01:1",
        "link": "26927949-df3c-4c25-874b-3da30d8ae983",
        "mac": "26:fb:42:20:b8:b1",
        "name": "s1-eth1",
        "nni": False,
        "port_number": 1,
        "speed": "10 Gbps",
        "switch": "00:00:00:00:00:00:00:01",
        "type": "interface",
        "uni": True
    },
    "endpoint_b": {
        "id": "00:00:00:00:00:00:00:01:1",
        "link": "26927949-df3c-4c25-874b-3da30d8ae983",
        "mac": "26:fb:42:20:b8:b1",
        "name": "s1-eth1",
        "nni": False,
        "port_number": 1,
        "speed": "10 Gbps",
        "switch": "00:00:00:00:00:00:00:01",
        "type": "interface",
        "uni": True
    }
}
38
39
# Sample switch payload used as the content of the fake "switches" box.
# The three "additionalPropN" interfaces are deliberately identical
# placeholder entries.
SWITCH_DATA = {
    "id": "00:00:00:00:00:00:00:01",
    "name": "my-beautiful-switch",
    "serial": "string",
    "software": "Version 2.3.4",
    "ofp_version": "0x01",
    "connection": "127.0.0.1:49330",
    "data_path": "string",
    "manufacturer": "Unkown Manufactor",
    "hardware": "Hardware version 2.0",
    "type": "switch",
    "active": True,
    "enabled": False,
    "dpid": "00:00:00:00:00:00:00:01",
    "metadata": {},
    "interfaces": {
        "additionalProp1": {
            "id": "00:00:00:00:00:00:00:01:1",
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
            "mac": "26:fb:42:20:b8:b1",
            "name": "s1-eth1",
            "nni": False,
            "port_number": 1,
            "speed": "10 Gbps",
            "switch": "00:00:00:00:00:00:00:01",
            "type": "interface",
            "uni": True
        },
        "additionalProp2": {
            "id": "00:00:00:00:00:00:00:01:1",
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
            "mac": "26:fb:42:20:b8:b1",
            "name": "s1-eth1",
            "nni": False,
            "port_number": 1,
            "speed": "10 Gbps",
            "switch": "00:00:00:00:00:00:00:01",
            "type": "interface",
            "uni": True
        },
        "additionalProp3": {
            "id": "00:00:00:00:00:00:00:01:1",
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
            "mac": "26:fb:42:20:b8:b1",
            "name": "s1-eth1",
            "nni": False,
            "port_number": 1,
            "speed": "10 Gbps",
            "switch": "00:00:00:00:00:00:00:01",
            "type": "interface",
            "uni": True
        }
    }
}
93
94
95
class FakeBox:
    """Simulate a Storehouse Box.

    Only ``data`` is populated from the constructor argument; the other
    attributes exist so the object has the same attribute names as a box
    and are left as ``None``.
    """

    def __init__(self, data):
        """Initialize default values to FakeBox.

        Args:
            data: payload stored as-is (not copied) in ``self.data``.
        """
        self.data = data
        self.namespace = None
        self.name = None
        self.box_id = None
        self.created_at = None
        self.owner = None
106
107
108
# pylint: disable=import-outside-toplevel
109
class TestMain(TestCase):
110
    """Test the Main class."""
111
112
    def setUp(self):
        """Execute steps before each test.

        Set the server_name_url from kytos/topology and build the NApp
        instance used by the tests.
        """
        self.server_name_url = 'http://localhost:8181/api/kytos/topology'

        # The run_on_thread patch is started before the NApp module is
        # imported below; keep this order (presumably so the module is
        # loaded with the pass-through decorator -- confirm before moving).
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
        from napps.kytos.topology.main import Main
        self.addCleanup(patch.stopall)
        # NOTE(review): init_napp() rebinds self.napp to a fresh instance,
        # so the object created here is replaced right away.
        self.napp = Main(get_controller_mock())
        self.init_napp()
124
125
    @patch('napps.kytos.topology.main.Main.verify_storehouse')
    def init_napp(self, mock_verify_storehouse=None):
        """Initialize a Topology NApp instance.

        ``Main.verify_storehouse`` is patched for the duration of this
        call; the mock is injected by the ``patch`` decorator, so callers
        invoke ``self.init_napp()`` without arguments.

        After the NApp is created, its ``store_items`` are replaced with
        FakeBox objects holding LINK_DATA and SWITCH_DATA.
        """
        mock_verify_storehouse.return_value = None
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
        from napps.kytos.topology.main import Main
        self.addCleanup(patch.stopall)
        self.napp = Main(get_controller_mock())
        self.napp.store_items = {
            "links": FakeBox(LINK_DATA),
            "switches": FakeBox(SWITCH_DATA)
        }
137
138
    def test_get_switches_dict(self):
139
        """Basic test for switch listing."""
140
        # pylint: disable=protected-access,
141
        # pylint: disable=use-implicit-booleaness-not-comparison
142
        switches = self.napp._get_switches_dict()
143
        assert isinstance(switches['switches'], dict)
144
        assert switches['switches'] == {}
145
146
    def test_get_event_listeners(self):
147
        """Verify all event listeners registered."""
148
        expected_events = ['kytos/core.shutdown',
149
                           'kytos/core.shutdown.kytos/topology',
150
                           'kytos/maintenance.start_link',
151
                           'kytos/maintenance.end_link',
152
                           'kytos/maintenance.start_switch',
153
                           'kytos/maintenance.end_switch',
154
                           '.*.network_status.updated',
155
                           '.*.interface.is.nni',
156
                           '.*.connection.lost',
157
                           '.*.switch.interface.created',
158
                           '.*.switch.interface.deleted',
159
                           '.*.switch.interface.link_down',
160
                           '.*.switch.interface.link_up',
161
                           '.*.switch.(new|reconnected)',
162
                           '.*.switch.port.created',
163
                           'kytos/storehouse.loaded']
164
        actual_events = self.napp.listeners()
165
        self.assertCountEqual(expected_events, actual_events)
166
167 View Code Duplication
    def test_verify_api_urls(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
168
        """Verify all APIs registered."""
169
        expected_urls = [
170
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/interfaces'),
171
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/switches'),
172
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/links'),
173
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/'),
174
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
175
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/disable'),
176
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
177
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/enable'),
178
         ({'key': '[key]', 'interface_id': '[interface_id]'},
179
          {'OPTIONS', 'DELETE'},
180
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata/<key>'),
181
         ({'interface_id': '[interface_id]'}, {'POST', 'OPTIONS'},
182
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
183
         ({'interface_id': '[interface_id]'}, {'GET', 'OPTIONS', 'HEAD'},
184
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
185
         ({'interface_disable_id': '[interface_disable_id]'},
186
          {'POST', 'OPTIONS'},
187
          '/api/kytos/topology/v3/interfaces/<interface_disable_id>/disable'),
188
         ({'interface_enable_id': '[interface_enable_id]'},
189
          {'POST', 'OPTIONS'},
190
          '/api/kytos/topology/v3/interfaces/<interface_enable_id>/enable'),
191
         ({'dpid': '[dpid]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
192
          '/api/kytos/topology/v3/switches/<dpid>/metadata/<key>'),
193
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
194
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
195
         ({'dpid': '[dpid]'}, {'GET', 'OPTIONS', 'HEAD'},
196
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
197
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
198
          '/api/kytos/topology/v3/switches/<dpid>/disable'),
199
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
200
          '/api/kytos/topology/v3/switches/<dpid>/enable'),
201
         ({'link_id': '[link_id]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
202
          '/api/kytos/topology/v3/links/<link_id>/metadata/<key>'),
203
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
204
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
205
         ({'link_id': '[link_id]'}, {'GET', 'OPTIONS', 'HEAD'},
206
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
207
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
208
          '/api/kytos/topology/v3/links/<link_id>/disable'),
209
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
210
          '/api/kytos/topology/v3/links/<link_id>/enable')]
211
212
        urls = self.get_napp_urls(self.napp)
213
        self.assertEqual(expected_urls, urls)
214
215 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
216
    def get_napp_urls(napp):
217
        """Return the kytos/topology urls.
218
219
        The urls will be like:
220
221
        urls = [
222
            (options, methods, url)
223
        ]
224
225
        """
226
        controller = napp.controller
227
        controller.api_server.register_napp_endpoints(napp)
228
229
        urls = []
230
        for rule in controller.api_server.app.url_map.iter_rules():
231
            options = {}
232
            for arg in rule.arguments:
233
                options[arg] = f"[{arg}]"
234
235
            if f'{napp.username}/{napp.name}' in str(rule):
236
                urls.append((options, rule.methods, f'{str(rule)}'))
237
238
        return urls
239
240
    @staticmethod
241
    def get_app_test_client(napp):
242
        """Return a flask api test client."""
243
        napp.controller.api_server.register_napp_endpoints(napp)
244
        return napp.controller.api_server.app.test_client()
245
246
    def test_save_metadata_on_store(self):
        """Test save metadata on store.

        After saving a switch under 'switches', the first two events read
        from the controller's app buffer must be a storehouse list event
        followed by a storehouse update event.
        """
        switch = get_switch_mock(0x04)
        self.napp.save_metadata_on_store(switch, 'switches')

        app_buffer = self.napp.controller.buffers.app
        event_list_response = app_buffer.get()
        event_updated_response = app_buffer.get()

        self.assertEqual(event_list_response.name,
                         'kytos.storehouse.list')
        self.assertEqual(event_updated_response.name,
                         'kytos.storehouse.update')
257
258
    def test_handle_new_switch(self):
        """Test handle new switch.

        Handling the event must leave a storehouse list event followed by
        a topology updated event in the controller's app buffer.
        """
        # The listener pattern itself is used as the event name here.
        event_name = '.*.switch.(new|reconnected)'
        switch = get_switch_mock(0x04)
        event = KytosEvent(name=event_name,
                           content={'switch': switch})
        self.napp.handle_new_switch(event)

        app_buffer = self.napp.controller.buffers.app
        event_list_response = app_buffer.get()
        event_response = app_buffer.get()

        self.assertEqual(event_list_response.name,
                         'kytos.storehouse.list')
        self.assertEqual(event_response.name,
                         'kytos/topology.updated')
272
273
    def test_handle_interface_deleted(self):
        """Test handle interface deleted.

        Handling the event must leave a storehouse list event followed by
        a topology updated event in the controller's app buffer.
        """
        # The listener pattern itself is used as the event name here.
        event_name = '.*.switch.interface.deleted'
        interface = get_interface_mock("interface1", 7)
        event = KytosEvent(name=event_name,
                           content={'interface': interface})
        self.napp.handle_interface_deleted(event)

        app_buffer = self.napp.controller.buffers.app
        event_list_response = app_buffer.get()
        event_updated_response = app_buffer.get()

        self.assertEqual(event_list_response.name,
                         'kytos.storehouse.list')
        self.assertEqual(event_updated_response.name,
                         'kytos/topology.updated')
286
287
    def test_handle_connection_lost(self):
        """Test handle connection lost.

        Handling the event must leave a storehouse list event followed by
        a topology updated event in the controller's app buffer.
        """
        # The listener pattern itself is used as the event name here.
        event_name = '.*.connection.lost'
        source = Mock()
        event = KytosEvent(name=event_name,
                           content={'source': source})
        self.napp.handle_connection_lost(event)

        app_buffer = self.napp.controller.buffers.app
        event_list_response = app_buffer.get()
        event_updated_response = app_buffer.get()

        self.assertEqual(event_list_response.name,
                         'kytos.storehouse.list')
        self.assertEqual(event_updated_response.name,
                         'kytos/topology.updated')
300