Passed
Push — master ( 667973...d237fc )
by Beraldo
01:46
created

build.main.Main.handle_raw_in()   F

Complexity

Conditions 14

Size

Total Lines 75
Code Lines 57

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 210

Importance

Changes 0
Metric Value
cc 14
eloc 57
nop 2
dl 0
loc 75
ccs 0
cts 46
cp 0
crap 210
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.handle_raw_in() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""NApp responsible for the main OpenFlow basic operations."""
2
3
from pyof.foundation.exceptions import UnpackException
4
from pyof.foundation.network_types import Ethernet, EtherType
5
from pyof.utils import PYOF_VERSION_LIBS, unpack
6
from pyof.v0x01.common.header import Type
7
from pyof.v0x01.controller2switch.common import StatsType
8
from pyof.v0x04.controller2switch.common import MultipartType
9
10
from kytos.core import KytosEvent, KytosNApp, log
11
from kytos.core.connection import ConnectionState
12
from kytos.core.helpers import listen_to
13
from kytos.core.interface import Interface
14
from napps.kytos.of_core import settings
15
from napps.kytos.of_core.utils import (GenericHello, NegotiationException,
16
                                       emit_message_in, emit_message_out,
17
                                       of_slicer)
18
from napps.kytos.of_core.v0x01 import utils as of_core_v0x01_utils
19
from napps.kytos.of_core.v0x01.flow import Flow as Flow01
20
from napps.kytos.of_core.v0x04 import utils as of_core_v0x04_utils
21
from napps.kytos.of_core.v0x04.flow import Flow as Flow04
22
23
24
class Main(KytosNApp):
25
    """Main class of the NApp responsible for OpenFlow basic operations."""
26
27
    # Keep track of multiple multipart replies from our own request only.
28
    # Assume that all replies are received before setting a new xid.
29
    _multipart_replies_xids = {}
30
    _multipart_replies_flows = {}
31
32
    def setup(self):
33
        """App initialization (used instead of ``__init__``).
34
35
        The setup method is automatically called by the run method.
36
        Users shouldn't call this method directly.
37
        """
38
        self.of_core_version_utils = {0x01: of_core_v0x01_utils,
39
                                      0x04: of_core_v0x04_utils}
40
        self.execute_as_loop(settings.STATS_INTERVAL)
41
42
    def execute(self):
43
        """Run once on app 'start' or in a loop.
44
45
        The execute method is called by the run method of KytosNApp class.
46
        Users shouldn't call this method directly.
47
        """
48
        for switch in self.controller.switches.values():
49
            if switch.is_connected():
50
                self._request_flow_list(switch)
51
                if settings.SEND_ECHO_REQUESTS:
52
                    version_utils = \
53
                        self.of_core_version_utils[switch.
54
                                                   connection.protocol.version]
55
                    version_utils.send_echo(self.controller, switch)
56
57
    def _request_flow_list(self, switch):
58
        """Send flow stats request to a connected switch."""
59
        of_version = switch.connection.protocol.version
60
        if of_version == 0x01:
61
            of_core_v0x01_utils.update_flow_list(self.controller, switch)
62
        elif of_version == 0x04:
63
            xid = of_core_v0x04_utils.update_flow_list(self.controller, switch)
64
            self._multipart_replies_xids[switch.id] = xid
65
66
    @staticmethod
67
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
68
    def handle_stats_reply(event):
69
        """Handle stats replies for v0x01 switches.
70
71
        Args:
72
            event (:class:`~kytos.core.events.KytosEvent):
73
                Event with ofpt_stats_reply in message.
74
        """
75
        switch = event.source.switch
76
        msg = event.content['message']
77
        if msg.body_type == StatsType.OFPST_FLOW:
78
            switch.flows = [Flow01.from_of_flow_stats(f, switch)
79
                            for f in msg.body]
80
        elif msg.body_type == StatsType.OFPST_DESC:
81
            switch.update_description(msg.body)
82
83
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_features_reply')
84
    def handle_features_reply(self, event):
85
        """Handle kytos/of_core.messages.in.ofpt_features_reply event.
86
87
        This is the end of the Handshake workflow of the OpenFlow Protocol.
88
89
        Args:
90
            event (KytosEvent): Event with features reply message.
91
        """
92
        connection = event.source
93
        version_utils = self.of_core_version_utils[connection.protocol.version]
94
        switch = version_utils.handle_features_reply(self.controller, event)
95
96
        if (connection.is_during_setup() and
97
                connection.protocol.state == 'waiting_features_reply'):
98
            connection.protocol.state = 'handshake_complete'
99
            connection.set_established_state()
100
            version_utils.send_desc_request(self.controller, switch)
101
            if settings.SEND_SET_CONFIG:
102
                version_utils.send_set_config(self.controller, switch)
103
            log.info('Connection %s, Switch %s: OPENFLOW HANDSHAKE COMPLETE',
104
                     connection.id, switch.dpid)
105
            event_raw = KytosEvent(
106
                name='kytos/of_core.handshake.completed',
107
                content={'switch': switch})
108
            self.controller.buffers.app.put(event_raw)
109
110
    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
111
    def handle_multipart_reply(self, event):
112
        """Handle multipart replies for v0x04 switches.
113
114
        Args:
115
            event (:class:`~kytos.core.events.KytosEvent):
116
                Event with ofpt_multipart_reply in message.
117
        """
118
        reply = event.content['message']
119
        switch = event.source.switch
120
121
        if reply.multipart_type == MultipartType.OFPMP_FLOW:
122
            self._handle_multipart_flow_stats(reply, switch)
123
        elif reply.multipart_type == MultipartType.OFPMP_PORT_DESC:
124
            of_core_v0x04_utils.handle_port_desc(self.controller, switch,
125
                                                 reply.body)
126
        elif reply.multipart_type == MultipartType.OFPMP_DESC:
127
            switch.update_description(reply.body)
128
129
    def _handle_multipart_flow_stats(self, reply, switch):
130
        """Update switch flows after all replies are received."""
131
        if self._is_multipart_reply_ours(reply, switch):
132
            # Get all flows from the reply
133
            flows = [Flow04.from_of_flow_stats(of_flow_stats, switch)
134
                     for of_flow_stats in reply.body]
135
            # Get existent flows from the same xid (or create an empty list)
136
            all_flows = self._multipart_replies_flows.setdefault(switch.id, [])
137
            all_flows.extend(flows)
138
            if reply.flags.value % 2 == 0:  # Last bit means more replies
139
                self._update_switch_flows(switch)
140
141
    def _update_switch_flows(self, switch):
142
        """Update controllers' switch flow list and clean resources."""
143
        switch.flows = self._multipart_replies_flows[switch.id]
144
        del self._multipart_replies_flows[switch.id]
145
        del self._multipart_replies_xids[switch.id]
146
147
    def _is_multipart_reply_ours(self, reply, switch):
148
        """Return whether we are expecting the reply."""
149
        if switch.id in self._multipart_replies_xids:
150
            sent_xid = self._multipart_replies_xids[switch.id]
151
            if sent_xid == reply.header.xid:
152
                return True
153
        return False
154
155
    @listen_to('kytos/core.openflow.raw.in')
156
    def handle_raw_in(self, event):
157
        """Handle a RawEvent and generate a kytos/core.messages.in.* event.
158
159
        Args:
160
            event (KytosEvent): RawEvent with openflow message to be unpacked
161
        """
162
        # If the switch is already known to the controller, update the
163
        # 'lastseen' attribute
164
        switch = event.source.switch
165
        if switch:
166
            switch.update_lastseen()
167
168
        connection = event.source
169
170
        data = connection.remaining_data + event.content['new_data']
171
        packets, connection.remaining_data = of_slicer(data)
172
        if not packets:
173
            return
174
175
        unprocessed_packets = []
176
177
        for packet in packets:
178
            if not connection.is_alive():
179
                return
180
            log.debug('Connection %s: New Raw Openflow packet - %s',
181
                      connection.id, packet.hex())
182
183
            if connection.is_new():
184
                try:
185
                    message = GenericHello(packet=packet)
186
                    self._negotiate(connection, message)
187
                except (UnpackException, NegotiationException) as err:
188
                    if isinstance(err, UnpackException):
189
                        log.debug('Connection %s: Invalid hello message',
190
                                  connection.id)
191
                    else:
192
                        log.debug('Connection %s: Negotiation Failed',
193
                                  connection.id)
194
                    connection.protocol.state = 'hello_failed'
195
                    connection.close()
196
                    connection.state = ConnectionState.FAILED
197
                    return
198
                connection.set_setup_state()
199
                continue
200
201
            try:
202
                message = connection.protocol.unpack(packet)
203
                if message.header.message_type == Type.OFPT_ERROR:
204
                    log.error(f"OFPT_ERROR: {str(message.code)}")
205
            except (UnpackException, AttributeError) as err:
206
                log.debug(err)
207
                if isinstance(err, AttributeError):
208
                    debug_msg = 'connection closed before version negotiation'
209
                    log.debug('Connection %s: %s', connection.id, debug_msg)
210
                connection.close()
211
                return
212
213
            log.debug('Connection %s: IN OFP, version: %s, type: %s, xid: %s',
214
                      connection.id,
215
                      message.header.version,
216
                      message.header.message_type,
217
                      message.header.xid)
218
219
            if connection.is_during_setup():
220
                if not (str(message.header.message_type) ==
221
                        'Type.OFPT_FEATURES_REPLY' and
222
                        connection.protocol.state == 'waiting_features_reply'):
223
                    unprocessed_packets.append(packet)
224
                    continue
225
226
            self.emit_message_in(connection, message)
227
228
        connection.remaining_data = b''.join(unprocessed_packets) + \
229
                                    connection.remaining_data
230
231
    def emit_message_in(self, connection, message):
232
        """Emit a KytosEvent for each incoming message.
233
234
        Also update links and port status.
235
        """
236
        if not connection.is_alive():
237
            return
238
        emit_message_in(self.controller, connection, message)
239
        msg_type = message.header.message_type.name.lower()
240
        if msg_type == 'ofpt_port_status':
241
            self.update_port_status(message, connection)
242
        elif msg_type == 'ofpt_packet_in':
243
            self.update_links(message, connection)
244
245
    def emit_message_out(self, connection, message):
246
        """Emit a KytosEvent for each outgoing message."""
247
        if connection.is_alive():
248
            emit_message_out(self.controller, connection, message)
249
250
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_echo_request')
251
    def handle_echo_request(self, event):
252
        """Handle Echo Request Messages.
253
254
        This method will get a echo request sent by client and generate a
255
        echo reply as answer.
256
257
        Args:
258
            event (:class:`~kytos.core.events.KytosEvent`):
259
                Event with echo request in message.
260
261
        """
262
        pyof_lib = PYOF_VERSION_LIBS[event.source.protocol.version]
263
        echo_request = event.message
264
        echo_reply = pyof_lib.symmetric.echo_reply.EchoReply(
265
            xid=echo_request.header.xid,
266
            data=echo_request.data)
267
        self.emit_message_out(event.source, echo_reply)
268
269
    def _negotiate(self, connection, message):
270
        """Handle hello messages.
271
272
        This method will handle the incoming hello message by client
273
        and deal with negotiation.
274
275
        Parameters:
276
            event (KytosMessageInHello): KytosMessageInHelloEvent
277
278
        """
279
        if message.versions:
280
            version = _get_version_from_bitmask(message.versions)
281
        else:
282
            version = _get_version_from_header(message.header.version)
283
284
        log.debug('connection %s: negotiated version - %s',
285
                  connection.id, str(version))
286
287
        if version is None:
288
            self.fail_negotiation(connection, message)
289
            raise NegotiationException()
290
291
        version_utils = self.of_core_version_utils[version]
292
        version_utils.say_hello(self.controller, connection)
293
294
        connection.protocol.name = 'openflow'
295
        connection.protocol.version = version
296
        connection.protocol.unpack = unpack
297
        connection.protocol.state = 'sending_features'
298
        self.send_features_request(connection)
299
        log.debug('Connection %s: Hello complete', connection.id)
300
301
    def fail_negotiation(self, connection, hello_message):
302
        """Send Error message and emit event upon negotiation failure."""
303
        log.warning('connection %s: version negotiation failed',
304
                    connection.id)
305
        connection.protocol.state = 'hello_failed'
306
        event_raw = KytosEvent(
307
            name='kytos/of_core.hello_failed',
308
            content={'source': connection})
309
        self.controller.buffers.app.put(event_raw)
310
311
        version = max(settings.OPENFLOW_VERSIONS)
312
        pyof_lib = PYOF_VERSION_LIBS[version]
313
314
        error_message = pyof_lib.asynchronous.error_msg.ErrorMsg(
315
            xid=hello_message.header.xid,
316
            error_type=pyof_lib.asynchronous.error_msg.
317
            ErrorType.OFPET_HELLO_FAILED,
318
            code=pyof_lib.asynchronous.error_msg.HelloFailedCode.
319
            OFPHFC_INCOMPATIBLE)
320
        self.emit_message_out(connection, error_message)
321
322
    # May be removed
323
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_echo_reply')
324
    def handle_queued_openflow_echo_reply(self, event):
325
        """Handle queued OpenFlow echo reply messages.
326
327
        Send a feature request message if SEND_FEATURES_REQUEST_ON_ECHO
328
        is True (default is False).
329
        """
330
        if settings.SEND_FEATURES_REQUEST_ON_ECHO:
331
            self.send_features_request(event.destination)
332
333
    def send_features_request(self, destination):
334
        """Send a feature request to the switch."""
335
        version = destination.protocol.version
336
        pyof_lib = PYOF_VERSION_LIBS[version]
337
        features_request = pyof_lib.controller2switch.\
338
            features_request.FeaturesRequest()
339
        self.emit_message_out(destination, features_request)
340
341
    # pylint: disable=no-self-use
342
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_features_request')
343
    def handle_features_request_sent(self, event):
344
        """Ensure request has actually been sent before changing state."""
345
        if event.destination.protocol.state == 'sending_features':
346
            event.destination.protocol.state = 'waiting_features_reply'
347
    # pylint: enable=no-self-use
348
349
    @staticmethod
350
    @listen_to('kytos/of_core.v0x[0-9a-f]{2}.messages.in.hello_failed',
351
               'kytos/of_core.v0x0[14].messages.out.hello_failed')
352
    def handle_openflow_in_hello_failed(event):
353
        """Close the connection upon hello failure."""
354
        event.destination.close()
355
        log.debug("Connection %s: Connection closed.", event.destination.id)
356
357
    def shutdown(self):
358
        """End of the application."""
359
        log.debug('Shutting down...')
360
361
    def update_links(self, message, source):
362
        """Dispatch 'reacheable.mac' event.
363
364
        Args:
365
            message: python openflow (pyof) PacketIn object.
366
            source: kytos.core.switch.Connection instance.
367
368
        Dispatch:
369
            `reachable.mac`:
370
                {
371
                  switch : <switch.id>,
372
                  port: <port.port_no>
373
                  reachable_mac: <mac_address>
374
                }
375
376
        """
377
        ethernet = Ethernet()
378
        ethernet.unpack(message.data.value)
379
        if ethernet.ether_type in (EtherType.LLDP, EtherType.IPV6):
380
            return
381
382
        try:
383
            port = source.switch.get_interface_by_port_no(
384
                message.in_port.value)
385
        except AttributeError:
386
            port = source.switch.get_interface_by_port_no(message.in_port)
387
388
        name = 'kytos/of_core.reachable.mac'
389
        content = {'switch': source.switch,
390
                   'port': port,
391
                   'reachable_mac': ethernet.source.value}
392
        event = KytosEvent(name, content)
393
        self.controller.buffers.app.put(event)
394
395
        msg = 'The MAC %s is reachable from switch/port %s/%s.'
396
        log.debug(msg, ethernet.source, source.switch.id,
397
                  message.in_port)
398
399
    def _send_specific_port_mod(self, port, interface, current_state):
400
        """Dispatch port link_up/link_down events."""
401
        event_name = 'kytos/of_core.switch.interface.'
402
        event_content = {'interface': interface}
403
404
        if port.state.value % 2:
405
            status = 'link_down'
406
        else:
407
            status = 'link_up'
408
409
        if current_state:
410
            if current_state % 2:
411
                current_status = 'link_down'
412
            else:
413
                current_status = 'link_up'
414
        else:
415
            current_status = None
416
417
        if status != current_status:
418
            event = KytosEvent(name=event_name+status, content=event_content)
419
            self.controller.buffers.app.put(event)
420
421
    def update_port_status(self, port_status, source):
422
        """Dispatch 'port.*' events.
423
424
        Current events:
425
426
        created|deleted|link_up|link_down|modified
427
428
        Args:
429
            port_status: python openflow (pyof) PortStatus object.
430
            source: kytos.core.switch.Connection instance.
431
432
        Dispatch:
433
            `kytos/of_core.switch.port.[created|modified|deleted]`:
434
                {
435
                  switch : <switch.id>,
436
                  port: <port.port_no>
437
                  port_description: {<description of the port>}
438
                }
439
440
        """
441
        reason = port_status.reason.enum_ref(port_status.reason.value).name
442
        port = port_status.desc
443
        port_no = port.port_no.value
444
        event_name = 'kytos/of_core.switch.interface.'
445
446
        if reason == 'OFPPR_ADD':
447
            status = 'created'
448
            interface = Interface(name=port.name.value,
449
                                  address=port.hw_addr.value,
450
                                  port_number=port_no,
451
                                  switch=source.switch,
452
                                  state=port.state.value,
453
                                  features=port.curr)
454
            source.switch.update_interface(interface)
455
456
        elif reason == 'OFPPR_MODIFY':
457
            status = 'modified'
458
            interface = source.switch.get_interface_by_port_no(port_no)
459
            current_status = None
460
            if interface:
461
                log.info('Modified %s %s:%s' %
462
                         (interface, interface.switch.dpid,
463
                          interface.port_number))
464
                current_status = interface.state
465
                interface.state = port.state.value
466
                interface.name = port.name.value
467
                interface.address = port.hw_addr.value
468
                interface.features = port.curr
469
            else:
470
                interface = Interface(name=port.name.value,
471
                                      address=port.hw_addr.value,
472
                                      port_number=port_no,
473
                                      switch=source.switch,
474
                                      state=port.state.value,
475
                                      features=port.curr)
476
            source.switch.update_interface(interface)
477
            self._send_specific_port_mod(port, interface, current_status)
478
479
        elif reason == 'OFPPR_DELETE':
480
            status = 'deleted'
481
            interface = source.switch.get_interface_by_port_no(port_no)
482
            source.switch.remove_interface(interface)
483
484
        event_name += status
0 ignored issues
show
introduced by
The variable status does not seem to be defined for all execution paths.
Loading history...
485
        content = {'interface': interface}
0 ignored issues
show
introduced by
The variable interface does not seem to be defined for all execution paths.
Loading history...
486
487
        event = KytosEvent(name=event_name, content=content)
488
        self.controller.buffers.app.put(event)
489
490
        msg = 'The port %s from switch %s was %s.'
491
        log.debug(msg, port_status.desc.port_no, source.switch.id, status)
492
493
494
def _get_version_from_bitmask(message_versions):
495
    """Get common version from hello message version bitmap."""
496
    try:
497
        return max([version for version in message_versions
498
                    if version in settings.OPENFLOW_VERSIONS])
499
    except ValueError:
500
        return None
501
502
503
def _get_version_from_header(message_version):
504
    """Get common version from hello message header version."""
505
    version = min(message_version, max(settings.OPENFLOW_VERSIONS))
506
    return version if version in settings.OPENFLOW_VERSIONS else None
507