1 | """Utils module.""" |
||
2 | |||
3 | 1 | from functools import cache |
|
4 | |||
5 | 1 | from .settings import COOKIE_PREFIX |
|
6 | |||
7 | |||
8 | 1 | def int_dpid(dpid): |
|
9 | """Convert a str dpid to an int.""" |
||
10 | 1 | dpid = int(dpid.replace(":", ""), 16) |
|
11 | 1 | return dpid |
|
12 | |||
13 | |||
14 | 1 | def get_cookie(dpid): |
|
15 | """Return the cookie integer given a dpid.""" |
||
16 | 1 | return (0x0000FFFFFFFFFFFF & int(int_dpid(dpid))) | (COOKIE_PREFIX << 56) |
|
17 | |||
18 | |||
19 | 1 | @cache |
|
20 | 1 | def try_to_gen_intf_mac(address: str, dpid: str, port_number: int) -> str: |
|
21 | """Try to generate interface address if needed in a best effort way. |
||
22 | |||
23 | This is a sanity check to ensure that the source interface will |
||
24 | have a valid MAC, just so packets don't get potentially discarded. |
||
25 | """ |
||
26 | 1 | if all(( |
|
27 | not _is_default_mac(address), |
||
28 | not _has_mac_multicast_bit_set(address) |
||
29 | )): |
||
30 | 1 | return address |
|
31 | |||
32 | 1 | dpid_split = dpid.split(":") |
|
33 | 1 | if len(dpid_split) != 8: |
|
34 | 1 | return address |
|
35 | |||
36 | 1 | address = _gen_mac_address(dpid_split, port_number) |
|
37 | 1 | address = _make_unicast_local_mac(address) |
|
38 | 1 | return address |
|
39 | |||
40 | |||
41 | 1 | def _has_mac_multicast_bit_set(address: str) -> bool: |
|
42 | """Check whether it has the multicast bit set or not.""" |
||
43 | 1 | try: |
|
44 | 1 | return int(address[1], 16) & 1 |
|
45 | except (TypeError, ValueError, IndexError): |
||
46 | return False |
||
47 | |||
48 | |||
49 | 1 | def _is_default_mac(address: str) -> bool: |
|
50 | """Check whether is default mac or not.""" |
||
51 | 1 | return address == "00:00:00:00:00:00" |
|
52 | |||
53 | |||
54 | 1 | def _gen_mac_address(dpid_split: list[str], port_number: int) -> str: |
|
55 | """Generate a MAC address deriving from dpid lsb 40 bits. |
||
56 | A dpid is 8 bytes long: 16 bits + 48 bits. |
||
57 | """ |
||
58 | 1 | port_number = port_number % (1 << 8) |
|
59 | 1 | return ":".join(dpid_split[-5:] + [f"{port_number:02x}"]) |
|
60 | |||
61 | |||
62 | 1 | def _make_unicast_local_mac(address: str) -> str: |
|
63 | """ |
||
64 | Make an unicast locally administered address. |
||
65 | |||
66 | The first two bits (b0, b1) of the most significant MAC address byte is for |
||
67 | its uniqueness and wether its locally administered or not. This functions |
||
68 | ensures it's a unicast (b0 -> 0) and locally administered (b1 -> 1). |
||
69 | """ |
||
70 | 1 | return address[:1] + "e" + address[2:] |
|
71 | |||
72 | |||
73 | 1 | def update_flow(): |
|
74 | """Changed force value after a retry""" |
||
75 | 1 | def add_force(args): |
|
76 | 1 | args.kwargs['data']['force'] = True |
|
77 | return add_force |
||
78 |