Passed
Push — master ( b0712d...412579 )
by Vinicius
03:16 queued 12s
created

TestGenericFlow.test_from_flow_stats__x04()   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 23
rs 9.55
c 0
b 0
f 0
cc 1
nop 1
1
"""Module to test the main napp file."""
2
import json
3
from unittest import TestCase
4
from unittest.mock import MagicMock, patch
5
from kytos.lib.helpers import (
6
    get_controller_mock,
7
    get_test_client,
8
    get_kytos_event_mock,
9
    get_switch_mock,
10
)
11
from napps.amlight.flow_stats.main import GenericFlow, Main
12
from napps.kytos.of_core.v0x04.flow import Action as Action40
13
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
14
from napps.kytos.of_core.v0x04.flow import Match as Match40
15
from pyof.foundation.basic_types import UBInt32
16
17
18
# pylint: disable=too-many-public-methods, too-many-lines
19
class TestMain(TestCase):
20
    """Test the Main class."""
21
22
    def setUp(self):
        """Prepare the fixtures used by each test.

        Build the napp under test on top of a mocked controller and store
        the base URL of the amlight/flow_stats REST API.
        """
        self.napp = Main(get_controller_mock())
        self.server_name_url = "http://localhost:8181/api/amlight/flow_stats"
29
30
    @staticmethod
31
    def get_napp_urls(napp):
32
        """Return the amlight/flow_stats urls.
33
34
        The urls will be like:
35
36
        urls = [
37
            (options, methods, url)
38
        ]
39
40
        """
41
        controller = napp.controller
42
        controller.api_server.register_napp_endpoints(napp)
43
44
        urls = []
45
        for rule in controller.api_server.app.url_map.iter_rules():
46
            options = {}
47
            for arg in rule.arguments:
48
                options[arg] = f"[{0}]".format(arg)
49
50
            if f"{napp.username}/{napp.name}" in str(rule):
51
                urls.append((options, rule.methods, f"{str(rule)}"))
52
53
        return urls
54
55
    def test_verify_api_urls(self):
56
        """Verify all APIs registered."""
57
58
        expected_urls = [
59
            (
60
                {"dpid": "[dpid]"},
61
                {"OPTIONS", "HEAD", "GET"},
62
                "/api/amlight/flow_stats/flow/match/ ",
63
            ),
64
            (
65
                {"dpid": "[dpid]"},
66
                {"OPTIONS", "HEAD", "GET"},
67
                "/api/amlight/flow_stats/flow/stats/",
68
            ),
69
            (
70
                {"flow_id": "[flow_id]"},
71
                {"OPTIONS", "HEAD", "GET"},
72
                "/api/amlight/flow_stats/packet_count/",
73
            ),
74
            (
75
                {"flow_id": "[flow_id]"},
76
                {"OPTIONS", "HEAD", "GET"},
77
                "/api/amlight/flow_stats/bytes_count/",
78
            ),
79
            (
80
                {"dpid": "[dpid]"},
81
                {"OPTIONS", "HEAD", "GET"},
82
                "/api/amlight/flow_stats/packet_count/per_flow/",
83
            ),
84
            (
85
                {"dpid": "[dpid]"},
86
                {"OPTIONS", "HEAD", "GET"},
87
                "/api/amlight/flow_stats/packet_count/sum/",
88
            ),
89
            (
90
                {"dpid": "[dpid]"},
91
                {"OPTIONS", "HEAD", "GET"},
92
                "/api/amlight/flow_stats/bytes_count/per_flow/",
93
            ),
94
            (
95
                {"dpid": "[dpid]"},
96
                {"OPTIONS", "HEAD", "GET"},
97
                "/api/amlight/flow_stats/bytes_count/sum/",
98
            ),
99
        ]
100
        urls = self.get_napp_urls(self.napp)
101
        self.assertEqual(len(expected_urls), len(urls))
102
103
    def test_packet_count__fail(self):
        """Request packet_count with a wrong flow_id and check the body."""
        wrong_flow_id = "123456789"
        reply = self._get_rest_response("packet_count", wrong_flow_id)

        self.assertEqual(reply.data, b"Flow does not exist")
110
111
    def test_packet_count(self):
        """Request packet_count for a patched flow and check the counters."""
        flow_id = "1"
        self._patch_switch_flow(flow_id)

        reply = self._get_rest_response("packet_count", flow_id)
        body = json.loads(reply.data)

        self.assertEqual(body["flow_id"], flow_id)
        self.assertEqual(body["packet_counter"], 40)
        self.assertEqual(body["packet_per_second"], 2.0)
122
123
    def test_bytes_count_fail(self):
        """Request bytes_count with a wrong flow_id and check the body."""
        wrong_flow_id = "123456789"
        reply = self._get_rest_response("bytes_count", wrong_flow_id)

        self.assertEqual(reply.data, b"Flow does not exist")
130
131
    def test_bytes_count(self):
        """Request bytes_count for a patched flow and check the counters."""
        flow_id = "1"
        self._patch_switch_flow(flow_id)

        reply = self._get_rest_response("bytes_count", flow_id)
        body = json.loads(reply.data)

        self.assertEqual(body["flow_id"], flow_id)
        self.assertEqual(body["bytes_counter"], 10)
        self.assertEqual(body["bits_per_second"], 4.0)
142
143 View Code Duplication
    def test_packet_count_per_flow(self):
        """Request packet_count/per_flow and check the first entry."""
        flow_id = "1"
        self._patch_switch_flow(flow_id)

        dpid_id = 111
        reply = self._get_rest_response("packet_count/per_flow", dpid_id)
        first_entry = json.loads(reply.data)[0]

        self.assertEqual(first_entry["flow_id"], flow_id)
        self.assertEqual(first_entry["packet_counter"], 40)
        self.assertEqual(first_entry["packet_per_second"], 2.0)
156
157
    def test_packet_count_sum(self):
        """Request packet_count/sum and check the returned total."""
        self._patch_switch_flow("1")

        dpid_id = 111
        reply = self._get_rest_response("packet_count/sum", dpid_id)
        total = json.loads(reply.data)

        self.assertEqual(total, 40)
168
169 View Code Duplication
    def test_bytes_count_per_flow(self):
        """Request bytes_count/per_flow and check the first entry."""
        flow_id = "1"
        self._patch_switch_flow(flow_id)

        dpid_id = 111
        reply = self._get_rest_response("bytes_count/per_flow", dpid_id)
        first_entry = json.loads(reply.data)[0]

        self.assertEqual(first_entry["flow_id"], flow_id)
        self.assertEqual(first_entry["bytes_counter"], 10)
        self.assertEqual(first_entry["bits_per_second"], 4.0)
182
183
    def test_bytes_count_sum(self):
        """Request bytes_count/sum and check the returned total."""
        self._patch_switch_flow("1")

        dpid_id = 111
        reply = self._get_rest_response("bytes_count/sum", dpid_id)
        total = json.loads(reply.data)

        self.assertEqual(total, 10)
194
195
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
    def test_flow_match(self, mock_match_flows):
        """Test flow_match rest call when match_flows returns a flow."""
        output_action = Action40.from_dict(
            {"action_type": "output", "port": "1"}
        )
        matched_flow = GenericFlow()
        matched_flow.actions = [output_action]
        matched_flow.version = "0x04"
        mock_match_flows.return_value = matched_flow

        self._patch_switch_flow("1")

        dpid_id = "aa:00:00:00:00:00:00:11"
        reply = self._get_rest_response("flow/match", dpid_id)
        body = json.loads(reply.data)

        self.assertEqual(reply.status_code, 200)
        first_action = body["actions"][0]
        self.assertEqual(first_action["action_type"], "output")
        self.assertEqual(first_action["port"], "1")
        self.assertEqual(body["version"], "0x04")
222
223
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
    def test_flow_match_fail(self, mock_match_flows):
        """Test flow_match rest call when match_flows returns None."""
        mock_match_flows.return_value = None
        self._patch_switch_flow("1")

        dpid_id = "aa:00:00:00:00:00:00:11"
        reply = self._get_rest_response("flow/match", dpid_id)

        self.assertEqual(reply.status_code, 404)
236
237
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
    def test_flow_stats(self, mock_match_flows):
        """Test flow_stats rest call.

        match_flows is patched to return a one-element list holding a 0x04
        flow with a single output action; the first entry of the JSON
        response must carry that action and version.
        """
        flow = GenericFlow()
        flow.actions = [
            Action40.from_dict(
                {
                    "action_type": "output",
                    "port": "1",
                }
            ),
        ]
        flow.version = "0x04"
        mock_match_flows.return_value = [flow]

        flow_id = "1"
        rest_name = "flow/stats"
        self._patch_switch_flow(flow_id)

        dpid_id = "aa:00:00:00:00:00:00:11"
        response = self._get_rest_response(rest_name, dpid_id)
        json_response = json.loads(response.data)

        self.assertEqual(response.status_code, 200)
        self.assertEqual(json_response[0]["actions"][0]["action_type"],
                         "output")
        self.assertEqual(json_response[0]["actions"][0]["port"], "1")
        self.assertEqual(json_response[0]["version"], "0x04")
266
267
    def _patch_switch_flow(self, flow_id):
268
        """Helper method to patch controller to return switch/flow data."""
269
        # patching the flow_stats object in the switch
270
        flow = self._get_mocked_flow_stats()
271
        flow.id = flow_id
272
        switch = MagicMock()
273
        switch.generic_flows = [flow]
274
        self.napp.controller.switches = {"1": switch}
275
        self.napp.controller.get_switch_by_dpid = MagicMock()
276
        self.napp.controller.get_switch_by_dpid.return_value = switch
277
278
    def _get_rest_response(self, rest_name, url_id):
        """Send a GET to ``<server_name_url>/<rest_name>/<url_id>``.

        A test client is created for the napp on every call; the response
        of the GET request is returned.
        """
        client = get_test_client(get_controller_mock(), self.napp)
        endpoint = f"{self.server_name_url}/{rest_name}/{url_id}"
        return client.get(endpoint, content_type="application/json")
286
287
    def _get_mocked_flow_stats(self):
288
        """Helper method to create a mock flow_stats object."""
289
        flow_stats = MagicMock()
290
        flow_stats.id = 123
291
        flow_stats.byte_count = 10
292
        flow_stats.duration_sec = 20
293
        flow_stats.duration_nsec = 30
294
        flow_stats.packet_count = 40
295
        return flow_stats
296
297
    def _get_mocked_multipart_replies_flows(self):
298
        """Helper method to create mock multipart replies flows"""
299
        flow = self._get_mocked_flow_base()
300
301
        instruction = MagicMock()
302
        flow.instructions = [instruction]
303
304
        replies_flows = [flow]
305
        return replies_flows
306
307
    def _get_mocked_flow_base(self):
308
        """Helper method to create a mock flow object."""
309
        flow = MagicMock()
310
        flow.id = 456
311
        flow.switch = None
312
        flow.table_id = None
313
        flow.match = None
314
        flow.priority = None
315
        flow.idle_timeout = None
316
        flow.hard_timeout = None
317
        flow.cookie = None
318
        flow.stats = self._get_mocked_flow_stats()
319
        return flow
320
321
    @patch("napps.amlight.flow_stats.main.GenericFlow.from_flow_stats")
    def test_handle_stats_reply(self, mock_from_flow):
        """Test handle_stats_reply call.

        GenericFlow.from_flow_stats is patched to return the mocked base
        flow. The call must put exactly one event on the app buffer and
        store the mocked flow in the switch's generic_flows.
        """
        mock_from_flow.return_value = self._get_mocked_flow_base()

        def side_effect(event):
            # assertEqual, not assertTrue: assertTrue(value, other) treats
            # the second argument as the failure message and passes for
            # any truthy value, so nothing was actually compared.
            self.assertEqual(f"{event}", "amlight/flow_stats.flows_updated")
            self.assertEqual(event.content["switch"], 111)

        self.napp.controller = MagicMock()
        self.napp.controller.buffers.app.put.side_effect = side_effect

        msg = MagicMock()
        msg.flags.value = 2
        msg.body = [self._get_mocked_flow_stats()]
        event_switch = MagicMock()
        event_switch.generic_flows = []
        event_switch.dpid = 111
        self.napp.handle_stats_reply(msg, event_switch)

        # Exactly one event must have been put on the app buffer
        self.napp.controller.buffers.app.put.assert_called_once()

        # Check mocked flow id
        self.assertEqual(event_switch.generic_flows[0].id, 456)
347
348
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply_received")
    def test_handle_stats_received(self, mock_handle_stats):
        """Test handle_stats_received with replies_flows in the content."""
        event = get_kytos_event_mock(
            name="kytos/of_core.flow_stats.received",
            content={
                "switch": get_switch_mock("00:00:00:00:00:00:00:01", 0x04),
                "replies_flows": self._get_mocked_multipart_replies_flows(),
            },
        )

        self.napp.handle_stats_received(event)

        mock_handle_stats.assert_called_once()
361
362
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply_received")
    def test_handle_stats_received_fail(self, mock_handle_stats):
        """Test handle_stats_received when replies_flows is not in content.

        The patched handle_stats_reply_received must not be called.
        """
        event = get_kytos_event_mock(
            name="kytos/of_core.flow_stats.received",
            content={
                "switch": get_switch_mock("00:00:00:00:00:00:00:01", 0x04),
            },
        )

        self.napp.handle_stats_received(event)

        mock_handle_stats.assert_not_called()
375
376
    @patch("napps.amlight.flow_stats.main.GenericFlow.from_replies_flows")
    def test_handle_stats_reply_received(self, mock_from_flow):
        """Test handle_stats_reply_received stores the converted flow."""
        mock_from_flow.return_value = self._get_mocked_flow_base()
        switch = MagicMock()
        replies = self._get_mocked_multipart_replies_flows()

        self.napp.handle_stats_reply_received(switch, replies)

        stored_flow = switch.generic_flows[0]
        self.assertEqual(stored_flow.id, 456)
386
387
388
# pylint: disable=too-many-public-methods, too-many-lines
389
class TestGenericFlow(TestCase):
390
    """Test the GenericFlow class."""
391
392
    # pylint: disable=no-member
393
    def test_from_flow_stats__x04(self):
        """Test from_flow_stats method 0x04 version.

        Each checked field of the result must equal the ``.value`` of the
        attribute with the same name on the mocked flow_stats.
        """
        output_action = Action40.from_dict(
            {"action_type": "output", "port": UBInt32(1)}
        )
        flow_stats = MagicMock()
        flow_stats.actions = [output_action.as_of_action()]

        result = GenericFlow.from_flow_stats(flow_stats)

        for field in (
            "idle_timeout",
            "hard_timeout",
            "priority",
            "table_id",
            "duration_sec",
            "packet_count",
            "byte_count",
        ):
            self.assertEqual(
                getattr(result, field), getattr(flow_stats, field).value
            )
        self.assertEqual(result.version, "0x04")
416
417
    def test_from_replies_flows(self):
        """Test from_replies_flows method 0x04 version.

        The mocked reply carries one apply_actions instruction with a
        single output action and a Match40 built from 42.
        """
        of_action = Action40.from_dict(
            {"action_type": "output", "port": UBInt32(1)}
        ).as_of_action()

        apply_actions = MagicMock()
        apply_actions.instruction_type = 'apply_actions'
        apply_actions.actions = [of_action]

        match = Match40(42)
        replies_flow = MagicMock()
        replies_flow.instructions = [apply_actions]
        replies_flow.match = match

        result = GenericFlow.from_replies_flows(replies_flow)

        # Fields copied straight from the reply
        for field in (
            "idle_timeout",
            "hard_timeout",
            "priority",
            "table_id",
            "cookie",
        ):
            self.assertEqual(
                getattr(result, field), getattr(replies_flow, field)
            )
        # Counters taken from the reply's stats
        for field in ("duration_sec", "packet_count", "byte_count"):
            self.assertEqual(
                getattr(result, field), getattr(replies_flow.stats, field)
            )

        first_tlv = match.as_of_match().oxm_match_fields[0]
        match_expect = MatchFieldFactory.from_of_tlv(first_tlv)
        self.assertEqual(result.actions[0], of_action)
        self.assertEqual(result.match["in_port"], match_expect)
451
452
    def test_to_dict__x04(self):
        """Test to_dict method 0x04 version."""
        match = {"in_port": MagicMock(value=22)}
        generic_flow = GenericFlow(version="0x04", match=match)

        expected = {
            "version": "0x04",
            "in_port": 22,
            "idle_timeout": 0,
            "hard_timeout": 0,
            "priority": 0,
            "table_id": 255,
            "cookie": None,
            "buffer_id": None,
            "actions": [],
        }

        self.assertEqual(generic_flow.to_dict(), expected)
477
478
    def test_match_to_dict(self):
        """Test match_to_dict method for 0x04 version."""
        match = {
            "in_port": MagicMock(value=22),
            "vlan_vid": MagicMock(value=123),
        }
        generic_flow = GenericFlow(version="0x04", match=match)

        self.assertEqual(
            generic_flow.match_to_dict(),
            {"in_port": 22, "vlan_vid": 123},
        )
494
495
    def test_match_to_dict__empty_match(self):
        """Test match_to_dict for 0x04 version when no match is given."""
        empty_flow = GenericFlow(version="0x04")
        self.assertEqual(empty_flow.match_to_dict(), {})
500
501
    def test_id__x04(self):
        """Test id method 0x04 version against a known value."""
        match = {
            "in_port": MagicMock(value=22),
            "vlan_vid": MagicMock(value=123),
        }
        flow_fields = {
            "version": "0x04",
            "match": match,
            "idle_timeout": 1,
            "hard_timeout": 2,
            "priority": 6,
            "table_id": 7,
            "cookie": 8,
            "buffer_id": 9,
        }
        generic_flow = GenericFlow(**flow_fields)

        self.assertEqual(generic_flow.id, "2d843f76b8b254fad6c6e2a114590440")
521