Passed
Pull Request — master (#38)
by Vinicius
02:40
created

TestMain.test_handle_interface_created()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 1
dl 0
loc 14
rs 9.8
c 0
b 0
f 0
1
"""Module to test the main napp file."""
2
from unittest import TestCase
3
from unittest.mock import Mock, patch
4
5
from kytos.core.events import KytosEvent
6
from tests.integration.helpers import (get_controller_mock, get_interface_mock,
7
                                       get_switch_mock)
8
9
LINK_DATA = {
10
    "active": False,
11
    "enabled": True,
12
    "endpoint_a": {
13
        "id": "00:00:00:00:00:00:00:01:1",
14
        "link": "26927949-df3c-4c25-874b-3da30d8ae983",
15
        "mac": "26:fb:42:20:b8:b1",
16
        "name": "s1-eth1",
17
        "nni": False,
18
        "port_number": 1,
19
        "speed": "10 Gbps",
20
        "switch": "00:00:00:00:00:00:00:01",
21
        "type": "interface",
22
        "uni": True
23
    },
24
    "endpoint_b": {
25
        "id": "00:00:00:00:00:00:00:01:1",
26
        "link": "26927949-df3c-4c25-874b-3da30d8ae983",
27
        "mac": "26:fb:42:20:b8:b1",
28
        "name": "s1-eth1",
29
        "nni": False,
30
        "port_number": 1,
31
        "speed": "10 Gbps",
32
        "switch": "00:00:00:00:00:00:00:01",
33
        "type": "interface",
34
        "uni": True
35
    }
36
}
37
38
SWITCH_DATA = {
39
    "id": "00:00:00:00:00:00:00:01",
40
    "name": "my-beautiful-switch",
41
    "serial": "string",
42
    "software": "Version 2.3.4",
43
    "ofp_version": "0x01",
44
    "connection": "127.0.0.1:49330",
45
    "data_path": "string",
46
    "manufacturer": "Unkown Manufactor",
47
    "hardware": "Hardware version 2.0",
48
    "type": "switch",
49
    "active": True,
50
    "enabled": False,
51
    "dpid": "00:00:00:00:00:00:00:01",
52
    "metadata": {},
53
    "interfaces": {
54
        "additionalProp1": {
55
            "id": "00:00:00:00:00:00:00:01:1",
56
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
57
            "mac": "26:fb:42:20:b8:b1",
58
            "name": "s1-eth1",
59
            "nni": False,
60
            "port_number": 1,
61
            "speed": "10 Gbps",
62
            "switch": "00:00:00:00:00:00:00:01",
63
            "type": "interface",
64
            "uni": True
65
        },
66
        "additionalProp2": {
67
            "id": "00:00:00:00:00:00:00:01:1",
68
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
69
            "mac": "26:fb:42:20:b8:b1",
70
            "name": "s1-eth1",
71
            "nni": False,
72
            "port_number": 1,
73
            "speed": "10 Gbps",
74
            "switch": "00:00:00:00:00:00:00:01",
75
            "type": "interface",
76
            "uni": True
77
        },
78
        "additionalProp3": {
79
            "id": "00:00:00:00:00:00:00:01:1",
80
            "link": "26927949-df3c-4c25-874b-3da30d8ae983",
81
            "mac": "26:fb:42:20:b8:b1",
82
            "name": "s1-eth1",
83
            "nni": False,
84
            "port_number": 1,
85
            "speed": "10 Gbps",
86
            "switch": "00:00:00:00:00:00:00:01",
87
            "type": "interface",
88
            "uni": True
89
        }
90
    }
91
}
92
93
94
class FakeBox:
95
    """Simulate a Storehouse Box."""
96
97
    def __init__(self, data):
98
        """Initizalize default values to FakeBox."""
99
        self.data = data
100
        self.namespace = None
101
        self.name = None
102
        self.box_id = None
103
        self.created_at = None
104
        self.owner = None
105
106
107
# pylint: disable=import-outside-toplevel
108
class TestMain(TestCase):
109
    """Test the Main class."""
110
111
    def setUp(self):
112
        """Execute steps before each tests.
113
114
        Set the server_name_url from kytos/topology
115
        """
116
        self.server_name_url = 'http://localhost:8181/api/kytos/topology'
117
118
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
119
        from napps.kytos.topology.main import Main
120
        self.addCleanup(patch.stopall)
121
        self.napp = Main(get_controller_mock())
122
        self.init_napp()
123
124
    @patch('napps.kytos.topology.main.Main.verify_storehouse')
125
    def init_napp(self, mock_verify_storehouse=None):
126
        """Initialize a Topology NApp instance."""
127
        mock_verify_storehouse.return_value = None
128
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
129
        from napps.kytos.topology.main import Main
130
        self.addCleanup(patch.stopall)
131
        self.napp = Main(get_controller_mock())
132
        self.napp.store_items = {
133
            "links": FakeBox(LINK_DATA),
134
            "switches": FakeBox(SWITCH_DATA)
135
        }
136
137
    def test_get_switches_dict(self):
138
        """Basic test for switch listing."""
139
        # pylint: disable=protected-access
140
        switches = self.napp._get_switches_dict()
141
        assert isinstance(switches['switches'], dict)
142
        assert switches['switches'] == {}
143
144
    def test_get_event_listeners(self):
145
        """Verify all event listeners registered."""
146
        expected_events = ['kytos/core.shutdown',
147
                           'kytos/core.shutdown.kytos/topology',
148
                           'kytos/maintenance.start_link',
149
                           'kytos/maintenance.end_link',
150
                           'kytos/maintenance.start_switch',
151
                           'kytos/maintenance.end_switch',
152
                           '.*.network_status.updated',
153
                           '.*.interface.is.nni',
154
                           '.*.connection.lost',
155
                           '.*.switch.interface.created',
156
                           '.*.switch.interface.deleted',
157
                           '.*.switch.interface.link_down',
158
                           '.*.switch.interface.link_up',
159
                           '.*.switch.(new|reconnected)',
160
                           '.*.switch.port.created',
161
                           'kytos/storehouse.loaded']
162
        actual_events = self.napp.listeners()
163
        self.assertCountEqual(expected_events, actual_events)
164
165 View Code Duplication
    def test_verify_api_urls(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
166
        """Verify all APIs registered."""
167
        expected_urls = [
168
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/interfaces'),
169
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/switches'),
170
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/links'),
171
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/'),
172
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
173
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/disable'),
174
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
175
          '/api/kytos/topology/v3/interfaces/switch/<dpid>/enable'),
176
         ({'key': '[key]', 'interface_id': '[interface_id]'},
177
          {'OPTIONS', 'DELETE'},
178
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata/<key>'),
179
         ({'interface_id': '[interface_id]'}, {'POST', 'OPTIONS'},
180
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
181
         ({'interface_id': '[interface_id]'}, {'GET', 'OPTIONS', 'HEAD'},
182
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
183
         ({'interface_disable_id': '[interface_disable_id]'},
184
          {'POST', 'OPTIONS'},
185
          '/api/kytos/topology/v3/interfaces/<interface_disable_id>/disable'),
186
         ({'interface_enable_id': '[interface_enable_id]'},
187
          {'POST', 'OPTIONS'},
188
          '/api/kytos/topology/v3/interfaces/<interface_enable_id>/enable'),
189
         ({'dpid': '[dpid]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
190
          '/api/kytos/topology/v3/switches/<dpid>/metadata/<key>'),
191
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
192
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
193
         ({'dpid': '[dpid]'}, {'GET', 'OPTIONS', 'HEAD'},
194
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
195
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
196
          '/api/kytos/topology/v3/switches/<dpid>/disable'),
197
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
198
          '/api/kytos/topology/v3/switches/<dpid>/enable'),
199
         ({'link_id': '[link_id]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
200
          '/api/kytos/topology/v3/links/<link_id>/metadata/<key>'),
201
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
202
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
203
         ({'link_id': '[link_id]'}, {'GET', 'OPTIONS', 'HEAD'},
204
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
205
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
206
          '/api/kytos/topology/v3/links/<link_id>/disable'),
207
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
208
          '/api/kytos/topology/v3/links/<link_id>/enable')]
209
210
        urls = self.get_napp_urls(self.napp)
211
        self.assertEqual(expected_urls, urls)
212
213 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
214
    def get_napp_urls(napp):
215
        """Return the kytos/topology urls.
216
217
        The urls will be like:
218
219
        urls = [
220
            (options, methods, url)
221
        ]
222
223
        """
224
        controller = napp.controller
225
        controller.api_server.register_napp_endpoints(napp)
226
227
        urls = []
228
        for rule in controller.api_server.app.url_map.iter_rules():
229
            options = {}
230
            for arg in rule.arguments:
231
                options[arg] = "[{0}]".format(arg)
232
233
            if f'{napp.username}/{napp.name}' in str(rule):
234
                urls.append((options, rule.methods, f'{str(rule)}'))
235
236
        return urls
237
238
    @staticmethod
239
    def get_app_test_client(napp):
240
        """Return a flask api test client."""
241
        napp.controller.api_server.register_napp_endpoints(napp)
242
        return napp.controller.api_server.app.test_client()
243
244
    def test_save_metadata_on_store(self):
245
        """Test save metadata on store."""
246
        switch = get_switch_mock(0x04)
247
        self.napp.save_metadata_on_store(switch, 'switches')
248
        event_list_response = self.napp.controller.buffers.app.get()
249
        event_updated_response = self.napp.controller.buffers.app.get()
250
251
        self.assertEqual(event_list_response.name,
252
                         'kytos.storehouse.list')
253
        self.assertEqual(event_updated_response.name,
254
                         'kytos.storehouse.update')
255
256
    def test_handle_new_switch(self):
257
        """Test handle new switch."""
258
        event_name = '.*.switch.(new|reconnected)'
259
        switch = get_switch_mock(0x04)
260
        event = KytosEvent(name=event_name,
261
                           content={'switch': switch})
262
        self.napp.handle_new_switch(event)
263
        event_list_response = self.napp.controller.buffers.app.get()
264
        event_response = self.napp.controller.buffers.app.get()
265
266
        self.assertEqual(event_list_response.name,
267
                         'kytos.storehouse.list')
268
        self.assertEqual(event_response.name,
269
                         'kytos/topology.updated')
270
271
    def test_handle_interface_deleted(self):
272
        """Test handle interface deleted."""
273
        event_name = '.*.switch.interface.deleted'
274
        interface = get_interface_mock("interface1", 7)
275
        stats_event = KytosEvent(name=event_name,
276
                                 content={'interface': interface})
277
        self.napp.handle_interface_deleted(stats_event)
278
        event_list_response = self.napp.controller.buffers.app.get()
279
        event_updated_response = self.napp.controller.buffers.app.get()
280
        self.assertEqual(event_list_response.name,
281
                         'kytos.storehouse.list')
282
        self.assertEqual(event_updated_response.name,
283
                         'kytos/topology.updated')
284
285
    def test_handle_connection_lost(self):
286
        """Test handle connection lost."""
287
        event_name = '.*.connection.lost'
288
        source = Mock()
289
        stats_event = KytosEvent(name=event_name,
290
                                 content={'source': source})
291
        self.napp.handle_connection_lost(stats_event)
292
        event_list_response = self.napp.controller.buffers.app.get()
293
        event_updated_response = self.napp.controller.buffers.app.get()
294
        self.assertEqual(event_list_response.name,
295
                         'kytos.storehouse.list')
296
        self.assertEqual(event_updated_response.name,
297
                         'kytos/topology.updated')
298