Code Duplication    Length = 50-53 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 69-121 (lines=53) @@
66
    assert controller.buffers.app.aput.call_count == 1
67
68
69
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
@patch('napps.kytos.of_lldp.main.UBInt32')
@patch('napps.kytos.of_lldp.main.DPID')
@patch('napps.kytos.of_lldp.main.LLDP')
@patch('napps.kytos.of_lldp.main.Ethernet')
async def test_on_ofpt_packet_in_early_intf(*args):
    """Check on_ofpt_packet_in when the ingress interface lookup fails.

    The source switch is rigged so ``get_interface_by_port_no`` returns
    ``None``. The packet must still be unpacked layer by layer, but the
    handler is expected to return early: loop processing, liveness hello
    consumption and the app buffer must all stay untouched.
    """
    # Stacked patches are injected bottom-up: Ethernet first,
    # get_switch_by_dpid last.
    (ethernet_cls, lldp_cls, dpid_cls, ubint32_cls,
     unpack_mock, switch_by_dpid_mock) = args

    # pylint: disable=bad-option-value, import-outside-toplevel
    from napps.kytos.of_lldp.main import Main
    Main.get_liveness_controller = MagicMock()

    # NApp under test, wired to a mocked controller and topology.
    topology = get_topology_mock()
    controller = get_controller_mock()
    controller.buffers.app.aput = AsyncMock()
    controller.switches = topology.switches
    napp = Main(controller)
    napp.loop_manager.process_if_looped = AsyncMock()
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()

    # Packet-in event coming from switch 01, port 1.
    src_switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
    packet_in = MagicMock(in_port=1, data='data')
    event = KytosEvent(
        'ofpt_packet_in',
        content={'source': src_switch.connection, 'message': packet_in},
    )

    # Stand-ins for each layer that _unpack_non_empty hands back.
    ubint32_cls.return_value = MagicMock(value=1)
    eth_frame = MagicMock(ether_type=0x88CC, data='eth_data')
    lldp_pdu = MagicMock()
    lldp_pdu.chassis_id.sub_value = 'chassis_id'
    lldp_pdu.port_id.sub_value = 'port_id'
    remote_dpid = MagicMock(value="00:00:00:00:00:00:00:02")
    remote_port = MagicMock(value=2)
    unpack_mock.side_effect = [eth_frame, lldp_pdu, remote_dpid, remote_port]

    switch_by_dpid_mock.return_value = get_switch_mock(remote_dpid.value,
                                                       0x04)
    # No interface on the source switch: this is what triggers the
    # early return.
    src_switch.get_interface_by_port_no = MagicMock(return_value=None)

    await napp.on_ofpt_packet_in(event)

    unpack_mock.assert_has_calls([
        call(ethernet_cls, packet_in.data),
        call(lldp_cls, eth_frame.data),
        call(dpid_cls, lldp_pdu.chassis_id.sub_value),
        call(ubint32_cls, lldp_pdu.port_id.sub_value),
    ])
    src_switch.get_interface_by_port_no.assert_called()

    # early return shouldn't allow these to get called
    skipped_steps = (
        napp.loop_manager.process_if_looped,
        napp.liveness_manager.consume_hello_if_enabled,
        controller.buffers.app.aput,
    )
    for skipped in skipped_steps:
        assert skipped.call_count == 0
122
123
124
async def test_on_table_enabled():
@@ 17-66 (lines=50) @@
14
from tests.helpers import get_topology_mock
15
16
17
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
@patch('napps.kytos.of_lldp.main.UBInt32')
@patch('napps.kytos.of_lldp.main.DPID')
@patch('napps.kytos.of_lldp.main.LLDP')
@patch('napps.kytos.of_lldp.main.Ethernet')
async def test_on_ofpt_packet_in(*args):
    """Check on_ofpt_packet_in when every processing step is reached.

    The packet must be unpacked layer by layer (Ethernet, LLDP, DPID,
    UBInt32), after which loop processing, liveness hello consumption
    and the app buffer ``aput`` are each expected to be invoked once.
    """
    # Stacked patches are injected bottom-up: Ethernet first,
    # get_switch_by_dpid last.
    (ethernet_cls, lldp_cls, dpid_cls, ubint32_cls,
     unpack_mock, switch_by_dpid_mock) = args

    # pylint: disable=bad-option-value, import-outside-toplevel
    from napps.kytos.of_lldp.main import Main
    Main.get_liveness_controller = MagicMock()

    # NApp under test, wired to a mocked controller and topology.
    topology = get_topology_mock()
    controller = get_controller_mock()
    controller.buffers.app.aput = AsyncMock()
    controller.switches = topology.switches
    napp = Main(controller)
    napp.loop_manager.process_if_looped = AsyncMock()
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()

    # Packet-in event coming from switch 01, port 1.
    src_switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
    packet_in = MagicMock(in_port=1, data='data')
    event = KytosEvent(
        'ofpt_packet_in',
        content={'source': src_switch.connection, 'message': packet_in},
    )

    # Stand-ins for each layer that _unpack_non_empty hands back.
    ubint32_cls.return_value = MagicMock(value=1)
    eth_frame = MagicMock(ether_type=0x88CC, data='eth_data')
    lldp_pdu = MagicMock()
    lldp_pdu.chassis_id.sub_value = 'chassis_id'
    lldp_pdu.port_id.sub_value = 'port_id'
    remote_dpid = MagicMock(value="00:00:00:00:00:00:00:02")
    remote_port = MagicMock(value=2)
    unpack_mock.side_effect = [eth_frame, lldp_pdu, remote_dpid, remote_port]

    switch_by_dpid_mock.return_value = get_switch_mock(remote_dpid.value,
                                                       0x04)

    await napp.on_ofpt_packet_in(event)

    unpack_mock.assert_has_calls([
        call(ethernet_cls, packet_in.data),
        call(lldp_cls, eth_frame.data),
        call(dpid_cls, lldp_pdu.chassis_id.sub_value),
        call(ubint32_cls, lldp_pdu.port_id.sub_value),
    ])

    # Each downstream step must have run exactly once.
    reached_steps = (
        napp.loop_manager.process_if_looped,
        napp.liveness_manager.consume_hello_if_enabled,
        controller.buffers.app.aput,
    )
    for reached in reached_steps:
        assert reached.call_count == 1
67
68
69
@patch('kytos.core.controller.Controller.get_switch_by_dpid')