1
|
|
|
"""Module to test flow methods.""" |
2
|
|
|
from unittest import TestCase |
3
|
|
|
from unittest.mock import MagicMock, patch, PropertyMock |
4
|
|
|
|
5
|
|
|
from kytos.lib.helpers import (get_switch_mock, get_kytos_event_mock, |
6
|
|
|
get_connection_mock) |
7
|
|
|
|
8
|
|
|
from tests.helpers import get_controller_mock |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
# pylint: disable=protected-access, too-many-public-methods |
12
|
|
|
class TestFlowFactory(TestCase): |
13
|
|
|
"""docstring for TestFlow.""" |
14
|
|
|
|
15
|
|
|
def setUp(self): |
16
|
|
|
"""Execute steps before each tests. |
17
|
|
|
Set the server_name_url from kytos/of_core |
18
|
|
|
""" |
19
|
|
|
self.switch_v0x01 = get_switch_mock("00:00:00:00:00:00:00:01") |
20
|
|
|
self.switch_v0x04 = get_switch_mock("00:00:00:00:00:00:00:02") |
21
|
|
|
self.switch_v0x01.connection = get_connection_mock( |
22
|
|
|
0x01, get_switch_mock("00:00:00:00:00:00:00:03")) |
23
|
|
|
self.switch_v0x04.connection = get_connection_mock( |
24
|
|
|
0x04, get_switch_mock("00:00:00:00:00:00:00:04")) |
25
|
|
|
|
26
|
|
|
patch('kytos.core.helpers.run_on_thread', lambda x: x).start() |
27
|
|
|
# pylint: disable=bad-option-value |
28
|
|
|
from napps.kytos.of_core.flow import FlowFactory |
29
|
|
|
self.addCleanup(patch.stopall) |
30
|
|
|
|
31
|
|
|
self.napp = FlowFactory() |
32
|
|
|
|
33
|
|
|
@patch('napps.kytos.of_core.flow.v0x01') |
34
|
|
|
@patch('napps.kytos.of_core.flow.v0x04') |
35
|
|
|
def test_from_of_flow_stats(self, *args): |
36
|
|
|
"""Test from_of_flow_stats.""" |
37
|
|
|
(mock_flow_v0x04, mock_flow_v0x01) = args |
38
|
|
|
mock_stats = MagicMock() |
39
|
|
|
|
40
|
|
|
self.napp.from_of_flow_stats(mock_stats, self.switch_v0x01) |
41
|
|
|
mock_flow_v0x01.flow.Flow.from_of_flow_stats.assert_called() |
42
|
|
|
|
43
|
|
|
self.napp.from_of_flow_stats(mock_stats, self.switch_v0x04) |
44
|
|
|
mock_flow_v0x04.flow.Flow.from_of_flow_stats.assert_called() |
45
|
|
|
|