Code Duplication    Length = 50-53 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 64-116 (lines=53) @@
61
    assert controller.buffers.app.aput.call_count == 1
62
63
64
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
65
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
66
@patch('napps.kytos.of_lldp.main.UBInt32')
67
@patch('napps.kytos.of_lldp.main.DPID')
68
@patch('napps.kytos.of_lldp.main.LLDP')
69
@patch('napps.kytos.of_lldp.main.Ethernet')
70
async def test_on_ofpt_packet_in_early_intf(*args):
71
    """Test on_ofpt_packet_in early intf return."""
72
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
73
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
74
75
    # pylint: disable=bad-option-value, import-outside-toplevel
76
    from napps.kytos.of_lldp.main import Main
77
    Main.get_liveness_controller = MagicMock()
78
    topology = get_topology_mock()
79
    controller = get_controller_mock()
80
    controller.buffers.app.aput = AsyncMock()
81
    controller.switches = topology.switches
82
    napp = Main(controller)
83
    napp.loop_manager.process_if_looped = AsyncMock()
84
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
85
86
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
87
    message = MagicMock(in_port=1, data='data')
88
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
89
                       'message': message})
90
91
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
92
    mocked.value = 1
93
    mock_ubint32.return_value = mocked
94
    ethernet.ether_type = 0x88CC
95
    ethernet.data = 'eth_data'
96
    lldp.chassis_id.sub_value = 'chassis_id'
97
    lldp.port_id.sub_value = 'port_id'
98
    dpid.value = "00:00:00:00:00:00:00:02"
99
    port_b.value = 2
100
101
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
102
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
103
                                                           0x04)
104
    switch.get_interface_by_port_no = MagicMock(return_value=None)
105
    await napp.on_ofpt_packet_in(event)
106
107
    calls = [call(mock_ethernet, message.data),
108
             call(mock_lldp, ethernet.data),
109
             call(mock_dpid, lldp.chassis_id.sub_value),
110
             call(mock_ubint32, lldp.port_id.sub_value)]
111
    mock_unpack_non_empty.assert_has_calls(calls)
112
    switch.get_interface_by_port_no.assert_called()
113
    # early return shouldn't allow these to get called
114
    assert napp.loop_manager.process_if_looped.call_count == 0
115
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 0
116
    assert controller.buffers.app.aput.call_count == 0
117
118
119
async def test_on_table_enabled():
@@ 12-61 (lines=50) @@
9
from tests.helpers import get_topology_mock
10
11
12
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
13
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
14
@patch('napps.kytos.of_lldp.main.UBInt32')
15
@patch('napps.kytos.of_lldp.main.DPID')
16
@patch('napps.kytos.of_lldp.main.LLDP')
17
@patch('napps.kytos.of_lldp.main.Ethernet')
18
async def test_on_ofpt_packet_in(*args):
19
    """Test on_ofpt_packet_in."""
20
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
21
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
22
23
    # pylint: disable=bad-option-value, import-outside-toplevel
24
    from napps.kytos.of_lldp.main import Main
25
    Main.get_liveness_controller = MagicMock()
26
    topology = get_topology_mock()
27
    controller = get_controller_mock()
28
    controller.buffers.app.aput = AsyncMock()
29
    controller.switches = topology.switches
30
    napp = Main(controller)
31
    napp.loop_manager.process_if_looped = AsyncMock()
32
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
33
34
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
35
    message = MagicMock(in_port=1, data='data')
36
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
37
                       'message': message})
38
39
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
40
    mocked.value = 1
41
    mock_ubint32.return_value = mocked
42
    ethernet.ether_type = 0x88CC
43
    ethernet.data = 'eth_data'
44
    lldp.chassis_id.sub_value = 'chassis_id'
45
    lldp.port_id.sub_value = 'port_id'
46
    dpid.value = "00:00:00:00:00:00:00:02"
47
    port_b.value = 2
48
49
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
50
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
51
                                                           0x04)
52
    await napp.on_ofpt_packet_in(event)
53
54
    calls = [call(mock_ethernet, message.data),
55
             call(mock_lldp, ethernet.data),
56
             call(mock_dpid, lldp.chassis_id.sub_value),
57
             call(mock_ubint32, lldp.port_id.sub_value)]
58
    mock_unpack_non_empty.assert_has_calls(calls)
59
    assert napp.loop_manager.process_if_looped.call_count == 1
60
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 1
61
    assert controller.buffers.app.aput.call_count == 1
62
63
64
@patch('kytos.core.controller.Controller.get_switch_by_dpid')