1
|
|
|
"""Flow stats message.""" |
2
|
|
|
from pyof.v0x04.common.action import ActionOutput, ListOfActions |
3
|
|
|
from pyof.v0x04.common.flow_instructions import ( |
4
|
|
|
InstructionApplyAction, ListOfInstruction) |
5
|
|
|
from pyof.v0x04.common.flow_match import ( |
6
|
|
|
Match, MatchType, OxmClass, OxmOfbMatchField, OxmTLV) |
7
|
|
|
from pyof.v0x04.common.port import PortNo |
8
|
|
|
from pyof.v0x04.controller2switch.common import MultipartType |
9
|
|
|
from pyof.v0x04.controller2switch.multipart_reply import ( |
10
|
|
|
FlowStats, MultipartReply) |
11
|
|
|
from tests.test_struct import TestStruct |
12
|
|
|
|
13
|
|
|
|
14
|
|
|
class TestFlowStats(TestStruct):
    """Tests for the v0x04 flow stats ``MultipartReply`` message."""

    @classmethod
    def setUpClass(cls):
        """Register the raw dump file, expected object and minimum size.

        The expected object is a ``MultipartReply`` of type ``OFPMP_FLOW``
        whose body is produced by ``_get_body()``.
        """
        super().setUpClass()
        super().set_raw_dump_file('v0x04', 'ofpt_flow_stats')

        # Attributes of the message expected to match the raw dump.
        expected_fields = {
            'xid': 2898845528,
            'multipart_type': MultipartType.OFPMP_FLOW,
            'flags': 0,
            'body': _get_body(),
        }
        super().set_raw_dump_object(MultipartReply, **expected_fields)
        super().set_minimum_size(16)
27
|
|
|
|
28
|
|
|
|
29
|
|
|
def _get_body():
    """Build the ``FlowStats`` entry used as the MultipartReply body."""
    # Build the nested structures first, in the same order the original
    # keyword arguments evaluated them.
    flow_match = _new_match()
    flow_instructions = _new_list_of_instructions()

    return FlowStats(
        length=88,
        table_id=0,
        duration_sec=56,
        duration_nsec=635000000,
        priority=1000,
        idle_timeout=0,
        hard_timeout=0,
        flags=0x00000001,
        cookie=0x0000000000000000,
        packet_count=18,
        byte_count=756,
        match=flow_match,
        instructions=flow_instructions,
    )
37
|
|
|
|
38
|
|
|
|
39
|
|
View Code Duplication |
def _new_match():
    """Create the OXM ``Match`` carried by the flow stats entry.

    The match holds two unmasked OpenFlow-basic TLVs, in this order:
    an ETH_TYPE field and a VLAN_VID field.
    """
    eth_type_tlv = OxmTLV(
        oxm_class=OxmClass.OFPXMC_OPENFLOW_BASIC,
        oxm_field=OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE,
        oxm_hasmask=False,
        oxm_value=b'\x88\xcc',
    )
    vlan_vid_tlv = OxmTLV(
        oxm_class=OxmClass.OFPXMC_OPENFLOW_BASIC,
        oxm_field=OxmOfbMatchField.OFPXMT_OFB_VLAN_VID,
        oxm_hasmask=False,
        oxm_value=b'\x1e\xd7',
    )

    match_fields = [eth_type_tlv, vlan_vid_tlv]
    return Match(match_type=MatchType.OFPMT_OXM,
                 oxm_match_fields=match_fields)
49
|
|
|
|
50
|
|
|
|
51
|
|
|
def _new_list_of_instructions():
    """Create the ``ListOfInstruction`` carried by the flow stats entry.

    The list holds a single ``InstructionApplyAction`` wrapping one
    ``ActionOutput`` whose port is ``PortNo.OFPP_CONTROLLER``.
    """
    to_controller = ActionOutput(port=PortNo.OFPP_CONTROLLER)
    actions = ListOfActions([to_controller])
    apply_actions = InstructionApplyAction(actions)
    return ListOfInstruction([apply_actions])
57
|
|
|
|