Passed
Pull Request — master (#86)
by
unknown
03:33
created

build.tests.unit.test_main   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 147
dl 0
loc 183
rs 10
c 0
b 0
f 0
wmc 12

11 Methods

Rating   Name   Duplication   Size   Complexity  
A TestMain.test_handle_new_switch() 0 11 1
A TestMain.test_handle_interface_down() 0 11 1
A TestMain.test_interface_link_up() 0 19 1
A TestMain.test_handle_interface_up() 0 11 1
B TestMain.test_verify_api_urls() 0 41 1
A TestMain.test_get_event_listeners() 0 15 1
A TestMain.test_interface_deleted() 0 6 1
A TestMain.test_interface_link_down() 0 17 1
A TestMain.setUp() 0 12 2
A TestMain.test_handle_connection_lost() 0 9 1
A TestMain.test_handle_interface_created() 0 6 1
1
"""Module to test the main napp file."""
2
from unittest import TestCase
3
from unittest.mock import MagicMock, create_autospec, patch
4
5
from kytos.core.switch import Switch
6
from kytos.core.interface import Interface
7
from kytos.core.link import Link
8
9
from tests.unit.helpers import (get_controller_mock, get_napp_urls)
10
11
12
class TestMain(TestCase):
13
    """Test the Main class."""
14
15
    def setUp(self):
16
        """Execute steps before each tests.
17
18
        Set the server_name_url_url from kytos/topology
19
        """
20
        self.server_name_url = 'http://localhost:8181/api/kytos/topology'
21
22
        patch('kytos.core.helpers.run_on_thread', lambda x: x).start()
23
        from napps.kytos.topology.main import Main
24
        self.addCleanup(patch.stopall)
25
26
        self.napp = Main(get_controller_mock())
27
28
    def test_get_event_listeners(self):
29
        """Verify all event listeners registered."""
30
        expected_events = ['kytos/core.shutdown',
31
                           'kytos/core.shutdown.kytos/topology',
32
                           '.*.interface.is.nni',
33
                           '.*.connection.lost',
34
                           '.*.switch.interface.created',
35
                           '.*.switch.interface.deleted',
36
                           '.*.switch.interface.link_down',
37
                           '.*.switch.interface.link_up',
38
                           '.*.switch.(new|reconnected)',
39
                           '.*.switch.port.created',
40
                           'kytos/topology.*.metadata.*']
41
        actual_events = self.napp.listeners()
42
        self.assertEqual(expected_events, actual_events)
43
44
    def test_verify_api_urls(self):
45
        """Verify all APIs registered."""
46
        expected_urls = [
47
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/interfaces'),
48
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/switches'),
49
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/links'),
50
         ({}, {'GET', 'OPTIONS', 'HEAD'}, '/api/kytos/topology/v3/'),
51
         ({'key': '[key]', 'interface_id': '[interface_id]'},
52
          {'OPTIONS', 'DELETE'},
53
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata/<key>'),
54
         ({'interface_id': '[interface_id]'}, {'POST', 'OPTIONS'},
55
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
56
         ({'interface_id': '[interface_id]'}, {'GET', 'OPTIONS', 'HEAD'},
57
          '/api/kytos/topology/v3/interfaces/<interface_id>/metadata'),
58
         ({'interface_id': '[interface_id]'}, {'POST', 'OPTIONS'},
59
          '/api/kytos/topology/v3/interfaces/<interface_id>/disable'),
60
         ({'interface_id': '[interface_id]'}, {'POST', 'OPTIONS'},
61
          '/api/kytos/topology/v3/interfaces/<interface_id>/enable'),
62
         ({'dpid': '[dpid]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
63
          '/api/kytos/topology/v3/switches/<dpid>/metadata/<key>'),
64
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
65
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
66
         ({'dpid': '[dpid]'}, {'GET', 'OPTIONS', 'HEAD'},
67
          '/api/kytos/topology/v3/switches/<dpid>/metadata'),
68
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
69
          '/api/kytos/topology/v3/switches/<dpid>/disable'),
70
         ({'dpid': '[dpid]'}, {'POST', 'OPTIONS'},
71
          '/api/kytos/topology/v3/switches/<dpid>/enable'),
72
         ({'link_id': '[link_id]', 'key': '[key]'}, {'OPTIONS', 'DELETE'},
73
          '/api/kytos/topology/v3/links/<link_id>/metadata/<key>'),
74
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
75
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
76
         ({'link_id': '[link_id]'}, {'GET', 'OPTIONS', 'HEAD'},
77
          '/api/kytos/topology/v3/links/<link_id>/metadata'),
78
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
79
          '/api/kytos/topology/v3/links/<link_id>/disable'),
80
         ({'link_id': '[link_id]'}, {'POST', 'OPTIONS'},
81
          '/api/kytos/topology/v3/links/<link_id>/enable')]
82
83
        urls = get_napp_urls(self.napp)
84
        self.assertEqual(expected_urls, urls)
85
86
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
87
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
88
    def test_handle_new_switch(self, *args):
89
        """Test handle_new_switch."""
90
        (mock_instance_metadata, mock_notify_topology_update) = args
91
        mock_event = MagicMock()
92
        mock_switch = create_autospec(Switch)
93
        mock_event.content['switch'] = mock_switch
94
        self.napp.handle_new_switch(mock_event)
95
        mock_notify_topology_update.assert_called()
96
        mock_instance_metadata.assert_called()
97
98
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
99
    def test_handle_connection_lost(self, mock_notify_topology_update):
100
        """Test handle connection_lost."""
101
        mock_event = MagicMock()
102
        mock_switch = create_autospec(Switch)
103
        mock_switch.return_value = True
104
        mock_event.content['source'] = mock_switch
105
        self.napp.handle_connection_lost(mock_event)
106
        mock_notify_topology_update.assert_called()
107
108
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
109
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
110
    def test_handle_interface_up(self, *args):
111
        """Test handle_interface_up."""
112
        (mock_instance_metadata, mock_notify_topology_update) = args
113
        mock_event = MagicMock()
114
        mock_interface = create_autospec(Interface)
115
        mock_event.content['interface'] = mock_interface
116
        self.napp.handle_interface_up(mock_event)
117
        mock_notify_topology_update.assert_called()
118
        mock_instance_metadata.assert_called()
119
120
    @patch('napps.kytos.topology.main.Main.handle_interface_up')
121
    def test_handle_interface_created(self, mock_handle_interface_up):
122
        """Test handle interface created."""
123
        mock_event = MagicMock()
124
        self.napp.handle_interface_created(mock_event)
125
        mock_handle_interface_up.assert_called()
126
127
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
128
    @patch('napps.kytos.topology.main.Main.handle_interface_link_down')
129
    def test_handle_interface_down(self, *args):
130
        """Test handle interface down."""
131
        (mock_handle_interface_link_down, mock_notify_topology_update) = args
132
        mock_event = MagicMock()
133
        mock_interface = create_autospec(Interface)
134
        mock_event.content['interface'] = mock_interface
135
        self.napp.handle_interface_down(mock_event)
136
        mock_handle_interface_link_down.assert_called()
137
        mock_notify_topology_update.assert_called()
138
139
    @patch('napps.kytos.topology.main.Main.handle_interface_down')
140
    def test_interface_deleted(self, mock_handle_interface_link_down):
141
        """Test interface deleted."""
142
        mock_event = MagicMock()
143
        self.napp.handle_interface_deleted(mock_event)
144
        mock_handle_interface_link_down.assert_called()
145
146
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
147
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
148
    @patch('napps.kytos.topology.main.Main.update_instance_metadata')
149
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
150
    def test_interface_link_up(self, *args):
151
        """Test interface link_up."""
152
        (mock_status_change, mock_instance_metadata, mock_topology_update,
153
         mock_link_from_interface) = args
154
155
        mock_event = MagicMock()
156
        mock_interface = create_autospec(Interface)
157
        mock_link = create_autospec(Link)
158
        mock_link.is_active.return_value = False
159
        mock_link_from_interface.return_value = mock_link
160
        mock_event.content['interface'] = mock_interface
161
        self.napp.handle_interface_link_up(mock_event)
162
        mock_topology_update.assert_called()
163
        mock_instance_metadata.assert_called()
164
        mock_status_change.assert_called()
165
166
    @patch('napps.kytos.topology.main.Main._get_link_from_interface')
167
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
168
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
169
    def test_interface_link_down(self, *args):
170
        """Test interface link down."""
171
        (mock_status_change, mock_topology_update,
172
         mock_link_from_interface) = args
173
174
        mock_event = MagicMock()
175
        mock_interface = create_autospec(Interface)
176
        mock_link = create_autospec(Link)
177
        mock_link.is_active.return_value = True
178
        mock_link_from_interface.return_value = mock_link
179
        mock_event.content['interface'] = mock_interface
180
        self.napp.handle_interface_link_down(mock_event)
181
        mock_topology_update.assert_called()
182
        mock_status_change.assert_called()
183