Code Duplication    Length = 50-53 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 70-122 (lines=53) @@
67
    assert controller.buffers.app.aput.call_count == 1
68
69
70
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
71
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
72
@patch('napps.kytos.of_lldp.main.UBInt32')
73
@patch('napps.kytos.of_lldp.main.DPID')
74
@patch('napps.kytos.of_lldp.main.LLDP')
75
@patch('napps.kytos.of_lldp.main.Ethernet')
76
async def test_on_ofpt_packet_in_early_intf(*args):
77
    """Test on_ofpt_packet_in early intf return."""
78
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
79
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
80
81
    # pylint: disable=bad-option-value, import-outside-toplevel
82
    from napps.kytos.of_lldp.main import Main
83
    Main.get_liveness_controller = MagicMock()
84
    topology = get_topology_mock()
85
    controller = get_controller_mock()
86
    controller.buffers.app.aput = AsyncMock()
87
    controller.switches = topology.switches
88
    napp = Main(controller)
89
    napp.loop_manager.process_if_looped = AsyncMock()
90
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
91
92
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
93
    message = MagicMock(in_port=1, data='data')
94
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
95
                       'message': message})
96
97
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
98
    mocked.value = 1
99
    mock_ubint32.return_value = mocked
100
    ethernet.ether_type = 0x88CC
101
    ethernet.data = 'eth_data'
102
    lldp.chassis_id.sub_value = 'chassis_id'
103
    lldp.port_id.sub_value = 'port_id'
104
    dpid.value = "00:00:00:00:00:00:00:02"
105
    port_b.value = 2
106
107
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
108
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
109
                                                           0x04)
110
    switch.get_interface_by_port_no = MagicMock(return_value=None)
111
    await napp.on_ofpt_packet_in(event)
112
113
    calls = [call(mock_ethernet, message.data),
114
             call(mock_lldp, ethernet.data),
115
             call(mock_dpid, lldp.chassis_id.sub_value),
116
             call(mock_ubint32, lldp.port_id.sub_value)]
117
    mock_unpack_non_empty.assert_has_calls(calls)
118
    switch.get_interface_by_port_no.assert_called()
119
    # early return shouldn't allow these to get called
120
    assert napp.loop_manager.process_if_looped.call_count == 0
121
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 0
122
    assert controller.buffers.app.aput.call_count == 0
123
124
125
async def test_on_table_enabled():
@@ 18-67 (lines=50) @@
15
from tests.helpers import get_topology_mock
16
17
18
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
19
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
20
@patch('napps.kytos.of_lldp.main.UBInt32')
21
@patch('napps.kytos.of_lldp.main.DPID')
22
@patch('napps.kytos.of_lldp.main.LLDP')
23
@patch('napps.kytos.of_lldp.main.Ethernet')
24
async def test_on_ofpt_packet_in(*args):
25
    """Test on_ofpt_packet_in."""
26
    (mock_ethernet, mock_lldp, mock_dpid, mock_ubint32,
27
     mock_unpack_non_empty, mock_get_switch_by_dpid) = args
28
29
    # pylint: disable=bad-option-value, import-outside-toplevel
30
    from napps.kytos.of_lldp.main import Main
31
    Main.get_liveness_controller = MagicMock()
32
    topology = get_topology_mock()
33
    controller = get_controller_mock()
34
    controller.buffers.app.aput = AsyncMock()
35
    controller.switches = topology.switches
36
    napp = Main(controller)
37
    napp.loop_manager.process_if_looped = AsyncMock()
38
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()
39
40
    switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
41
    message = MagicMock(in_port=1, data='data')
42
    event = KytosEvent('ofpt_packet_in', content={'source': switch.connection,
43
                       'message': message})
44
45
    mocked, ethernet, lldp, dpid, port_b = [MagicMock() for _ in range(5)]
46
    mocked.value = 1
47
    mock_ubint32.return_value = mocked
48
    ethernet.ether_type = 0x88CC
49
    ethernet.data = 'eth_data'
50
    lldp.chassis_id.sub_value = 'chassis_id'
51
    lldp.port_id.sub_value = 'port_id'
52
    dpid.value = "00:00:00:00:00:00:00:02"
53
    port_b.value = 2
54
55
    mock_unpack_non_empty.side_effect = [ethernet, lldp, dpid, port_b]
56
    mock_get_switch_by_dpid.return_value = get_switch_mock(dpid.value,
57
                                                           0x04)
58
    await napp.on_ofpt_packet_in(event)
59
60
    calls = [call(mock_ethernet, message.data),
61
             call(mock_lldp, ethernet.data),
62
             call(mock_dpid, lldp.chassis_id.sub_value),
63
             call(mock_ubint32, lldp.port_id.sub_value)]
64
    mock_unpack_non_empty.assert_has_calls(calls)
65
    assert napp.loop_manager.process_if_looped.call_count == 1
66
    assert napp.liveness_manager.consume_hello_if_enabled.call_count == 1
67
    assert controller.buffers.app.aput.call_count == 1
68
69
70
@patch('kytos.core.controller.Controller.get_switch_by_dpid')