TestLink.test_status_funcs()   C
last analyzed

Complexity

Conditions 10

Size

Total Lines 63
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 31
CRAP Score 10

Importance

Changes 0
Metric Value
cc 10
eloc 47
nop 1
dl 0
loc 63
ccs 31
cts 31
cp 1
crap 10
rs 5.9345
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like tests.unit.test_core.test_link.TestLink.test_status_funcs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Link tests."""
2 1
import logging
3 1
import time
4 1
from collections import OrderedDict
5 1
from unittest.mock import Mock
6
7 1
import pytest
8
9 1
from kytos.core.common import EntityStatus
10 1
from kytos.core.exceptions import KytosLinkCreationError
11 1
from kytos.core.interface import Interface, TAGType
12 1
from kytos.core.link import Link
13 1
from kytos.core.switch import Switch
14
15 1
logging.basicConfig(level=logging.CRITICAL)
16
17
18
# pylint: disable=protected-access,too-many-public-methods
19 1
class TestLink():
20
    """Test Links."""
21
22 1
    def setup_method(self):
23
        """Create interface objects."""
24 1
        self.switch1 = self._get_v0x04_switch('dpid1')
25 1
        self.iface1 = Interface('interface1', 41, self.switch1)
26 1
        self.switch2 = self._get_v0x04_switch('dpid2')
27 1
        self.iface2 = Interface('interface2', 42, self.switch2)
28
29 1
    @staticmethod
30 1
    def _get_v0x04_switch(name: str):
31
        """Create a v0x04 Switch"""
32 1
        switch = Switch(name)
33 1
        switch.connection = Mock()
34 1
        switch.connection.protocol.version = 0x04
35 1
        return switch
36
37 1
    def test__eq__(self):
38
        """Test __eq__ method."""
39 1
        link_1 = Link(self.iface1, self.iface2)
40 1
        link_2 = Link(self.iface2, self.iface1)
41
42 1
        iface3 = Interface('interface1', 1, self.switch1)
43 1
        iface4 = Interface('interface2', 2, self.switch2)
44
45 1
        link_3 = Link(iface3, iface4)
46
47 1
        assert link_1 == link_2
48 1
        assert (link_1 == link_3) is False
49
50 1
    def test__repr__(self):
51
        """Test __repr__ method."""
52 1
        link = Link(self.iface1, self.iface2)
53 1
        expected = ("Link(Interface('interface1', 41, Switch('dpid1')), "
54
                    "Interface('interface2', 42, Switch('dpid2')), "
55
                    f"{link.id})")
56 1
        assert repr(link) == expected
57
58 1
    def test_id(self):
59
        """Test id property."""
60 1
        ids = []
61
62 1
        for value in [(self.switch1, 1, self.switch2, 2),
63
                      (self.switch2, 2, self.switch1, 1),
64
                      (self.switch1, 1, self.switch1, 2),
65
                      (self.switch1, 2, self.switch1, 1)]:
66 1
            iface1 = Interface('iface', value[1], value[0])
67 1
            iface2 = Interface('iface', value[3], value[2])
68 1
            link = Link(iface1, iface2)
69
70 1
            ids.append(link.id)
71
72 1
        assert ids[0] == ids[1]
73 1
        assert ids[2] == ids[3]
74 1
        assert ids[0] != ids[2]
75
76 1
    def test_init(self):
77
        """Test normal Link initialization."""
78 1
        link = Link(self.iface1, self.iface2)
79 1
        assert isinstance(link, Link)
80 1
        assert link.is_active()
81 1
        assert link.is_enabled() is False
82
83 1
    def test_init_with_null_endpoints(self):
84
        """Test initialization with None as endpoints."""
85 1
        with pytest.raises(KytosLinkCreationError):
86 1
            Link(self.iface1, None)
87
88 1
        with pytest.raises(KytosLinkCreationError):
89 1
            Link(None, self.iface2)
90
91 1
    def test_link_id(self):
92
        """Test equality of links with the same values in different order."""
93 1
        link1 = Link(self.iface1, self.iface2)
94 1
        link2 = Link(self.iface2, self.iface1)
95 1
        assert link1.id == link2.id
96
97 1
    def test_status_funcs(self) -> None:
98
        """Test status_funcs."""
99
        # If it's enabled and active but a func returns DOWN, then DOWN
100 1
        Link.status_funcs = OrderedDict()
101 1
        Link.status_reason_funcs = OrderedDict()
102 1
        Link.register_status_func(
103
            "some_napp_some_func",
104
            lambda link: EntityStatus.DOWN
105
        )
106 1
        Link.register_status_reason_func(
107
            "some_napp_some_func",
108
            lambda link: {'test_status'}
109
        )
110 1
        for intf in (self.iface1, self.iface2):
111 1
            intf.enable()
112 1
            intf.activate()
113 1
        link = Link(self.iface1, self.iface2)
114 1
        link.enable()
115 1
        link.activate()
116 1
        assert link.status == EntityStatus.DOWN
117 1
        assert link.status_reason == {'test_status'}
118
119
        # No changes in status if it returns None
120 1
        Link.register_status_func(
121
            "some_napp_some_func",
122
            lambda link: None
123
        )
124 1
        Link.register_status_reason_func(
125
            "some_napp_some_func",
126
            lambda link: set()
127
        )
128 1
        assert link.status == EntityStatus.UP
129 1
        assert link.status_reason == set()
130
131
        # If it's deactivated then it shouldn't be able to make it go UP
132 1
        link.deactivate()
133 1
        assert link.status == EntityStatus.DOWN
134 1
        Link.register_status_func(
135
            "some_napp_some_func",
136
            lambda link: EntityStatus.UP
137
        )
138 1
        Link.register_status_reason_func(
139
            "some_napp_some_func",
140
            lambda link: set()
141
        )
142 1
        assert link.status == EntityStatus.DOWN
143 1
        assert link.status_reason == {'deactivated'}
144 1
        link.activate()
145 1
        assert link.status == EntityStatus.UP
146 1
        assert link.status_reason == set()
147
148
        # If it's disabled, then it shouldn't be considered DOWN
149 1
        link.disable()
150 1
        Link.register_status_func(
151
            "some_napp_some_func",
152
            lambda link: EntityStatus.DOWN
153
        )
154 1
        Link.register_status_reason_func(
155
            "some_napp_some_func",
156
            lambda link: {'test_status'}
157
        )
158 1
        assert link.status == EntityStatus.DISABLED
159 1
        assert link.status_reason == {'test_status', 'disabled'}
160
161 1
    def test_multiple_status_funcs(self) -> None:
162
        """Test multiple status_funcs."""
163
        # If it's enabled and active but a func returns DOWN, then DOWN
164 1
        Link.status_funcs = OrderedDict()
165 1
        Link.status_reason_funcs = OrderedDict()
166 1
        Link.register_status_func(
167
            "some_napp_some_func",
168
            lambda link: None
169
        )
170 1
        Link.register_status_reason_func(
171
            'some_napp_some_func',
172
            lambda link: set()
173
        )
174 1
        Link.register_status_func(
175
            "some_napp_another_func",
176
            lambda link: EntityStatus.DOWN
177
        )
178 1
        Link.register_status_reason_func(
179
            'some_napp_another_func',
180
            lambda link: {'test_status2'}
181
        )
182 1
        for intf in (self.iface1, self.iface2):
183 1
            intf.enable()
184 1
            intf.activate()
185 1
        link = Link(self.iface1, self.iface2)
186 1
        link.enable()
187 1
        link.activate()
188 1
        assert link.status == EntityStatus.DOWN
189 1
        assert link.status_reason == {'test_status2'}
190
191
        # It should be UP if the func no longer returns DOWN
192 1
        Link.register_status_func(
193
            "some_napp_another_func",
194
            lambda link: None
195
        )
196 1
        Link.register_status_reason_func(
197
            'some_napp_another_func',
198
            lambda link: set()
199
        )
200 1
        assert link.status == EntityStatus.UP
201 1
        assert link.status_reason == set()
202
203 1
    def test_available_tags(self):
204
        """Test available_tags property."""
205 1
        link = Link(self.iface1, self.iface2)
206 1
        link.endpoint_a.available_tags['vlan'] = [[1, 100]]
207 1
        link.endpoint_b.available_tags['vlan'] = [[50, 200]]
208
209 1
        vlans = link.available_tags()
210 1
        assert vlans == [[50, 100]]
211
212 1
    def test_get_next_available_tag(self, controller):
213
        """Test get next available tags returns different tags"""
214 1
        link = Link(self.iface1, self.iface2)
215 1
        tag = link.get_next_available_tag(controller, "link_id")
216 1
        next_tag = link.get_next_available_tag(controller, "link_id")
217 1
        assert tag == 1
218 1
        assert next_tag == 2
219
220 1
    def test_get_next_available_tag_reverse(self, controller):
221
        """Test get last available tags returns different tags"""
222 1
        link = Link(self.iface1, self.iface2)
223 1
        tag = link.get_next_available_tag(controller, "link_id", True)
224 1
        next_tag = link.get_next_available_tag(controller, "link_id", True)
225 1
        assert tag == 4094
226 1
        assert next_tag == 4093
227
228 1
    def test_get_next_available_tag_avoid_tag(self, controller):
229
        """Test get next available tag avoiding a tag"""
230 1
        link = Link(self.iface1, self.iface2)
231 1
        avoid_vlan = 1
232
233
        # Tag different that avoid tag is available in the same range
234 1
        link.endpoint_a.available_tags['vlan'] = [[1, 5]]
235 1
        tag = link.get_next_available_tag(
236
            controller, "link_id", try_avoid_value=avoid_vlan
237
        )
238 1
        assert tag == 2
239
240
        # The next tag is the next range
241 1
        link.endpoint_a.available_tags['vlan'] = [[1, 1], [100, 300]]
242 1
        tag = link.get_next_available_tag(
243
            controller, "link_id", try_avoid_value=avoid_vlan
244
        )
245 1
        assert tag == 100
246
247
        # No more tags available
248 1
        link.endpoint_a.available_tags['vlan'] = [[1, 1]]
249 1
        tag = link.get_next_available_tag(
250
            controller, "link_id", try_avoid_value=avoid_vlan
251
        )
252 1
        assert tag == 1
253
254
        # Same cases but with reversed
255 1
        avoid_vlan = 50
256 1
        link.endpoint_a.available_tags['vlan'] = [[3, 50]]
257 1
        tag = link.get_next_available_tag(
258
            controller, "link_id", True, try_avoid_value=avoid_vlan
259
        )
260 1
        assert tag == 49
261 1
        link.endpoint_a.available_tags['vlan'] = [[1, 20], [50, 50]]
262 1
        tag = link.get_next_available_tag(
263
            controller, "link_id", True, try_avoid_value=avoid_vlan
264
        )
265 1
        assert tag == 20
266 1
        link.endpoint_a.available_tags['vlan'] = [[50, 50]]
267 1
        tag = link.get_next_available_tag(
268
            controller, "link_id", True, try_avoid_value=avoid_vlan
269
        )
270 1
        assert tag == 50
271
272 1
    def test_tag_life_cicle(self, controller):
273
        """Test get next available tags returns different tags"""
274 1
        link = Link(self.iface1, self.iface2)
275 1
        tag = link.get_next_available_tag(controller, "link_id")
276
277 1
        is_available = link.is_tag_available(tag)
278 1
        assert is_available is False
279
280 1
        link.make_tags_available(controller, tag, "link_id")
281 1
        is_available = link.is_tag_available(tag)
282 1
        assert is_available
283
284 1
    def test_concurrent_get_next_tag(self, controller):
285
        """Test get next available tags in concurrent execution"""
286
        # pylint: disable=import-outside-toplevel
287 1
        from tests.helper import test_concurrently
288 1
        _link = Link(self.iface1, self.iface2)
289
290 1
        _i = []
291 1
        available_tags = _link.endpoint_a.available_tags['vlan']
292 1
        _initial_size = 0
293 1
        for i, j in available_tags:
294 1
            _initial_size += j - i + 1
295
296 1
        @test_concurrently(20)
297 1
        def test_get_next_available_tag(controller):
298
            """Assert that get_next_available_tag() returns different tags."""
299 1
            _i.append(1)
300 1
            tag = _link.get_next_available_tag(controller, "link_id")
301 1
            time.sleep(0.0001)
302
303 1
            next_tag = _link.get_next_available_tag(controller, "link_id")
304
305 1
            assert tag != next_tag
306
307 1
        test_get_next_available_tag(controller)
308
309
        # sleep not needed because test_concurrently waits for all threads
310
        # to finish before returning.
311
        # time.sleep(0.1)
312
313
        # Check if after the 20th iteration we have 40 tags
314
        # It happens because we get 2 tags for every iteration
315 1
        available_tags = _link.endpoint_a.available_tags['vlan']
316 1
        _final_size = 0
317 1
        for i, j in available_tags:
318 1
            _final_size += j - i + 1
319 1
        assert _initial_size == _final_size + 40
320
321 1
    def test_available_vlans(self):
322
        """Test available_vlans method."""
323 1
        link = Link(self.iface1, self.iface2)
324 1
        link.endpoint_a.available_tags[TAGType.MPLS.value] = [[1, 100]]
325 1
        link.endpoint_b.available_tags[TAGType.MPLS.value] = [[50, 200]]
326
327 1
        vlans = link.available_vlans()
328 1
        assert vlans == [TAGType.VLAN.value]
329
330 1
    def test_get_available_vlans(self):
331
        """Test _get_available_vlans method."""
332 1
        link = Link(self.iface1, self.iface2)
333 1
        link.endpoint_a.available_tags[TAGType.VLAN_QINQ.value] = [[1, 1]]
334 1
        link.endpoint_a.available_tags[TAGType.MPLS.value] = [[1, 1]]
335 1
        vlans = link._get_available_vlans(link.endpoint_a)
336
        assert vlans == [TAGType.VLAN.value]
337