Code Duplication    Length = 50-53 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 72-124 (lines=53) @@
69
    assert controller.buffers.app.aput.call_count == 1
70
71
72
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
@patch('napps.kytos.of_lldp.main.UBInt32')
@patch('napps.kytos.of_lldp.main.DPID')
@patch('napps.kytos.of_lldp.main.LLDP')
@patch('napps.kytos.of_lldp.main.Ethernet')
async def test_on_ofpt_packet_in_early_intf(*args):
    """Check the early return of on_ofpt_packet_in when no interface is found.

    The source switch's ``get_interface_by_port_no`` is stubbed to return
    ``None``. The packet must still be unpacked layer by layer, but the loop
    manager, the liveness manager and the app buffer must stay untouched.
    """
    # Stacked patches are injected bottom-up: the decorator closest to the
    # function (Ethernet) supplies the first positional argument.
    mock_ethernet, mock_lldp, mock_dpid, mock_ubint32 = args[:4]
    mock_unpack_non_empty, mock_get_switch_by_dpid = args[4:]

    # pylint: disable=bad-option-value, import-outside-toplevel
    from napps.kytos.of_lldp.main import Main
    Main.get_liveness_controller = MagicMock()

    topo = get_topology_mock()
    controller = get_controller_mock()
    controller.buffers.app.aput = AsyncMock()
    controller.switches = topo.switches

    napp = Main(controller)
    napp.loop_manager.process_if_looped = AsyncMock()
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()

    # Packet-in event as received from the first switch on port 1.
    src_switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
    packet_in = MagicMock(in_port=1, data='data')
    content = {'source': src_switch.connection, 'message': packet_in}
    event = KytosEvent('ofpt_packet_in', content=content)

    # Value handed back whenever the patched UBInt32 is instantiated.
    mock_ubint32.return_value = MagicMock(value=1)

    # Objects returned, in order, by the four _unpack_non_empty calls.
    eth_frame = MagicMock(ether_type=0x88CC, data='eth_data')
    lldp_pdu = MagicMock()
    lldp_pdu.chassis_id.sub_value = 'chassis_id'
    lldp_pdu.port_id.sub_value = 'port_id'
    remote_dpid = MagicMock(value="00:00:00:00:00:00:00:02")
    remote_port = MagicMock(value=2)
    mock_unpack_non_empty.side_effect = [
        eth_frame, lldp_pdu, remote_dpid, remote_port,
    ]

    mock_get_switch_by_dpid.return_value = get_switch_mock(
        remote_dpid.value, 0x04
    )
    # No interface for the port: this is what triggers the early return.
    src_switch.get_interface_by_port_no = MagicMock(return_value=None)

    await napp.on_ofpt_packet_in(event)

    expected_unpacks = [
        call(mock_ethernet, packet_in.data),
        call(mock_lldp, eth_frame.data),
        call(mock_dpid, lldp_pdu.chassis_id.sub_value),
        call(mock_ubint32, lldp_pdu.port_id.sub_value),
    ]
    mock_unpack_non_empty.assert_has_calls(expected_unpacks)
    src_switch.get_interface_by_port_no.assert_called()

    # early return shouldn't allow these to get called
    must_stay_idle = (
        napp.loop_manager.process_if_looped,
        napp.liveness_manager.consume_hello_if_enabled,
        controller.buffers.app.aput,
    )
    for idle_mock in must_stay_idle:
        assert idle_mock.call_count == 0
125
126
127
async def test_on_table_enabled():
@@ 20-69 (lines=50) @@
17
# pylint: disable=import-outside-toplevel
18
19
20
@patch('kytos.core.controller.Controller.get_switch_by_dpid')
@patch('napps.kytos.of_lldp.main.Main._unpack_non_empty')
@patch('napps.kytos.of_lldp.main.UBInt32')
@patch('napps.kytos.of_lldp.main.DPID')
@patch('napps.kytos.of_lldp.main.LLDP')
@patch('napps.kytos.of_lldp.main.Ethernet')
async def test_on_ofpt_packet_in(*args):
    """Check the complete on_ofpt_packet_in flow for an LLDP packet-in.

    The packet must be unpacked layer by layer, and the loop manager, the
    liveness manager and the app buffer must each be called exactly once.
    """
    # Stacked patches are injected bottom-up: the decorator closest to the
    # function (Ethernet) supplies the first positional argument.
    mock_ethernet, mock_lldp, mock_dpid, mock_ubint32 = args[:4]
    mock_unpack_non_empty, mock_get_switch_by_dpid = args[4:]

    # pylint: disable=bad-option-value, import-outside-toplevel
    from napps.kytos.of_lldp.main import Main
    Main.get_liveness_controller = MagicMock()

    topo = get_topology_mock()
    controller = get_controller_mock()
    controller.buffers.app.aput = AsyncMock()
    controller.switches = topo.switches

    napp = Main(controller)
    napp.loop_manager.process_if_looped = AsyncMock()
    napp.liveness_manager.consume_hello_if_enabled = AsyncMock()

    # Packet-in event as received from the first switch on port 1.
    src_switch = get_switch_mock("00:00:00:00:00:00:00:01", 0x04)
    packet_in = MagicMock(in_port=1, data='data')
    content = {'source': src_switch.connection, 'message': packet_in}
    event = KytosEvent('ofpt_packet_in', content=content)

    # Value handed back whenever the patched UBInt32 is instantiated.
    mock_ubint32.return_value = MagicMock(value=1)

    # Objects returned, in order, by the four _unpack_non_empty calls.
    eth_frame = MagicMock(ether_type=0x88CC, data='eth_data')
    lldp_pdu = MagicMock()
    lldp_pdu.chassis_id.sub_value = 'chassis_id'
    lldp_pdu.port_id.sub_value = 'port_id'
    remote_dpid = MagicMock(value="00:00:00:00:00:00:00:02")
    remote_port = MagicMock(value=2)
    mock_unpack_non_empty.side_effect = [
        eth_frame, lldp_pdu, remote_dpid, remote_port,
    ]

    mock_get_switch_by_dpid.return_value = get_switch_mock(
        remote_dpid.value, 0x04
    )

    await napp.on_ofpt_packet_in(event)

    expected_unpacks = [
        call(mock_ethernet, packet_in.data),
        call(mock_lldp, eth_frame.data),
        call(mock_dpid, lldp_pdu.chassis_id.sub_value),
        call(mock_ubint32, lldp_pdu.port_id.sub_value),
    ]
    mock_unpack_non_empty.assert_has_calls(expected_unpacks)

    # Each downstream collaborator is expected to be hit exactly once.
    hit_once = (
        napp.loop_manager.process_if_looped,
        napp.liveness_manager.consume_hello_if_enabled,
        controller.buffers.app.aput,
    )
    for used_mock in hit_once:
        assert used_mock.call_count == 1
70
71
72
@patch('kytos.core.controller.Controller.get_switch_by_dpid')