Passed
Pull Request — master (#22)
by Rogerio
03:00
created

TestMain.test_handle_stats_reply()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 27
rs 9.5
c 0
b 0
f 0
cc 1
nop 2
1
"""Module to test the main napp file."""
2
import json
3
from unittest import TestCase
4
from unittest.mock import MagicMock, patch
5
import time
6
7
from kytos.lib.helpers import (
8
    get_controller_mock,
9
    get_test_client,
10
    get_kytos_event_mock,
11
    get_switch_mock,
12
)
13
from napps.amlight.flow_stats.main import GenericFlow, Main
14
from napps.kytos.of_core.v0x01.flow import Action as Action10
15
from napps.kytos.of_core.v0x04.flow import Action as Action40
16
from napps.kytos.of_core.v0x04.match_fields import (
17
    MatchDLVLAN,
18
    MatchFieldFactory,
19
)
20
from pyof.foundation.basic_types import UBInt32
21
from pyof.v0x04.common.flow_instructions import InstructionType
22
from pyof.v0x01.controller2switch.common import StatsType
23
from pyof.v0x04.controller2switch.common import MultipartType
24
25
26
# pylint: disable=too-many-public-methods, too-many-lines
27
class TestMain(TestCase):
28
    """Test the Main class."""
29
30
    def setUp(self):
31
        """Execute steps before each tests.
32
33
        Set the server_name_url_url from amlight/flow_stats
34
        """
35
        self.server_name_url = "http://localhost:8181/api/amlight/flow_stats"
36
37
        # The decorator run_on_thread is patched, so methods that listen
38
        # for events do not run on threads while tested.
39
        # Decorators have to be patched before the methods that are
40
        # decorated with them are imported.
41
        patch("kytos.core.helpers.run_on_thread", lambda x: x).start()
42
43
        self.napp = Main(get_controller_mock())
44
45
    @staticmethod
46
    def get_napp_urls(napp):
47
        """Return the amlight/flow_stats urls.
48
49
        The urls will be like:
50
51
        urls = [
52
            (options, methods, url)
53
        ]
54
55
        """
56
        controller = napp.controller
57
        controller.api_server.register_napp_endpoints(napp)
58
59
        urls = []
60
        for rule in controller.api_server.app.url_map.iter_rules():
61
            options = {}
62
            for arg in rule.arguments:
63
                options[arg] = f"[{0}]".format(arg)
64
65
            if f"{napp.username}/{napp.name}" in str(rule):
66
                urls.append((options, rule.methods, f"{str(rule)}"))
67
68
        return urls
69
70
    def test_verify_api_urls(self):
71
        """Verify all APIs registered."""
72
73
        expected_urls = [
74
            (
75
                {"dpid": "[dpid]"},
76
                {"OPTIONS", "HEAD", "GET"},
77
                "/api/amlight/flow_stats/flow/match/ ",
78
            ),
79
            (
80
                {"dpid": "[dpid]"},
81
                {"OPTIONS", "HEAD", "GET"},
82
                "/api/amlight/flow_stats/flow/stats/",
83
            ),
84
            (
85
                {"flow_id": "[flow_id]"},
86
                {"OPTIONS", "HEAD", "GET"},
87
                "/api/amlight/flow_stats/packet_count/",
88
            ),
89
            (
90
                {"flow_id": "[flow_id]"},
91
                {"OPTIONS", "HEAD", "GET"},
92
                "/api/amlight/flow_stats/bytes_count/",
93
            ),
94
            (
95
                {"dpid": "[dpid]"},
96
                {"OPTIONS", "HEAD", "GET"},
97
                "/api/amlight/flow_stats/packet_count/per_flow/",
98
            ),
99
            (
100
                {"dpid": "[dpid]"},
101
                {"OPTIONS", "HEAD", "GET"},
102
                "/api/amlight/flow_stats/packet_count/sum/",
103
            ),
104
            (
105
                {"dpid": "[dpid]"},
106
                {"OPTIONS", "HEAD", "GET"},
107
                "/api/amlight/flow_stats/bytes_count/per_flow/",
108
            ),
109
            (
110
                {"dpid": "[dpid]"},
111
                {"OPTIONS", "HEAD", "GET"},
112
                "/api/amlight/flow_stats/bytes_count/sum/",
113
            ),
114
        ]
115
        urls = self.get_napp_urls(self.napp)
116
        self.assertEqual(len(expected_urls), len(urls))
117
118
    def test_packet_count__fail(self):
119
        """Test packet_count rest call with wrong flow_id."""
120
        flow_id = "123456789"
121
        rest_name = "packet_count"
122
        response = self._get_rest_response(rest_name, flow_id)
123
124
        self.assertEqual(response.data, b"Flow does not exist")
125
126
    def test_packet_count(self):
127
        """Test packet_count rest call."""
128
        flow_id = "1"
129
        rest_name = "packet_count"
130
        self._patch_switch_flow(flow_id)
131
        response = self._get_rest_response(rest_name, flow_id)
132
133
        json_response = json.loads(response.data)
134
        self.assertEqual(json_response["flow_id"], flow_id)
135
        self.assertEqual(json_response["packet_counter"], 40)
136
        self.assertEqual(json_response["packet_per_second"], 2.0)
137
138
    def test_bytes_count_fail(self):
139
        """Test bytes_count rest call with wrong flow_id."""
140
        flow_id = "123456789"
141
        rest_name = "bytes_count"
142
        response = self._get_rest_response(rest_name, flow_id)
143
144
        self.assertEqual(response.data, b"Flow does not exist")
145
146
    def test_bytes_count(self):
147
        """Test bytes_count rest call."""
148
        flow_id = "1"
149
        rest_name = "bytes_count"
150
        self._patch_switch_flow(flow_id)
151
        response = self._get_rest_response(rest_name, flow_id)
152
153
        json_response = json.loads(response.data)
154
        self.assertEqual(json_response["flow_id"], flow_id)
155
        self.assertEqual(json_response["bytes_counter"], 10)
156
        self.assertEqual(json_response["bits_per_second"], 4.0)
157
158 View Code Duplication
    def test_packet_count_per_flow(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159
        """Test packet_count_per_flow rest call."""
160
        flow_id = "1"
161
        rest_name = "packet_count/per_flow"
162
        self._patch_switch_flow(flow_id)
163
164
        dpid_id = 111
165
        response = self._get_rest_response(rest_name, dpid_id)
166
167
        json_response = json.loads(response.data)
168
        self.assertEqual(json_response[0]["flow_id"], flow_id)
169
        self.assertEqual(json_response[0]["packet_counter"], 40)
170
        self.assertEqual(json_response[0]["packet_per_second"], 2.0)
171
172
    def test_packet_count_sum(self):
173
        """Test packet_count_sum rest call."""
174
        flow_id = "1"
175
        rest_name = "packet_count/sum"
176
        self._patch_switch_flow(flow_id)
177
178
        dpid_id = 111
179
        response = self._get_rest_response(rest_name, dpid_id)
180
        json_response = json.loads(response.data)
181
182
        self.assertEqual(json_response, 40)
183
184 View Code Duplication
    def test_bytes_count_per_flow(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
185
        """Test bytes_count_per_flow rest call."""
186
        flow_id = "1"
187
        rest_name = "bytes_count/per_flow"
188
        self._patch_switch_flow(flow_id)
189
190
        dpid_id = 111
191
        response = self._get_rest_response(rest_name, dpid_id)
192
193
        json_response = json.loads(response.data)
194
        self.assertEqual(json_response[0]["flow_id"], flow_id)
195
        self.assertEqual(json_response[0]["bytes_counter"], 10)
196
        self.assertEqual(json_response[0]["bits_per_second"], 4.0)
197
198
    def test_bytes_count_sum(self):
199
        """Test bytes_count_sum rest call."""
200
        flow_id = "1"
201
        rest_name = "bytes_count/sum"
202
        self._patch_switch_flow(flow_id)
203
204
        dpid_id = 111
205
        response = self._get_rest_response(rest_name, dpid_id)
206
        json_response = json.loads(response.data)
207
208
        self.assertEqual(json_response, 10)
209
210
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
211
    def test_flow_match(self, mock_match_flows):
212
        """Test flow_match rest call."""
213
        flow = GenericFlow()
214
        flow.actions = [
215
            Action10.from_dict(
216
                {
217
                    "action_type": "output",
218
                    "port": "1",
219
                }
220
            ),
221
        ]
222
        flow.version = "0x04"
223
        mock_match_flows.return_value = flow
224
225
        flow_id = "1"
226
        rest_name = "flow/match"
227
        self._patch_switch_flow(flow_id)
228
229
        dpid_id = "aa:00:00:00:00:00:00:11"
230
        response = self._get_rest_response(rest_name, dpid_id)
231
        json_response = json.loads(response.data)
232
233
        self.assertEqual(response.status_code, 200)
234
        self.assertEqual(json_response["actions"][0]["action_type"], "output")
235
        self.assertEqual(json_response["actions"][0]["port"], "1")
236
        self.assertEqual(json_response["version"], "0x04")
237
238
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
239
    def test_flow_match_fail(self, mock_match_flows):
240
        """Test flow_match rest call."""
241
        mock_match_flows.return_value = None
242
243
        flow_id = "1"
244
        rest_name = "flow/match"
245
        self._patch_switch_flow(flow_id)
246
247
        dpid_id = "aa:00:00:00:00:00:00:11"
248
        response = self._get_rest_response(rest_name, dpid_id)
249
250
        self.assertEqual(response.status_code, 404)
251
252
    @patch("napps.amlight.flow_stats.main.Main.match_flows")
253
    def test_flow_stats(self, mock_match_flows):
254
        """Test flow_match rest call."""
255
        flow = GenericFlow()
256
        flow.actions = [
257
            Action10.from_dict(
258
                {
259
                    "action_type": "output",
260
                    "port": "1",
261
                }
262
            ),
263
        ]
264
        flow.version = "0x04"
265
        mock_match_flows.return_value = [flow]
266
267
        flow_id = "1"
268
        rest_name = "flow/stats"
269
        self._patch_switch_flow(flow_id)
270
271
        dpid_id = "aa:00:00:00:00:00:00:11"
272
        response = self._get_rest_response(rest_name, dpid_id)
273
        json_response = json.loads(response.data)
274
275
        self.assertEqual(response.status_code, 200)
276
        print(json_response)
277
        self.assertEqual(json_response[0]["actions"][0]["action_type"],
278
                         "output")
279
        self.assertEqual(json_response[0]["actions"][0]["port"], "1")
280
        self.assertEqual(json_response[0]["version"], "0x04")
281
282
    def _patch_switch_flow(self, flow_id):
283
        """Helper method to patch controller to return switch/flow data."""
284
        # patching the flow_stats object in the switch
285
        self.napp.controller.switches = MagicMock()
286
        flow = self._get_mocked_flow_stats()
287
        flow.id = flow_id
288
        switch = MagicMock()
289
        switch.generic_flows = [flow]
290
        self.napp.controller.switches.values.return_value = [switch]
291
        self.napp.controller.get_switch_by_dpid = MagicMock()
292
        self.napp.controller.get_switch_by_dpid.return_value = switch
293
294
    def _get_rest_response(self, rest_name, url_id):
295
        """Helper method to call a rest endpoint."""
296
        # call rest
297
        api = get_test_client(get_controller_mock(), self.napp)
298
        url = f"{self.server_name_url}/{rest_name}/{url_id}"
299
        response = api.get(url, content_type="application/json")
300
301
        return response
302
303
    # pylint: disable=no-self-use
304
    def _get_mocked_flow_stats(self):
305
        """Helper method to create a mock flow_stats object."""
306
        flow_stats = MagicMock()
307
        flow_stats.id = 123
308
        flow_stats.byte_count = 10
309
        flow_stats.duration_sec = 20
310
        flow_stats.duration_nsec = 30
311
        flow_stats.packet_count = 40
312
        return flow_stats
313
314
    def _get_mocked_flow_base(self):
315
        """Helper method to create a mock flow object."""
316
        flow = MagicMock()
317
        flow.id = 456
318
        flow.switch = None
319
        flow.table_id = None
320
        flow.match = None
321
        flow.priority = None
322
        flow.idle_timeout = None
323
        flow.hard_timeout = None
324
        flow.cookie = None
325
        flow.stats = self._get_mocked_flow_stats()
326
        return flow
327
328
    @patch("napps.amlight.flow_stats.main.GenericFlow.from_flow_stats")
329
    def test_handle_stats_reply(self, mock_from_flow):
330
        """Test handle_stats_reply rest call."""
331
        mock_from_flow.return_value = self._get_mocked_flow_base()
332
333
        def side_effect(event):
334
            self.assertTrue(f"{event}", "amlight/flow_stats.flows_updated")
335
            self.assertTrue(event.content["switch"], 111)
336
337
        self.napp.controller = MagicMock()
338
        self.napp.controller.buffers.app.put.side_effect = side_effect
339
340
        msg = MagicMock()
341
        msg.flags.value = 2
342
        msg.body = [self._get_mocked_flow_stats()]
343
        event_switch = MagicMock()
344
        event_switch.generic_flows = []
345
        event_switch.dpid = 111
346
        self.napp.handle_stats_reply(msg, event_switch)
347
        time.sleep(0.5)
348
349
        # Check if important trace dont trigger the event
350
        # It means that the CP trace is the same to the DP trace
351
        self.napp.controller.buffers.app.put.assert_called_once()
352
353
        # Check mocked flow id
354
        self.assertEqual(event_switch.generic_flows[0].id, 456)
355
356
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply")
357
    def test_handle_stats_reply_0x01(self, mock_handle_stats):
358
        """Test handle stats reply."""
359
        flow_msg = MagicMock()
360
        flow_msg.body = "A"
361
        flow_msg.body_type = StatsType.OFPST_FLOW
362
363
        switch_v0x01 = get_switch_mock("00:00:00:00:00:00:00:01", 0x01)
364
365
        name = "kytos/of_core.v0x01.messages.in.ofpt_stats_reply"
366
        content = {"source": switch_v0x01.connection, "message": flow_msg}
367
        event = get_kytos_event_mock(name=name, content=content)
368
        self.napp.handle_stats_reply_0x01(event)
369
        # Event listener is async. Just waiting for the Mock to return
370
        time.sleep(0.5)
371
        mock_handle_stats.assert_called_once()
372
373
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply")
374
    def test_handle_stats_reply_0x01__fail(self, mock_handle_stats):
375
        """Test handle stats reply."""
376
        flow_msg = MagicMock()
377
        flow_msg.body = "A"
378
        flow_msg.body_type = StatsType.OFPST_DESC
379
380
        switch_v0x01 = get_switch_mock("00:00:00:00:00:00:00:01", 0x01)
381
382
        name = "kytos/of_core.v0x01.messages.in.ofpt_stats_reply"
383
        content = {"source": switch_v0x01.connection, "message": flow_msg}
384
        event = get_kytos_event_mock(name=name, content=content)
385
        self.napp.handle_stats_reply_0x01(event)
386
        # Event listener is async. Just waiting for the Mock to return
387
        time.sleep(0.5)
388
        mock_handle_stats.assert_not_called()
389
390
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply")
391
    def test_handle_stats_reply_0x04(self, mock_handle_stats):
392
        """Test handle stats reply."""
393
        flow_msg = MagicMock()
394
        flow_msg.body = "A"
395
        flow_msg.multipart_type = MultipartType.OFPMP_FLOW
396
397
        switch_v0x04 = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
398
399
        name = "kytos/of_core.v0x04.messages.in.ofpt_multipart_reply"
400
        content = {"source": switch_v0x04.connection, "message": flow_msg}
401
        event = get_kytos_event_mock(name=name, content=content)
402
        event.content["message"] = flow_msg
403
404
        self.napp.handle_stats_reply_0x04(event)
405
        # Event listener is async. Just waiting for the Mock to return
406
        time.sleep(0.5)
407
408
        mock_handle_stats.assert_called_once()
409
410
    @patch("napps.amlight.flow_stats.main.Main.handle_stats_reply")
411
    def test_handle_stats_reply_0x04_fail(self, mock_handle_stats):
412
        """Test handle stats reply."""
413
        flow_msg = MagicMock()
414
        flow_msg.body = "A"
415
416
        flow_msg.multipart_type = MultipartType.OFPMP_PORT_DESC
417
418
        switch_v0x04 = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
419
420
        name = "kytos/of_core.v0x04.messages.in.ofpt_multipart_reply"
421
        content = {"source": switch_v0x04.connection, "message": flow_msg}
422
423
        event = get_kytos_event_mock(name=name, content=content)
424
        event.content["message"] = flow_msg
425
426
        self.napp.handle_stats_reply_0x04(event)
427
        # Event listener is async. Just waiting for the Mock to return
428
        time.sleep(0.5)
429
430
        mock_handle_stats.assert_not_called()
431
432
433
# pylint: disable=too-many-public-methods, too-many-lines
434
class TestGenericFlow(TestCase):
435
    """Test the GenericFlow class."""
436
437
    # pylint: disable=no-member
438
    def test_from_flow_stats__x01(self):
439
        """Test from_flow_stats method 0x01 version."""
440
        flow_stats = MagicMock()
441
442
        flow_stats.actions = [
443
            Action10.from_dict(
444
                {
445
                    "action_type": "output",
446
                    "port": UBInt32(1),
447
                }
448
            ).as_of_action(),
449
        ]
450
451
        result = GenericFlow.from_flow_stats(flow_stats, version="0x01")
452
453
        self.assertEqual(result.idle_timeout, flow_stats.idle_timeout.value)
454
        self.assertEqual(result.hard_timeout, flow_stats.hard_timeout.value)
455
        self.assertEqual(result.priority, flow_stats.priority.value)
456
        self.assertEqual(result.table_id, flow_stats.table_id.value)
457
        self.assertEqual(result.duration_sec, flow_stats.duration_sec.value)
458
        self.assertEqual(result.packet_count, flow_stats.packet_count.value)
459
        self.assertEqual(result.byte_count, flow_stats.byte_count.value)
460
461
        exp_match = flow_stats.match
462
        self.assertEqual(result.match["wildcards"], exp_match.wildcards.value)
463
        self.assertEqual(result.match["in_port"], exp_match.in_port.value)
464
        self.assertEqual(result.match["eth_src"], exp_match.dl_src.value)
465
        self.assertEqual(result.match["eth_dst"], exp_match.dl_dst.value)
466
        self.assertEqual(result.match["vlan_vid"], exp_match.dl_vlan.value)
467
        self.assertEqual(result.match["vlan_pcp"], exp_match.dl_vlan_pcp.value)
468
        self.assertEqual(result.match["eth_type"], exp_match.dl_type.value)
469
        self.assertEqual(result.match["ip_tos"], exp_match.nw_tos.value)
470
        self.assertEqual(result.match["ipv4_src"], exp_match.nw_src.value)
471
        self.assertEqual(result.match["ipv4_dst"], exp_match.nw_dst.value)
472
        self.assertEqual(result.match["ip_proto"], exp_match.nw_proto.value)
473
        self.assertEqual(result.match["tcp_src"], exp_match.tp_src.value)
474
        self.assertEqual(result.match["tcp_dst"], exp_match.tp_dst.value)
475
476
    def test_from_flow_stats__x04_match(self):
477
        """Test from_flow_stats method 0x04 version with match."""
478
        flow_stats = MagicMock()
479
        flow_stats.actions = [
480
            Action10.from_dict(
481
                {
482
                    "action_type": "output",
483
                    "port": UBInt32(1),
484
                }
485
            ).as_of_action(),
486
        ]
487
488
        flow_stats.match.oxm_match_fields = [MatchDLVLAN(42).as_of_tlv()]
489
490
        result = GenericFlow.from_flow_stats(flow_stats, version="0x04")
491
492
        match_expect = MatchFieldFactory.from_of_tlv(
493
            flow_stats.match.oxm_match_fields[0]
494
        )
495
        self.assertEqual(result.match["vlan_vid"], match_expect)
496
497
    def test_from_flow_stats__x04_action(self):
498
        """Test from_flow_stats method 0x04 version with action."""
499
        flow_stats = MagicMock()
500
501
        action_dict = {
502
            "action_type": "output",
503
            "port": UBInt32(1),
504
        }
505
        instruction = MagicMock()
506
        instruction.instruction_type = InstructionType.OFPIT_APPLY_ACTIONS
507
        instruction.actions = [Action40.from_dict(action_dict).as_of_action()]
508
        flow_stats.instructions = [instruction]
509
510
        result = GenericFlow.from_flow_stats(flow_stats, version="0x04")
511
        self.assertEqual(result.actions[0].as_dict(), action_dict)
512
513
    def test_to_dict__x01(self):
514
        """Test to_dict method 0x01 version."""
515
        action = Action10.from_dict(
516
            {
517
                "action_type": "set_vlan",
518
                "vlan_id": 6,
519
            }
520
        )
521
        match = {}
522
        match["in_port"] = 11
523
524
        generic_flow = GenericFlow(
525
            version="0x01",
526
            match=match,
527
            idle_timeout=1,
528
            hard_timeout=2,
529
            duration_sec=3,
530
            packet_count=4,
531
            byte_count=5,
532
            priority=6,
533
            table_id=7,
534
            cookie=8,
535
            buffer_id=9,
536
            actions=[action],
537
        )
538
539
        result = generic_flow.to_dict()
540
541
        expected = {
542
            "version": "0x01",
543
            "idle_timeout": 1,
544
            "in_port": 11,
545
            "hard_timeout": 2,
546
            "priority": 6,
547
            "table_id": 7,
548
            "cookie": 8,
549
            "buffer_id": 9,
550
            "actions": [{"vlan_id": 6, "action_type": "set_vlan"}],
551
        }
552
553
        self.assertEqual(result, expected)
554
555
    def test_to_dict__x04(self):
556
        """Test to_dict method 0x04 version."""
557
        match = {}
558
        match["in_port"] = MagicMock()
559
        match["in_port"].value = 22
560
561
        generic_flow = GenericFlow(
562
            version="0x04",
563
            match=match,
564
        )
565
566
        result = generic_flow.to_dict()
567
        expected = {
568
            "version": "0x04",
569
            "in_port": 22,
570
            "idle_timeout": 0,
571
            "hard_timeout": 0,
572
            "priority": 0,
573
            "table_id": 255,
574
            "cookie": None,
575
            "buffer_id": None,
576
            "actions": [],
577
        }
578
579
        self.assertEqual(result, expected)
580
581
    def test_match_to_dict(self):
582
        """Test match_to_dict method for 0x04 version."""
583
        match = {}
584
        match["in_port"] = MagicMock()
585
        match["in_port"].value = 22
586
        match["vlan_vid"] = MagicMock()
587
        match["vlan_vid"].value = 123
588
589
        generic_flow = GenericFlow(
590
            version="0x04",
591
            match=match,
592
        )
593
        result = generic_flow.match_to_dict()
594
        expected = {"in_port": 22, "vlan_vid": 123}
595
596
        self.assertEqual(result, expected)
597
598
    def test_match_to_dict__empty_match(self):
599
        """Test match_to_dict method for 0x04 version, with empty matches."""
600
        generic_flow = GenericFlow(version="0x04")
601
        result = generic_flow.match_to_dict()
602
        self.assertEqual(result, {})
603
604
    def test_id__x01(self):
605
        """Test id method 0x01 version."""
606
        action = Action10.from_dict(
607
            {
608
                "action_type": "set_vlan",
609
                "vlan_id": 6,
610
            }
611
        )
612
        match = {}
613
        match["in_port"] = 11
614
615
        generic_flow = GenericFlow(
616
            version="0x01",
617
            match=match,
618
            idle_timeout=1,
619
            hard_timeout=2,
620
            priority=6,
621
            table_id=7,
622
            cookie=8,
623
            buffer_id=9,
624
            actions=[action],
625
        )
626
627
        self.assertEqual(generic_flow.id, "78d18f2cffd3eaa069afbf0995b90db9")
628
629
    def test_id__x04(self):
630
        """Test id method 0x04 version."""
631
        match = {}
632
        match["in_port"] = MagicMock()
633
        match["in_port"].value = 22
634
        match["vlan_vid"] = MagicMock()
635
        match["vlan_vid"].value = 123
636
637
        generic_flow = GenericFlow(
638
            version="0x04",
639
            match=match,
640
            idle_timeout=1,
641
            hard_timeout=2,
642
            priority=6,
643
            table_id=7,
644
            cookie=8,
645
            buffer_id=9,
646
        )
647
648
        self.assertEqual(generic_flow.id, "2d843f76b8b254fad6c6e2a114590440")
649