| @@ 436-511 (lines=76) @@ | ||
| 433 | } |
|
| 434 | self.assertEqual(expected_flow_mod, flow_mod) |
|
| 435 | ||
@staticmethod
@patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
def test_install_uni_flows(send_flow_mods_mock):
    """Check the flow mods that install_uni_flows hands to send_flow_mods.

    An EVC is built from two mocked UNIs and a two-link primary path, and
    ``install_uni_flows`` is called with that path. ``send_flow_mods`` is
    patched, so the test only inspects the arguments it was called with:
    one call per UNI switch, each carrying two flows.
    """

    def uni_flow_pair(uni, peer_uni, nni_port, s_vlan):
        """Build the two flows expected on the switch that hosts ``uni``."""
        uni_port = uni.interface.port_number
        peer_vlan = peer_uni.user_tag.value
        # UNI -> path: rewrite the customer tag to the peer's tag, then
        # push the service tag and forward out of the path-facing port.
        to_path = {
            'match': {'in_port': uni_port, 'dl_vlan': uni.user_tag.value},
            'actions': [
                {'action_type': 'set_vlan', 'vlan_id': peer_vlan},
                {'action_type': 'push_vlan', 'tag_type': 's'},
                {'action_type': 'set_vlan', 'vlan_id': s_vlan},
                {'action_type': 'output', 'port': nni_port},
            ],
        }
        # path -> UNI: pop the service tag and deliver on the UNI port.
        from_path = {
            'match': {'in_port': nni_port, 'dl_vlan': s_vlan},
            'actions': [
                {'action_type': 'pop_vlan'},
                {'action_type': 'output', 'port': uni_port},
            ],
        }
        return [to_path, from_path]

    uni_a = get_uni_mocked(
        interface_port=2, tag_value=82,
        switch_id="switch_uni_a", is_valid=True,
    )
    uni_z = get_uni_mocked(
        interface_port=3, tag_value=83,
        switch_id="switch_uni_z", is_valid=True,
    )
    path = [
        get_link_mocked(
            endpoint_a_port=9, endpoint_b_port=10, metadata={"s_vlan": 5},
        ),
        get_link_mocked(
            endpoint_a_port=11, endpoint_b_port=12, metadata={"s_vlan": 6},
        ),
    ]

    evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
              primary_links=path)
    evc.install_uni_flows(path)

    # The A side is checked against endpoint_a of the first link.
    first_link = evc.primary_links[0]
    expected_on_a = uni_flow_pair(
        uni_a, uni_z,
        first_link.endpoint_a.port_number,
        first_link.get_metadata('s_vlan').value,
    )
    send_flow_mods_mock.assert_any_call(uni_a.interface.switch,
                                        expected_on_a)

    # The Z side is checked against endpoint_b of the last link.
    last_link = evc.primary_links[-1]
    expected_on_z = uni_flow_pair(
        uni_z, uni_a,
        last_link.endpoint_b.port_number,
        last_link.get_metadata('s_vlan').value,
    )
    send_flow_mods_mock.assert_any_call(uni_z.interface.switch,
                                        expected_on_z)
|
| 512 | ||
| 513 | @staticmethod |
|
| 514 | @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods') |
|
| @@ 165-240 (lines=76) @@ | ||
| 162 | } |
|
| 163 | self.assertEqual(expected_flow_mod, flow_mod) |
|
| 164 | ||
@staticmethod
@patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
def test_install_uni_flows(send_flow_mods_mock):
    """Check the flow mods that install_uni_flows hands to send_flow_mods.

    An EVC is built from two mocked UNIs and a two-link primary path, and
    ``install_uni_flows`` is called with that path. ``send_flow_mods`` is
    patched, so the test only inspects the arguments it was called with:
    one call per UNI switch, each carrying two flows.
    """

    def uni_flow_pair(uni, peer_uni, nni_port, s_vlan):
        """Build the two flows expected on the switch that hosts ``uni``."""
        uni_port = uni.interface.port_number
        peer_vlan = peer_uni.user_tag.value
        # UNI -> path: rewrite the customer tag to the peer's tag, then
        # push the service tag and forward out of the path-facing port.
        to_path = {
            'match': {'in_port': uni_port, 'dl_vlan': uni.user_tag.value},
            'actions': [
                {'action_type': 'set_vlan', 'vlan_id': peer_vlan},
                {'action_type': 'push_vlan', 'tag_type': 's'},
                {'action_type': 'set_vlan', 'vlan_id': s_vlan},
                {'action_type': 'output', 'port': nni_port},
            ],
        }
        # path -> UNI: pop the service tag and deliver on the UNI port.
        from_path = {
            'match': {'in_port': nni_port, 'dl_vlan': s_vlan},
            'actions': [
                {'action_type': 'pop_vlan'},
                {'action_type': 'output', 'port': uni_port},
            ],
        }
        return [to_path, from_path]

    uni_a = get_uni_mocked(
        interface_port=2, tag_value=82,
        switch_id="switch_uni_a", is_valid=True,
    )
    uni_z = get_uni_mocked(
        interface_port=3, tag_value=83,
        switch_id="switch_uni_z", is_valid=True,
    )
    path = [
        get_link_mocked(
            endpoint_a_port=9, endpoint_b_port=10, metadata={"s_vlan": 5},
        ),
        get_link_mocked(
            endpoint_a_port=11, endpoint_b_port=12, metadata={"s_vlan": 6},
        ),
    ]

    evc = EVC(name="custom_name", uni_a=uni_a, uni_z=uni_z,
              primary_links=path)
    evc.install_uni_flows(path)

    # The A side is checked against endpoint_a of the first link.
    first_link = evc.primary_links[0]
    expected_on_a = uni_flow_pair(
        uni_a, uni_z,
        first_link.endpoint_a.port_number,
        first_link.get_metadata('s_vlan').value,
    )
    send_flow_mods_mock.assert_any_call(uni_a.interface.switch,
                                        expected_on_a)

    # The Z side is checked against endpoint_b of the last link.
    last_link = evc.primary_links[-1]
    expected_on_z = uni_flow_pair(
        uni_z, uni_a,
        last_link.endpoint_b.port_number,
        last_link.get_metadata('s_vlan').value,
    )
    send_flow_mods_mock.assert_any_call(uni_z.interface.switch,
                                        expected_on_z)
|
| 241 | ||
| 242 | @staticmethod |
|
| 243 | @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods') |
|