Code Duplication: 50-53 duplicated lines in 2 locations

tests/unit/test_main.py: 2 locations

@@ 68-120 (lines=53) @@
65
    assert controller.buffers.app.aput.call_count == 1
66
67
68
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
69
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
70
@patch('napps.kytos.of_lldp.main.UBInt32')
71
@patch('napps.kytos.of_lldp.main.DPID')
72
@patch('napps.kytos.of_lldp.main.LLDP')
73
@patch('napps.kytos.of_lldp.main.Ethernet')
74
async def test_on_ofpt_packet_in_early_intf(*args):
75
    """Test on_ofpt_packet_in early intf return."""
76
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
77
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
78
79
    # pylint: disable=bad-option-value, import-outside-toplevel
80
    from napps.kytos.of_lldp.main import Main
81
    Main.get_liveness_controller = MagicMock()
82
    topology = get_topology_mock()
83
    controller = get_controller_mock()
84
    controller.buffers.app.aput = AsyncMock()
85
    controller.switches = topology.switches
86
    napp = Main(controller)
87
    napp.loop_manager.process_if_looped = AsyncMock()
88
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
89
90
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
91
    message = MagicMock(in_port=1, data='data')
92
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
93
                       'message': message})
94
95
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
96
    mocked.value = 1
97
    mock_ubint32.return_value = mocked
98
    ethernet.ether_type = 0x88CC
99
    ethernet.data = 'eth_data'
100
    lldp.chassis_id.sub_value = 'chassis_id'
101
    lldp.port_id.sub_value = 'port_id'
102
    dpid.value = "00:00:00:00:00:00:00:02"
103
    port_b.value = 2
104
105
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
106
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
107
                                                           0x04)
108
    switch.get_interface_by_port_no = MagicMock(return_value=None)
109
    await napp.on_ofpt_packet_in(event)
110
111
    calls = [call(mock_ethernet, message.data),
112
             call(mock_lldp, ethernet.data),
113
             call(mock_dpid, lldp.chassis_id.sub_value),
114
             call(mock_ubint32, lldp.port_id.sub_value)]
115
    mock_unpack_non_empty.assert_has_calls(calls)
116
    switch.get_interface_by_port_no.assert_called()
117
    # early return shouldn't allow these to get called
118
    assert napp.loop_manager.process_if_looped.call_count == 0
119
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 0
120
    assert controller.buffers.app.aput.call_count == 0
121
122
123
async def test_on_table_enabled():
@@ 16-65 (lines=50) @@
13
from tests.helpers import get_topology_mock
14
15
16
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
17
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
18
@patch('napps.kytos.of_lldp.main.UBInt32')
19
@patch('napps.kytos.of_lldp.main.DPID')
20
@patch('napps.kytos.of_lldp.main.LLDP')
21
@patch('napps.kytos.of_lldp.main.Ethernet')
22
async def test_on_ofpt_packet_in(*args):
23
    """Test on_ofpt_packet_in."""
24
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
25
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
26
27
    # pylint: disable=bad-option-value, import-outside-toplevel
28
    from napps.kytos.of_lldp.main import Main
29
    Main.get_liveness_controller = MagicMock()
30
    topology = get_topology_mock()
31
    controller = get_controller_mock()
32
    controller.buffers.app.aput = AsyncMock()
33
    controller.switches = topology.switches
34
    napp = Main(controller)
35
    napp.loop_manager.process_if_looped = AsyncMock()
36
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
37
38
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
39
    message = MagicMock(in_port=1, data='data')
40
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
41
                       'message': message})
42
43
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
44
    mocked.value = 1
45
    mock_ubint32.return_value = mocked
46
    ethernet.ether_type = 0x88CC
47
    ethernet.data = 'eth_data'
48
    lldp.chassis_id.sub_value = 'chassis_id'
49
    lldp.port_id.sub_value = 'port_id'
50
    dpid.value = "00:00:00:00:00:00:00:02"
51
    port_b.value = 2
52
53
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
54
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
55
                                                           0x04)
56
    await napp.on_ofpt_packet_in(event)
57
58
    calls = [call(mock_ethernet, message.data),
59
             call(mock_lldp, ethernet.data),
60
             call(mock_dpid, lldp.chassis_id.sub_value),
61
             call(mock_ubint32, lldp.port_id.sub_value)]
62
    mock_unpack_non_empty.assert_has_calls(calls)
63
    assert napp.loop_manager.process_if_looped.call_count == 1
64
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 1
65
    assert controller.buffers.app.aput.call_count == 1
66
67
68
@patch('kytos.core.controller.Controller.get_switch_by_dpid')