Test Failed
Pull Request — master (#56)
by Carlos
01:37
created

build.main.Main._build_lldp_flow_mod()   A

Complexity

Conditions 2

Size

Total Lines 20
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 2
dl 0
loc 20
ccs 12
cts 12
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
"""NApp responsible to discover new switches and hosts."""
2 1
import struct
3
4 1
import requests
5 1
from flask import jsonify, request
6 1
from pyof.foundation.basic_types import DPID, UBInt16, UBInt32
7 1
from pyof.foundation.network_types import LLDP, VLAN, Ethernet, EtherType
8 1
from pyof.v0x01.common.action import ActionOutput as AO10
9 1
from pyof.v0x01.common.phy_port import Port as Port10
10 1
from pyof.v0x01.controller2switch.flow_mod import FlowMod as FM10
11 1
from pyof.v0x01.controller2switch.flow_mod import FlowModCommand as FMC
12 1
from pyof.v0x01.controller2switch.packet_out import PacketOut as PO10
13 1
from pyof.v0x04.common.action import ActionOutput as AO13
14 1
from pyof.v0x04.common.flow_instructions import InstructionApplyAction
15 1
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
16 1
from pyof.v0x04.common.port import PortNo as Port13
17 1
from pyof.v0x04.controller2switch.flow_mod import FlowMod as FM13
18
from pyof.v0x04.controller2switch.packet_out import PacketOut as PO13
19 1
20 1
from kytos.core import KytosEvent, KytosNApp, log, rest
21 1
from kytos.core.helpers import listen_to
22
from napps.kytos.of_lldp import constants, settings
23
24 1
25
class Main(KytosNApp):
26
    """Main OF_LLDP NApp Class."""
27 1
28
    def setup(self):
29 1
        """Make this NApp run in a loop."""
30 1
        self.vlan_id = None
31 1
        self.polling_time = settings.POLLING_TIME
32 1
        if hasattr(settings, "FLOW_VLAN_VID"):
33 1
            self.vlan_id = settings.FLOW_VLAN_VID
34
        self.execute_as_loop(self.polling_time)
35 1
36
    def execute(self):
37 1
        """Send LLDP Packets every 'POLLING_TIME' seconds to all switches."""
38 1
        switches = list(self.controller.switches.values())
39 1
        for switch in switches:
40 1
            try:
41
                of_version = switch.connection.protocol.version
42
            except AttributeError:
43
                of_version = None
44 1
45
            if not switch.is_connected():
46
                continue
47 1
48 1
            if of_version == 0x01:
49 1
                port_type = UBInt16
50 1
                local_port = Port10.OFPP_LOCAL
51 1
            elif of_version == 0x04:
52 1
                port_type = UBInt32
53
                local_port = Port13.OFPP_LOCAL
54
            else:
55
                # skip the current switch with unsupported OF version
56
                continue
57 1
58 1
            interfaces = list(switch.interfaces.values())
59
            for interface in interfaces:
60
                # Interface marked to receive lldp packet
61 1
                # Only send LLDP packet to active interface
62
                if(not interface.lldp or not interface.is_active()
63
                   or not interface.is_enabled()):
64
                    continue
65 1
                # Avoid the interface that connects to the controller.
66
                if interface.port_number == local_port:
67
                    continue
68 1
69 1
                lldp = LLDP()
70 1
                lldp.chassis_id.sub_value = DPID(switch.dpid)
71
                lldp.port_id.sub_value = port_type(interface.port_number)
72 1
73 1
                ethernet = Ethernet()
74 1
                ethernet.ether_type = EtherType.LLDP
75 1
                ethernet.source = interface.address
76 1
                ethernet.destination = constants.LLDP_MULTICAST_MAC
77
                ethernet.data = lldp.pack()
78 1
                # self.vlan_id == None will result in a packet with no VLAN.
79
                ethernet.vlans.append(VLAN(vid=self.vlan_id))
80 1
81
                packet_out = self._build_lldp_packet_out(
82
                                    of_version,
83
                                    interface.port_number, ethernet.pack())
84 1
85
                if packet_out is None:
86
                    continue
87 1
88
                event_out = KytosEvent(
89
                    name='kytos/of_lldp.messages.out.ofpt_packet_out',
90
                    content={
91
                            'destination': switch.connection,
92 1
                            'message': packet_out})
93 1
                self.controller.buffers.msg_out.put(event_out)
94
                log.debug(
95
                    "Sending a LLDP PacketOut to the switch %s",
96
                    switch.dpid)
97 1
98 1
                msg = '\n'
99 1
                msg += 'Switch: %s (%s)\n'
100 1
                msg += ' Interfaces: %s\n'
101 1
                msg += ' -- LLDP PacketOut --\n'
102 1
                msg += ' Ethernet: eth_type (%s) | src (%s) | dst (%s)'
103 1
                msg += '\n'
104
                msg += ' LLDP: Switch (%s) | port (%s)'
105 1
106
                log.debug(
107
                    msg,
108
                    switch.connection.address, switch.dpid,
109
                    switch.interfaces, ethernet.ether_type,
110
                    ethernet.source, ethernet.destination,
111
                    switch.dpid, interface.port_number)
112 1
113
    @listen_to('kytos/topology.switch.(enabled|disabled)')
114
    def install_lldp_flow(self, event):
115
        """Install a flow to send LLDP packets to the controller.
116
117
        The proactive flow is installed whenever a switch connects.
118
119
        Args:
120
            event (:class:`~kytos.core.events.KytosEvent`):
121
                Event with new switch information.
122
123 1
        """
124 1
        try:
125 1
            dpid = event.content['dpid']
126 1
            switch = self.controller.get_switch_by_dpid(dpid)
127
            of_version = switch.connection.protocol.version
128 1
129
            #of_version = event.content['switch'].connection.protocol.version
130 1
        except AttributeError:
131 1
            of_version = None
132 1
133
        flow = self._build_lldp_flow_mod(of_version)
134
        if flow:
135 1
            destination = switch.id
136 1
            endpoint = f'{settings.FLOW_MANAGER_URL}/flows/{destination}'
137
            data = {'flows': [flow]}
138 1
            # log.info(f'Install flow {data}')
139
            # log.info(f'Endpoint {endpoint}')
140
            response = requests.post(endpoint, json=data)
141
            log.info(f'Response {response.content}')
142
143
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_packet_in')
144
    def notify_uplink_detected(self, event):
145
        """Dispatch two KytosEvents to notify identified NNI interfaces.
146
147 1
        Args:
148 1
            event (:class:`~kytos.core.events.KytosEvent`):
149 1
                Event with an LLDP packet as data.
150 1
151 1
        """
152
        ethernet = self._unpack_non_empty(Ethernet, event.message.data)
153
        if ethernet.ether_type == EtherType.LLDP:
154
            try:
155
                lldp = self._unpack_non_empty(LLDP, ethernet.data)
156
                dpid = self._unpack_non_empty(DPID, lldp.chassis_id.sub_value)
157
            except struct.error:
158
                #: If we have a LLDP packet but we cannot unpack it, or the
159 1
                #: unpacked packet does not contain the dpid attribute, then
160 1
                #: we are dealing with a LLDP generated by someone else. Thus
161 1
                #: this packet is not useful for us and we may just ignore it.
162 1
                return
163
164
            switch_a = event.source.switch
165 1
            port_a = event.message.in_port
166 1
            switch_b = None
167
            port_b = None
168 1
169 1
            # in_port is currently a UBInt16 in v0x01 and an Int in v0x04.
170 1
            if isinstance(port_a, int):
171 1
                port_a = UBInt32(port_a)
172 1
173
            try:
174
                switch_b = self.controller.get_switch_by_dpid(dpid.value)
175
                of_version = switch_b.connection.protocol.version
176
                port_type = UBInt16 if of_version == 0x01 else UBInt32
177
                port_b = self._unpack_non_empty(port_type,
178 1
                                                lldp.port_id.sub_value)
179
            except AttributeError:
180
                log.debug("Couldn't find datapath %s.", dpid.value)
181 1
182 1
            # Return if any of the needed information are not available
183
            if not (switch_a and port_a and switch_b and port_b):
184 1
                return
185
186
            interface_a = switch_a.get_interface_by_port_no(port_a.value)
187 1
            interface_b = switch_b.get_interface_by_port_no(port_b.value)
188
189 1
            event_out = KytosEvent(name='kytos/of_lldp.interface.is.nni',
190
                                   content={'interface_a': interface_a,
191 1
                                            'interface_b': interface_b})
192
            self.controller.buffers.app.put(event_out)
193
194 1
    def notify_lldp_change(self, state, interface_ids):
195
        """Dispatch a KytosEvent to notify changes to the LLDP status."""
196 1
        content = {'attribute': 'LLDP',
197
                   'state': state,
198 1
                   'interface_ids': interface_ids}
199
        event_out = KytosEvent(name='kytos/of_lldp.network_status.updated',
200
                               content=content)
201
        self.controller.buffers.app.put(event_out)
202 1
203
    def shutdown(self):
204
        """End of the application."""
205
        log.debug('Shutting down...')
206
207
    @staticmethod
208
    def _build_lldp_packet_out(version, port_number, data):
209
        """Build a LLDP PacketOut message.
210
211
        Args:
212
            version (int): OpenFlow version
213
            port_number (int): Switch port number where the packet must be
214
                forwarded to.
215
            data (bytes): Binary data to be sent through the port.
216
217
        Returns:
218 1
            PacketOut message for the specific given OpenFlow version, if it
219 1
                is supported.
220 1
            None if the OpenFlow version is not supported.
221 1
222 1
        """
223 1
        if version == 0x01:
224
            action_output_class = AO10
225 1
            packet_out_class = PO10
226 1
        elif version == 0x04:
227
            action_output_class = AO13
228 1
            packet_out_class = PO13
229 1
        else:
230
            log.info('Openflow version %s is not yet supported.', version)
231 1
            return None
232 1
233 1
        output_action = action_output_class()
234
        output_action.port = port_number
235 1
236
        packet_out = packet_out_class()
237 1
        packet_out.data = data
238
        packet_out.actions.append(output_action)
239
240
        return packet_out
241
242
    def _build_lldp_flow_mod(self, version):
243
        """Build a FlodMod message to send LLDP to the controller.
244
245
        Args:
246
            version (int): OpenFlow version.
247
248
        Returns:
249 1
            FlowMod message for the specific given OpenFlow version, if it is
250 1
                supported.
251 1
            None if the OpenFlow version is not supported.
252 1
253 1
        """
254 1
        flow = {}
255 1
        match = {}
256 1
        flow['priority'] = settings.FLOW_PRIORITY
257
        match['dl_type'] = EtherType.LLDP
258 1
259 1
        if self.vlan_id:
260 1
            match['dl_vlan'] = self.vlan_id
261 1
        flow['match'] = match
262
263 1
    if version == 0x01:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable version does not seem to be defined.
Loading history...
264 1
            flow['actions'] = [{'action_type': 'output',
265 1
                                'port': Port10.OFPP_CONTROLLER}]
266 1
267
        elif version == 0x04:
268 1
            instruction['actions'].append({'action': 'output',
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable instruction does not seem to be defined.
Loading history...
269 1
                                           'port': Port13.OFPP_CONTROLLER})
270 1
            flow['instructions'] = []
271 1
            flow['instructions'].append(instruction)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable flow does not seem to be defined.
Loading history...
272 1
273 1
        else:
274
            flow = None
275 1
276 1
        return flow
277 1
278
    @staticmethod
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable staticmethod does not seem to be defined.
Loading history...
279
    def _unpack_non_empty(desired_class, data):
280 1
        """Unpack data using an instance of desired_class.
281
282 1
        Args:
283
            desired_class (class): The class to be used to unpack data.
284 1
            data (bytes): bytes to be unpacked.
285
286
        Return:
287
            An instance of desired_class class with data unpacked into it.
288
289
        Raises:
290
            UnpackException if the unpack could not be performed.
291
292
        """
293
        obj = desired_class()
294
295
        if hasattr(data, 'value'):
296
            data = data.value
297
298
        obj.unpack(data)
299 1
300
        return obj
301 1
302 1
    @staticmethod
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable staticmethod does not seem to be defined.
Loading history...
303
    def _get_data(req):
304 1
        """Get request data."""
305
        data = req.get_json()  # Valid format { "interfaces": [...] }
306 1
        return data.get('interfaces', [])
307
308 1
    def _get_interfaces(self):
309
        """Get all interfaces."""
310
        interfaces = []
311 1
        for switch in list(self.controller.switches.values()):
312 1
            interfaces += list(switch.interfaces.values())
313
        return interfaces
314 1
315
    @staticmethod
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable staticmethod does not seem to be defined.
Loading history...
316 1
    def _get_interfaces_dict(interfaces):
317 1
        """Return a dict of interfaces."""
318 1
        return {inter.id: inter for inter in interfaces}
319 1
320
    def _get_lldp_interfaces(self):
321 1
        """Get interfaces enabled to receive LLDP packets."""
322
        return [inter.id for inter in self._get_interfaces() if inter.lldp]
323
324 1
    @rest('v1/interfaces', methods=['GET'])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rest does not seem to be defined.
Loading history...
325
    def get_lldp_interfaces(self):
326 1
        """Return all the interfaces that have LLDP traffic enabled."""
327
        return jsonify({"interfaces": self._get_lldp_interfaces()}), 200
328 1
329 View Code Duplication
    @rest('v1/interfaces/disable', methods=['POST'])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rest does not seem to be defined.
Loading history...
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
330 1
    def disable_lldp(self):
331
        """Disables an interface to receive LLDP packets."""
332
        interface_ids = self._get_data(request)
333 1
        error_list = []  # List of interfaces that were not activated.
334
        changed_interfaces = []
335 1
        interface_ids = filter(None, interface_ids)
336
        interfaces = self._get_interfaces()
337
        if not interfaces:
338 1
            return jsonify("No interfaces were found."), 404
339 1
        interfaces = self._get_interfaces_dict(interfaces)
340 1
        for id_ in interface_ids:
341 1
            interface = interfaces.get(id_)
342 1
            if interface:
343 1
                interface.lldp = False
344 1
                changed_interfaces.append(id_)
345 1
            else:
346 1
                error_list.append(id_)
347 1
        if changed_interfaces:
348 1
            self.notify_lldp_change('disabled', changed_interfaces)
349 1
        if not error_list:
350 1
            return jsonify(
351
                "All the requested interfaces have been disabled."), 200
352 1
353 1
        # Return a list of interfaces that couldn't be disabled
354 1
        msg_error = "Some interfaces couldn't be found and deactivated: "
355 1
        return jsonify({msg_error:
356 1
                        error_list}), 400
357
358 View Code Duplication
    @rest('v1/interfaces/enable', methods=['POST'])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rest does not seem to be defined.
Loading history...
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
359
    def enable_lldp(self):
360 1
        """Enable an interface to receive LLDP packets."""
361 1
        interface_ids = self._get_data(request)
362
        error_list = []  # List of interfaces that were not activated.
363
        changed_interfaces = []
364 1
        interface_ids = filter(None, interface_ids)
365
        interfaces = self._get_interfaces()
366
        if not interfaces:
367 1
            return jsonify("No interfaces were found."), 404
368 1
        interfaces = self._get_interfaces_dict(interfaces)
369 1
        for id_ in interface_ids:
370 1
            interface = interfaces.get(id_)
371 1
            if interface:
372 1
                interface.lldp = True
373 1
                changed_interfaces.append(id_)
374 1
            else:
375 1
                error_list.append(id_)
376 1
        if changed_interfaces:
377 1
            self.notify_lldp_change('enabled', changed_interfaces)
378 1
        if not error_list:
379 1
            return jsonify(
380
                "All the requested interfaces have been enabled."), 200
381 1
382 1
        # Return a list of interfaces that couldn't be enabled
383 1
        msg_error = "Some interfaces couldn't be found and activated: "
384 1
        return jsonify({msg_error:
385 1
                        error_list}), 400
386
387
    @rest('v1/polling_time', methods=['GET'])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rest does not seem to be defined.
Loading history...
388
    def get_time(self):
389 1
        """Get LLDP polling time in seconds."""
390 1
        return jsonify({"polling_time": self.polling_time}), 200
391
392
    @rest('v1/polling_time', methods=['POST'])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable rest does not seem to be defined.
Loading history...
393 1
    def set_time(self):
394
        """Set LLDP polling time."""
395
        # pylint: disable=attribute-defined-outside-init
396 1
        try:
397
            payload = request.get_json()
398 1
            polling_time = int(payload['polling_time'])
399
            if polling_time <= 0:
400
                raise ValueError(f"invalid polling_time {polling_time}, "
401
                                 "must be greater than zero")
402 1
            self.polling_time = polling_time
403 1
            self.execute_as_loop(self.polling_time)
404 1
            log.info("Polling time has been updated to %s"
405 1
                     " second(s), but this change will not be saved"
406
                     " permanently.", self.polling_time)
407
            return jsonify("Polling time has been updated."), 200
408 1
        except (ValueError, KeyError) as error:
409 1
            msg = f"This operation is not completed: {error}"
410
            return jsonify(msg), 400
411