Passed
Pull Request — master (#96)
by Gleyberson
02:11
created

build.main   F

Complexity

Total Complexity 82

Size/Duplication

Total Lines 543
Duplicated Lines 0 %

Test Coverage

Coverage 89.57%

Importance

Changes 0
Metric Value
wmc 82
eloc 341
dl 0
loc 543
ccs 249
cts 278
cp 0.8957
rs 2
c 0
b 0
f 0

24 Methods

Rating   Name   Duplication   Size   Complexity  
D Main.handle_raw_in() 0 77 13
A Main.fail_negotiation() 0 20 1
A Main._request_flow_list() 0 8 3
A Main._handle_multipart_flow_stats() 0 11 3
A Main.handle_features_reply() 0 26 4
A Main.handle_multipart_reply() 0 18 4
B Main.handle_error_message() 0 29 8
A Main.emit_message_in() 0 13 4
A Main.handle_features_request_sent() 0 5 2
A Main.shutdown() 0 3 1
A Main.handle_echo_request() 0 18 1
A Main.setup() 0 9 1
A Main._is_multipart_reply_ours() 0 7 3
A Main.execute() 0 14 4
A Main._update_switch_flows() 0 5 1
A Main._negotiate() 0 31 3
A Main.handle_openflow_in_hello_failed() 0 7 1
B Main.update_port_status() 0 71 5
A Main.handle_queued_openflow_echo_reply() 0 9 2
A Main.send_features_request() 0 7 1
A Main.update_links() 0 37 3
A Main.emit_message_out() 0 4 2
A Main._send_specific_port_mod() 0 21 5
A Main.handle_stats_reply() 0 16 3

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_version_from_header() 0 4 2
A _get_version_from_bitmask() 0 7 2

How to fix   Complexity   

Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""NApp responsible for the main OpenFlow basic operations."""
2
3 1
from pyof.foundation.exceptions import UnpackException
4 1
from pyof.foundation.network_types import Ethernet, EtherType
5 1
from pyof.utils import PYOF_VERSION_LIBS, unpack
6 1
from pyof.v0x01.asynchronous.error_msg import BadActionCode as BadActionCode01
7 1
from pyof.v0x01.common.header import Type
8 1
from pyof.v0x01.common.phy_port import PortConfig as PortConfig01
9 1
from pyof.v0x01.controller2switch.common import StatsType
10 1
from pyof.v0x04.asynchronous.error_msg import BadActionCode as BadActionCode04
11 1
from pyof.v0x04.common.port import PortConfig as PortConfig04
12 1
from pyof.v0x04.controller2switch.common import MultipartType
13
14 1
from kytos.core import KytosEvent, KytosNApp, log
15 1
from kytos.core.connection import ConnectionState
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import Interface
18 1
from napps.kytos.of_core import settings
19 1
from napps.kytos.of_core.utils import (GenericHello, NegotiationException,
20
                                       emit_message_in, emit_message_out,
21
                                       of_slicer)
22 1
from napps.kytos.of_core.v0x01 import utils as of_core_v0x01_utils
23 1
from napps.kytos.of_core.v0x01.flow import Flow as Flow01
24 1
from napps.kytos.of_core.v0x04 import utils as of_core_v0x04_utils
25 1
from napps.kytos.of_core.v0x04.flow import Flow as Flow04
26
27
28 1
class Main(KytosNApp):
29
    """Main class of the NApp responsible for OpenFlow basic operations."""
30
31
    # Keep track of multiple multipart replies from our own request only.
32
    # Assume that all replies are received before setting a new xid.
33 1
    _multipart_replies_xids = {}
34 1
    _multipart_replies_flows = {}
35
36 1
    def setup(self):
37
        """App initialization (used instead of ``__init__``).
38
39
        The setup method is automatically called by the run method.
40
        Users shouldn't call this method directly.
41
        """
42 1
        self.of_core_version_utils = {0x01: of_core_v0x01_utils,
43
                                      0x04: of_core_v0x04_utils}
44 1
        self.execute_as_loop(settings.STATS_INTERVAL)
45
46 1
    def execute(self):
47
        """Run once on app 'start' or in a loop.
48
49
        The execute method is called by the run method of KytosNApp class.
50
        Users shouldn't call this method directly.
51
        """
52 1
        for switch in self.controller.switches.values():
53 1
            if switch.is_connected():
54 1
                self._request_flow_list(switch)
55 1
                if settings.SEND_ECHO_REQUESTS:
56 1
                    version_utils = \
57
                        self.of_core_version_utils[switch.
58
                                                   connection.protocol.version]
59 1
                    version_utils.send_echo(self.controller, switch)
60
61 1
    def _request_flow_list(self, switch):
62
        """Send flow stats request to a connected switch."""
63 1
        of_version = switch.connection.protocol.version
64 1
        if of_version == 0x01:
65 1
            of_core_v0x01_utils.update_flow_list(self.controller, switch)
66 1
        elif of_version == 0x04:
67 1
            xid = of_core_v0x04_utils.update_flow_list(self.controller, switch)
68 1
            self._multipart_replies_xids[switch.id] = xid
69
70 1
    @staticmethod
71 1
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
72
    def handle_stats_reply(event):
73
        """Handle stats replies for v0x01 switches.
74
75
        Args:
76
            event (:class:`~kytos.core.events.KytosEvent):
77
                Event with ofpt_stats_reply in message.
78
        """
79 1
        switch = event.source.switch
80 1
        msg = event.content['message']
81 1
        if msg.body_type == StatsType.OFPST_FLOW:
82 1
            switch.flows = [Flow01.from_of_flow_stats(f, switch)
83
                            for f in msg.body]
84 1
        elif msg.body_type == StatsType.OFPST_DESC:
85 1
            switch.update_description(msg.body)
86
87 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_features_reply')
88
    def handle_features_reply(self, event):
89
        """Handle kytos/of_core.messages.in.ofpt_features_reply event.
90
91
        This is the end of the Handshake workflow of the OpenFlow Protocol.
92
93
        Args:
94
            event (KytosEvent): Event with features reply message.
95
        """
96 1
        connection = event.source
97 1
        version_utils = self.of_core_version_utils[connection.protocol.version]
98 1
        switch = version_utils.handle_features_reply(self.controller, event)
99
100 1
        if (connection.is_during_setup() and
101
                connection.protocol.state == 'waiting_features_reply'):
102 1
            connection.protocol.state = 'handshake_complete'
103 1
            connection.set_established_state()
104 1
            version_utils.send_desc_request(self.controller, switch)
105 1
            if settings.SEND_SET_CONFIG:
106 1
                version_utils.send_set_config(self.controller, switch)
107 1
            log.info('Connection %s, Switch %s: OPENFLOW HANDSHAKE COMPLETE',
108
                     connection.id, switch.dpid)
109 1
            event_raw = KytosEvent(
110
                name='kytos/of_core.handshake.completed',
111
                content={'switch': switch})
112 1
            self.controller.buffers.app.put(event_raw)
113
114 1
    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
115
    def handle_multipart_reply(self, event):
116
        """Handle multipart replies for v0x04 switches.
117
118
        Args:
119
            event (:class:`~kytos.core.events.KytosEvent):
120
                Event with ofpt_multipart_reply in message.
121
        """
122 1
        reply = event.content['message']
123 1
        switch = event.source.switch
124
125 1
        if reply.multipart_type == MultipartType.OFPMP_FLOW:
126 1
            self._handle_multipart_flow_stats(reply, switch)
127 1
        elif reply.multipart_type == MultipartType.OFPMP_PORT_DESC:
128 1
            of_core_v0x04_utils.handle_port_desc(self.controller, switch,
129
                                                 reply.body)
130 1
        elif reply.multipart_type == MultipartType.OFPMP_DESC:
131 1
            switch.update_description(reply.body)
132
133 1
    def _handle_multipart_flow_stats(self, reply, switch):
134
        """Update switch flows after all replies are received."""
135 1
        if self._is_multipart_reply_ours(reply, switch):
136
            # Get all flows from the reply
137 1
            flows = [Flow04.from_of_flow_stats(of_flow_stats, switch)
138
                     for of_flow_stats in reply.body]
139
            # Get existent flows from the same xid (or create an empty list)
140 1
            all_flows = self._multipart_replies_flows.setdefault(switch.id, [])
141 1
            all_flows.extend(flows)
142 1
            if reply.flags.value % 2 == 0:  # Last bit means more replies
143 1
                self._update_switch_flows(switch)
144
145 1
    def _update_switch_flows(self, switch):
146
        """Update controllers' switch flow list and clean resources."""
147 1
        switch.flows = self._multipart_replies_flows[switch.id]
148 1
        del self._multipart_replies_flows[switch.id]
149 1
        del self._multipart_replies_xids[switch.id]
150
151 1
    def _is_multipart_reply_ours(self, reply, switch):
152
        """Return whether we are expecting the reply."""
153 1
        if switch.id in self._multipart_replies_xids:
154 1
            sent_xid = self._multipart_replies_xids[switch.id]
155 1
            if sent_xid == reply.header.xid:
156 1
                return True
157 1
        return False
158
159 1
    @listen_to('kytos/core.openflow.raw.in')
160
    def handle_raw_in(self, event):
161
        """Handle a RawEvent and generate a kytos/core.messages.in.* event.
162
163
        Args:
164
            event (KytosEvent): RawEvent with openflow message to be unpacked
165
        """
166
        # If the switch is already known to the controller, update the
167
        # 'lastseen' attribute
168 1
        switch = event.source.switch
169 1
        if switch:
170 1
            switch.update_lastseen()
171
172 1
        connection = event.source
173
174 1
        data = connection.remaining_data + event.content['new_data']
175 1
        packets, connection.remaining_data = of_slicer(data)
176 1
        if not packets:
177
            return
178
179 1
        unprocessed_packets = []
180
181 1
        for packet in packets:
182 1
            if not connection.is_alive():
183
                return
184 1
            log.debug('Connection %s: New Raw Openflow packet - %s',
185
                      connection.id, packet.hex())
186
187 1
            if connection.is_new():
188 1
                try:
189 1
                    message = GenericHello(packet=packet)
190 1
                    self._negotiate(connection, message)
191 1
                except (UnpackException, NegotiationException) as err:
192 1
                    if isinstance(err, UnpackException):
193
                        log.error('Connection %s: Invalid hello message',
194
                                  connection.id)
195
                    else:
196 1
                        log.error('Connection %s: Negotiation Failed',
197
                                  connection.id)
198 1
                    connection.protocol.state = 'hello_failed'
199 1
                    connection.close()
200 1
                    connection.state = ConnectionState.FAILED
201 1
                    return
202 1
                connection.set_setup_state()
203 1
                continue
204
205 1
            try:
206 1
                message = connection.protocol.unpack(packet)
207 1
                if message.header.message_type == Type.OFPT_ERROR:
208
                    self.handle_error_message(connection, message)
209
210 1
            except (UnpackException, AttributeError) as err:
211 1
                log.error(err)
212 1
                if isinstance(err, AttributeError):
213 1
                    error_msg = 'connection closed before version negotiation'
214 1
                    log.error('Connection %s: %s', connection.id, error_msg)
215 1
                connection.close()
216 1
                return
217
218 1
            log.debug('Connection %s: IN OFP, version: %s, type: %s, xid: %s',
219
                      connection.id,
220
                      message.header.version,
221
                      message.header.message_type,
222
                      message.header.xid)
223
224 1
            waiting_features_reply = (
225
                str(message.header.message_type) == 'Type.OFPT_FEATURES_REPLY'
226
                and connection.protocol.state == 'waiting_features_reply')
227
228 1
            if connection.is_during_setup() and not waiting_features_reply:
229
                unprocessed_packets.append(packet)
230
                continue
231
232 1
            self.emit_message_in(connection, message)
233
234 1
        connection.remaining_data = b''.join(unprocessed_packets) + \
235
                                    connection.remaining_data
236
237 1
    def emit_message_in(self, connection, message):
238
        """Emit a KytosEvent for each incoming message.
239
240
        Also update links and port status.
241
        """
242 1
        if not connection.is_alive():
243
            return
244 1
        emit_message_in(self.controller, connection, message)
245 1
        msg_type = message.header.message_type.name.lower()
246 1
        if msg_type == 'ofpt_port_status':
247 1
            self.update_port_status(message, connection)
248 1
        elif msg_type == 'ofpt_packet_in':
249 1
            self.update_links(message, connection)
250
251 1
    def emit_message_out(self, connection, message):
252
        """Emit a KytosEvent for each outgoing message."""
253 1
        if connection.is_alive():
254 1
            emit_message_out(self.controller, connection, message)
255
256 1
    @staticmethod
257
    def handle_error_message(connection, message):
258
        """Handle error messages.
259
260
        This method will get an error message, display the code and if
261
        necessary, read the error package and deal with it.
262
        """
263
        log.error(f"OFPT_ERROR: {str(message.code)}")
264
265
        switch = connection.switch
266
        error_data = message.data.pack()
267
        # Get the packet responsible for the error
268
        error_packet = connection.protocol.unpack(error_data)
269
270
        if (message.code == BadActionCode01.OFPBAC_BAD_OUT_PORT or
271
           message.code == BadActionCode04.OFPBAC_BAD_OUT_PORT):
272
273
            for action in error_packet.actions:
274
                try:
275
                    iface = switch.get_interface_by_port_no(action.port.value)
276
                except AttributeError:
277
                    iface = switch.get_interface_by_port_no(action.port)
278
279
                # Check interface to drop packets forwarded to it
280
                if iface:
281
                    if connection.protocol.version == 0x01:
282
                        iface.config = PortConfig01.OFPPC_NO_FWD
283
                    elif connection.protocol.version == 0x04:
284
                        iface.config = PortConfig04.OFPPC_NO_FWD
285
286 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_echo_request')
287
    def handle_echo_request(self, event):
288
        """Handle Echo Request Messages.
289
290
        This method will get a echo request sent by client and generate a
291
        echo reply as answer.
292
293
        Args:
294
            event (:class:`~kytos.core.events.KytosEvent`):
295
                Event with echo request in message.
296
297
        """
298 1
        pyof_lib = PYOF_VERSION_LIBS[event.source.protocol.version]
299 1
        echo_request = event.message
300 1
        echo_reply = pyof_lib.symmetric.echo_reply.EchoReply(
301
            xid=echo_request.header.xid,
302
            data=echo_request.data)
303 1
        self.emit_message_out(event.source, echo_reply)
304
305 1
    def _negotiate(self, connection, message):
306
        """Handle hello messages.
307
308
        This method will handle the incoming hello message by client
309
        and deal with negotiation.
310
311
        Parameters:
312
            event (KytosMessageInHello): KytosMessageInHelloEvent
313
314
        """
315 1
        if message.versions:
316 1
            version = _get_version_from_bitmask(message.versions)
317
        else:
318 1
            version = _get_version_from_header(message.header.version)
319
320 1
        log.debug('connection %s: negotiated version - %s',
321
                  connection.id, str(version))
322
323 1
        if version is None:
324 1
            self.fail_negotiation(connection, message)
325 1
            raise NegotiationException()
326
327 1
        version_utils = self.of_core_version_utils[version]
328 1
        version_utils.say_hello(self.controller, connection)
329
330 1
        connection.protocol.name = 'openflow'
331 1
        connection.protocol.version = version
332 1
        connection.protocol.unpack = unpack
333 1
        connection.protocol.state = 'sending_features'
334 1
        self.send_features_request(connection)
335 1
        log.debug('Connection %s: Hello complete', connection.id)
336
337 1
    def fail_negotiation(self, connection, hello_message):
338
        """Send Error message and emit event upon negotiation failure."""
339 1
        log.warning('connection %s: version negotiation failed',
340
                    connection.id)
341 1
        connection.protocol.state = 'hello_failed'
342 1
        event_raw = KytosEvent(
343
            name='kytos/of_core.hello_failed',
344
            content={'source': connection})
345 1
        self.controller.buffers.app.put(event_raw)
346
347 1
        version = max(settings.OPENFLOW_VERSIONS)
348 1
        pyof_lib = PYOF_VERSION_LIBS[version]
349
350 1
        error_message = pyof_lib.asynchronous.error_msg.ErrorMsg(
351
            xid=hello_message.header.xid,
352
            error_type=pyof_lib.asynchronous.error_msg.
353
            ErrorType.OFPET_HELLO_FAILED,
354
            code=pyof_lib.asynchronous.error_msg.HelloFailedCode.
355
            OFPHFC_INCOMPATIBLE)
356 1
        self.emit_message_out(connection, error_message)
357
358
    # May be removed
359 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_echo_reply')
360
    def handle_queued_openflow_echo_reply(self, event):
361
        """Handle queued OpenFlow echo reply messages.
362
363
        Send a feature request message if SEND_FEATURES_REQUEST_ON_ECHO
364
        is True (default is False).
365
        """
366 1
        if settings.SEND_FEATURES_REQUEST_ON_ECHO:
367 1
            self.send_features_request(event.destination)
368
369 1
    def send_features_request(self, destination):
370
        """Send a feature request to the switch."""
371 1
        version = destination.protocol.version
372 1
        pyof_lib = PYOF_VERSION_LIBS[version]
373 1
        features_request = pyof_lib.controller2switch.\
374
            features_request.FeaturesRequest()
375 1
        self.emit_message_out(destination, features_request)
376
377
    # pylint: disable=no-self-use
378 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_features_request')
379
    def handle_features_request_sent(self, event):
380
        """Ensure request has actually been sent before changing state."""
381 1
        if event.destination.protocol.state == 'sending_features':
382 1
            event.destination.protocol.state = 'waiting_features_reply'
383
    # pylint: enable=no-self-use
384
385 1
    @staticmethod
386 1
    @listen_to('kytos/of_core.v0x[0-9a-f]{2}.messages.in.hello_failed',
387
               'kytos/of_core.v0x0[14].messages.out.hello_failed')
388
    def handle_openflow_in_hello_failed(event):
389
        """Close the connection upon hello failure."""
390 1
        event.destination.close()
391 1
        log.debug("Connection %s: Connection closed.", event.destination.id)
392
393 1
    def shutdown(self):
394
        """End of the application."""
395 1
        log.debug('Shutting down...')
396
397 1
    def update_links(self, message, source):
398
        """Dispatch 'reacheable.mac' event.
399
400
        Args:
401
            message: python openflow (pyof) PacketIn object.
402
            source: kytos.core.switch.Connection instance.
403
404
        Dispatch:
405
            `reachable.mac`:
406
                {
407
                  switch : <switch.id>,
408
                  port: <port.port_no>
409
                  reachable_mac: <mac_address>
410
                }
411
412
        """
413 1
        ethernet = Ethernet()
414 1
        ethernet.unpack(message.data.value)
415 1
        if ethernet.ether_type in (EtherType.LLDP, EtherType.IPV6):
416
            return
417
418 1
        try:
419 1
            port = source.switch.get_interface_by_port_no(
420
                message.in_port.value)
421 1
        except AttributeError:
422 1
            port = source.switch.get_interface_by_port_no(message.in_port)
423
424 1
        name = 'kytos/of_core.reachable.mac'
425 1
        content = {'switch': source.switch,
426
                   'port': port,
427
                   'reachable_mac': ethernet.source.value}
428 1
        event = KytosEvent(name, content)
429 1
        self.controller.buffers.app.put(event)
430
431 1
        msg = 'The MAC %s is reachable from switch/port %s/%s.'
432 1
        log.debug(msg, ethernet.source, source.switch.id,
433
                  message.in_port)
434
435 1
    def _send_specific_port_mod(self, port, interface, current_state):
436
        """Dispatch port link_up/link_down events."""
437 1
        event_name = 'kytos/of_core.switch.interface.'
438 1
        event_content = {'interface': interface}
439
440 1
        if port.state.value % 2:
441 1
            status = 'link_down'
442
        else:
443 1
            status = 'link_up'
444
445 1
        if current_state:
446 1
            if current_state % 2:
447 1
                current_status = 'link_down'
448
            else:
449 1
                current_status = 'link_up'
450
        else:
451 1
            current_status = None
452
453 1
        if status != current_status:
454 1
            event = KytosEvent(name=event_name+status, content=event_content)
455 1
            self.controller.buffers.app.put(event)
456
457 1
    def update_port_status(self, port_status, source):
458
        """Dispatch 'port.*' events.
459
460
        Current events:
461
462
        created|deleted|link_up|link_down|modified
463
464
        Args:
465
            port_status: python openflow (pyof) PortStatus object.
466
            source: kytos.core.switch.Connection instance.
467
468
        Dispatch:
469
            `kytos/of_core.switch.port.[created|modified|deleted]`:
470
                {
471
                  switch : <switch.id>,
472
                  port: <port.port_no>
473
                  port_description: {<description of the port>}
474
                }
475
476
        """
477 1
        reason = port_status.reason.enum_ref(port_status.reason.value).name
478 1
        port = port_status.desc
479 1
        port_no = port.port_no.value
480 1
        event_name = 'kytos/of_core.switch.interface.'
481
482 1
        if reason == 'OFPPR_ADD':
483 1
            status = 'created'
484 1
            interface = Interface(name=port.name.value,
485
                                  address=port.hw_addr.value,
486
                                  port_number=port_no,
487
                                  switch=source.switch,
488
                                  state=port.state.value,
489
                                  features=port.curr)
490 1
            source.switch.update_interface(interface)
491
492 1
        elif reason == 'OFPPR_MODIFY':
493 1
            status = 'modified'
494 1
            interface = source.switch.get_interface_by_port_no(port_no)
495 1
            current_status = None
496 1
            if interface:
497 1
                log.info('Modified %s %s:%s' %
498
                         (interface, interface.switch.dpid,
499
                          interface.port_number))
500 1
                current_status = interface.state
501 1
                interface.state = port.state.value
502 1
                interface.name = port.name.value
503 1
                interface.address = port.hw_addr.value
504 1
                interface.features = port.curr
505
            else:
506 1
                interface = Interface(name=port.name.value,
507
                                      address=port.hw_addr.value,
508
                                      port_number=port_no,
509
                                      switch=source.switch,
510
                                      state=port.state.value,
511
                                      features=port.curr)
512 1
            source.switch.update_interface(interface)
513 1
            self._send_specific_port_mod(port, interface, current_status)
514
515 1
        elif reason == 'OFPPR_DELETE':
516 1
            status = 'deleted'
517 1
            interface = source.switch.get_interface_by_port_no(port_no)
518 1
            source.switch.remove_interface(interface)
519
520 1
        event_name += status
0 ignored issues
show
introduced by
The variable status does not seem to be defined for all execution paths.
Loading history...
521 1
        content = {'interface': interface}
0 ignored issues
show
introduced by
The variable interface does not seem to be defined for all execution paths.
Loading history...
522
523 1
        event = KytosEvent(name=event_name, content=content)
524 1
        self.controller.buffers.app.put(event)
525
526 1
        msg = 'The port %s from switch %s was %s.'
527 1
        log.debug(msg, port_status.desc.port_no, source.switch.id, status)
528
529
530 1
def _get_version_from_bitmask(message_versions):
531
    """Get common version from hello message version bitmap."""
532
    try:
533
        return max([version for version in message_versions
534
                    if version in settings.OPENFLOW_VERSIONS])
535
    except ValueError:
536
        return None
537
538
539 1
def _get_version_from_header(message_version):
540
    """Get common version from hello message header version."""
541
    version = min(message_version, max(settings.OPENFLOW_VERSIONS))
542
    return version if version in settings.OPENFLOW_VERSIONS else None
543