Passed
Push — master ( e2f2a2...449752 )
by Humberto
03:29
created

build.main.Main.handle_multipart_reply()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.0187

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 20
rs 9.2833
c 0
b 0
f 0
ccs 10
cts 11
cp 0.9091
cc 5
nop 2
crap 5.0187
1
"""NApp responsible for the main OpenFlow basic operations."""
2
3 1
from pyof.foundation.exceptions import UnpackException
4 1
from pyof.foundation.network_types import Ethernet, EtherType
5 1
from pyof.utils import PYOF_VERSION_LIBS, unpack
6 1
from pyof.v0x01.common.header import Type
7 1
from pyof.v0x01.controller2switch.common import StatsType
8 1
from pyof.v0x04.controller2switch.common import MultipartType
9
10 1
from kytos.core import KytosEvent, KytosNApp, log
11 1
from kytos.core.connection import ConnectionState
12 1
from kytos.core.helpers import listen_to
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.of_core import settings
15 1
from napps.kytos.of_core.utils import (GenericHello, NegotiationException,
16
                                       emit_message_in, emit_message_out,
17
                                       of_slicer)
18 1
from napps.kytos.of_core.v0x01 import utils as of_core_v0x01_utils
19 1
from napps.kytos.of_core.v0x01.flow import Flow as Flow01
20 1
from napps.kytos.of_core.v0x04 import utils as of_core_v0x04_utils
21 1
from napps.kytos.of_core.v0x04.flow import Flow as Flow04
22
23
24 1
class Main(KytosNApp):
    """Main class of the NApp responsible for OpenFlow basic operations."""

    # Keep track of multiple multipart replies from our own request only.
    # Assume that all replies are received before setting a new xid.
    # NOTE: these dicts live on the class, so they are shared by every
    # instance of Main.
    _multipart_replies_xids = {}
    _multipart_replies_flows = {}
    _multipart_replies_ports = {}

    def setup(self):
        """App initialization (used instead of ``__init__``).

        The setup method is automatically called by the run method.
        Users shouldn't call this method directly.
        """
        # Map each supported OpenFlow version to its helper module.
        self.of_core_version_utils = {0x01: of_core_v0x01_utils,
                                      0x04: of_core_v0x04_utils}
        self.execute_as_loop(settings.STATS_INTERVAL)

    def execute(self):
        """Run once on app 'start' or in a loop.

        The execute method is called by the run method of KytosNApp class.
        Users shouldn't call this method directly.

        For every connected switch, request its flow list and port stats
        and, if ``settings.SEND_ECHO_REQUESTS`` is set, send an echo request.
        """
        for switch in self.controller.switches.values():
            if switch.is_connected():
                self._request_flow_list(switch)
                if settings.SEND_ECHO_REQUESTS:
                    version = switch.connection.protocol.version
                    version_utils = self.of_core_version_utils[version]
                    version_utils.send_echo(self.controller, switch)

    def _request_flow_list(self, switch):
        """Send flow stats and port stats requests to a connected switch.

        For v0x04 switches the xids returned by the requests are stored so
        that the multipart replies can later be matched against them.
        Switches speaking any other version are silently ignored.
        """
        of_version = switch.connection.protocol.version
        if of_version == 0x01:
            of_core_v0x01_utils.update_flow_list(self.controller, switch)
            of_core_v0x01_utils.request_port_stats(self.controller, switch)
        elif of_version == 0x04:
            xid_flows = of_core_v0x04_utils.update_flow_list(self.controller,
                                                             switch)
            xid_ports = of_core_v0x04_utils.request_port_stats(self.controller,
                                                               switch)
            self._multipart_replies_xids[switch.id] = {'flows': xid_flows,
                                                       'ports': xid_ports}

    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
    def handle_stats_reply(self, event):
        """Handle stats replies for v0x01 switches.

        Flow stats replace ``switch.flows``, port stats are forwarded in a
        ``kytos/of_core.port_stats`` event and description stats update the
        switch description. Other body types are ignored.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                Event with ofpt_stats_reply in message.
        """
        switch = event.source.switch
        msg = event.content['message']
        if msg.body_type == StatsType.OFPST_FLOW:
            switch.flows = [Flow01.from_of_flow_stats(f, switch)
                            for f in msg.body]
        elif msg.body_type == StatsType.OFPST_PORT:
            port_stats_event = KytosEvent(
                name='kytos/of_core.port_stats',
                content={
                    'switch': switch,
                    'port_stats': list(msg.body)
                    })
            self.controller.buffers.app.put(port_stats_event)
        elif msg.body_type == StatsType.OFPST_DESC:
            switch.update_description(msg.body)

    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_features_reply')
    def handle_features_reply(self, event):
        """Handle kytos/of_core.messages.in.ofpt_features_reply event.

        This is the end of the Handshake workflow of the OpenFlow Protocol.
        When the connection is still in setup and waiting for this reply,
        the connection is marked as established, a description request
        (and optionally a set-config) is sent, and a
        ``kytos/of_core.handshake.completed`` event is emitted.

        Args:
            event (KytosEvent): Event with features reply message.
        """
        connection = event.source
        version_utils = self.of_core_version_utils[connection.protocol.version]
        switch = version_utils.handle_features_reply(self.controller, event)

        if (connection.is_during_setup() and
                connection.protocol.state == 'waiting_features_reply'):
            connection.protocol.state = 'handshake_complete'
            connection.set_established_state()
            version_utils.send_desc_request(self.controller, switch)
            if settings.SEND_SET_CONFIG:
                version_utils.send_set_config(self.controller, switch)
            log.info('Connection %s, Switch %s: OPENFLOW HANDSHAKE COMPLETE',
                     connection.id, switch.dpid)
            event_raw = KytosEvent(
                name='kytos/of_core.handshake.completed',
                content={'switch': switch})
            self.controller.buffers.app.put(event_raw)

    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
    def handle_multipart_reply(self, event):
        """Handle multipart replies for v0x04 switches.

        Dispatches on the multipart type: flow stats, port stats, port
        description and switch description. Other types are ignored.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                Event with ofpt_multipart_reply in message.
        """
        reply = event.content['message']
        switch = event.source.switch

        if reply.multipart_type == MultipartType.OFPMP_FLOW:
            self._handle_multipart_flow_stats(reply, switch)
        elif reply.multipart_type == MultipartType.OFPMP_PORT_STATS:
            self._handle_multipart_port_stats(reply, switch)
        elif reply.multipart_type == MultipartType.OFPMP_PORT_DESC:
            of_core_v0x04_utils.handle_port_desc(self.controller, switch,
                                                 reply.body)
        elif reply.multipart_type == MultipartType.OFPMP_DESC:
            switch.update_description(reply.body)

    def _handle_multipart_flow_stats(self, reply, switch):
        """Update switch flows after all replies are received.

        Replies whose xid does not match our pending flow request are
        ignored.
        """
        if self._is_multipart_reply_ours(reply, switch, 'flows'):
            # Get all flows from the reply
            flows = [Flow04.from_of_flow_stats(of_flow_stats, switch)
                     for of_flow_stats in reply.body]
            # Get existent flows from the same xid (or create an empty list)
            all_flows = self._multipart_replies_flows.setdefault(switch.id, [])
            all_flows.extend(flows)
            if reply.flags.value % 2 == 0:  # Last bit means more replies
                self._update_switch_flows(switch)

    def _handle_multipart_port_stats(self, reply, switch):
        """Accumulate port stats and emit an event after the last reply.

        Replies whose xid does not match our pending port stats request are
        ignored.
        """
        if self._is_multipart_reply_ours(reply, switch, 'ports'):
            all_port_stats = self._multipart_replies_ports.setdefault(
                switch.id, []
            )
            all_port_stats.extend(reply.body)
            if reply.flags.value % 2 == 0:  # Last bit means more replies
                self._new_port_stats(switch)

    def _update_switch_flows(self, switch):
        """Update controllers' switch flow list and clean resources."""
        switch.flows = self._multipart_replies_flows.pop(switch.id)
        del self._multipart_replies_xids[switch.id]['flows']

    def _new_port_stats(self, switch):
        """Send an event with the new port stats and clean resources."""
        all_port_stats = self._multipart_replies_ports.pop(switch.id)
        del self._multipart_replies_xids[switch.id]['ports']
        port_stats_event = KytosEvent(
            name='kytos/of_core.port_stats',
            content={
                'switch': switch,
                'port_stats': all_port_stats
                })
        self.controller.buffers.app.put(port_stats_event)

    def _is_multipart_reply_ours(self, reply, switch, stat):
        """Return whether we are expecting the reply.

        Args:
            reply: multipart reply message; only ``reply.header.xid`` is read.
            switch: switch that sent the reply; only ``switch.id`` is read.
            stat (str): key of the pending request ('flows' or 'ports').

        Returns:
            bool: True if the xid stored for ``stat`` equals the reply xid.
        """
        pending_xids = self._multipart_replies_xids.get(switch.id)
        if pending_xids is None:
            return False
        return bool(pending_xids.get(stat) == reply.header.xid)

    @listen_to('kytos/core.openflow.raw.in')
    def handle_raw_in(self, event):
        """Handle a RawEvent and generate a kytos/core.messages.in.* event.

        The new bytes are appended to the connection's pending data and
        sliced into OpenFlow packets. New connections go through version
        negotiation; on established connections each packet is unpacked and
        emitted. Packets that arrive during setup, other than the awaited
        features reply, are put back in front of the remaining data.

        Args:
            event (KytosEvent): RawEvent with openflow message to be unpacked
        """
        connection = event.source

        # If the switch is already known to the controller, update the
        # 'lastseen' attribute
        switch = connection.switch
        if switch:
            switch.update_lastseen()

        data = connection.remaining_data + event.content['new_data']
        packets, connection.remaining_data = of_slicer(data)
        if not packets:
            return

        unprocessed_packets = []

        for packet in packets:
            if not connection.is_alive():
                return

            if connection.is_new():
                try:
                    message = GenericHello(packet=packet)
                    self._negotiate(connection, message)
                except (UnpackException, NegotiationException) as err:
                    if isinstance(err, UnpackException):
                        log.error('Connection %s: Invalid hello message',
                                  connection.id)
                    else:
                        log.error('Connection %s: Negotiation Failed',
                                  connection.id)
                    connection.protocol.state = 'hello_failed'
                    connection.close()
                    connection.state = ConnectionState.FAILED
                    return
                connection.set_setup_state()
                continue

            try:
                message = connection.protocol.unpack(packet)
                if message.header.message_type == Type.OFPT_ERROR:
                    log.error('OFPT_ERROR: %s', message.code)
            except (UnpackException, AttributeError) as err:
                log.error(err)
                if isinstance(err, AttributeError):
                    error_msg = 'connection closed before version negotiation'
                    log.error('Connection %s: %s', connection.id, error_msg)
                connection.close()
                return

            log.debug('Connection %s: IN OFP, version: %s, type: %s, xid: %s',
                      connection.id,
                      message.header.version,
                      message.header.message_type,
                      message.header.xid)

            # Compare the enum member name rather than str(enum): the string
            # form of IntEnum members is not stable across Python versions.
            waiting_features_reply = (
                message.header.message_type.name == 'OFPT_FEATURES_REPLY'
                and connection.protocol.state == 'waiting_features_reply')

            if connection.is_during_setup() and not waiting_features_reply:
                unprocessed_packets.append(packet)
                continue

            self.emit_message_in(connection, message)

        # Packets skipped during setup go back in front of the leftover bytes
        # so they are seen again on the next raw event.
        connection.remaining_data = (b''.join(unprocessed_packets) +
                                     connection.remaining_data)

    def emit_message_in(self, connection, message):
        """Emit a KytosEvent for each incoming message.

        Also update links and port status. Nothing is done if the
        connection is no longer alive.
        """
        if not connection.is_alive():
            return
        emit_message_in(self.controller, connection, message)
        msg_type = message.header.message_type.name.lower()
        if msg_type == 'ofpt_port_status':
            self.update_port_status(message, connection)
        elif msg_type == 'ofpt_packet_in':
            self.update_links(message, connection)

    def emit_message_out(self, connection, message):
        """Emit a KytosEvent for each outgoing message."""
        if connection.is_alive():
            emit_message_out(self.controller, connection, message)

    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_echo_request')
    def handle_echo_request(self, event):
        """Handle Echo Request Messages.

        This method will get a echo request sent by client and generate a
        echo reply as answer, reusing the request's xid and data.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                Event with echo request in message.

        """
        pyof_lib = PYOF_VERSION_LIBS[event.source.protocol.version]
        echo_request = event.message
        echo_reply = pyof_lib.symmetric.echo_reply.EchoReply(
            xid=echo_request.header.xid,
            data=echo_request.data)
        self.emit_message_out(event.source, echo_reply)

    def _negotiate(self, connection, message):
        """Handle hello messages.

        This method will handle the incoming hello message by client
        and deal with negotiation.

        Args:
            connection: connection the hello message arrived on.
            message (GenericHello): the hello message sent by the client.

        Raises:
            NegotiationException: if no common OpenFlow version is found.

        """
        if message.versions:
            version = _get_version_from_bitmask(message.versions)
        else:
            version = _get_version_from_header(message.header.version)

        log.debug('connection %s: negotiated version - %s',
                  connection.id, str(version))

        if version is None:
            self.fail_negotiation(connection, message)
            raise NegotiationException()

        version_utils = self.of_core_version_utils[version]
        version_utils.say_hello(self.controller, connection)

        connection.protocol.name = 'openflow'
        connection.protocol.version = version
        connection.protocol.unpack = unpack
        connection.protocol.state = 'sending_features'
        self.send_features_request(connection)
        log.debug('Connection %s: Hello complete', connection.id)

    def fail_negotiation(self, connection, hello_message):
        """Send Error message and emit event upon negotiation failure."""
        log.warning('connection %s: version negotiation failed',
                    connection.id)
        connection.protocol.state = 'hello_failed'
        event_raw = KytosEvent(
            name='kytos/of_core.hello_failed',
            content={'source': connection})
        self.controller.buffers.app.put(event_raw)

        # The error is built with the highest version we support.
        version = max(settings.OPENFLOW_VERSIONS)
        error_lib = PYOF_VERSION_LIBS[version].asynchronous.error_msg

        error_message = error_lib.ErrorMsg(
            xid=hello_message.header.xid,
            error_type=error_lib.ErrorType.OFPET_HELLO_FAILED,
            code=error_lib.HelloFailedCode.OFPHFC_INCOMPATIBLE)
        self.emit_message_out(connection, error_message)

    # May be removed
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_echo_reply')
    def handle_queued_openflow_echo_reply(self, event):
        """Handle queued OpenFlow echo reply messages.

        Send a feature request message if SEND_FEATURES_REQUEST_ON_ECHO
        is True (default is False).
        """
        if settings.SEND_FEATURES_REQUEST_ON_ECHO:
            self.send_features_request(event.destination)

    def send_features_request(self, destination):
        """Send a feature request to the switch."""
        version = destination.protocol.version
        pyof_lib = PYOF_VERSION_LIBS[version]
        features_request = pyof_lib.controller2switch.\
            features_request.FeaturesRequest()
        self.emit_message_out(destination, features_request)

    # pylint: disable=no-self-use
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_features_request')
    def handle_features_request_sent(self, event):
        """Ensure request has actually been sent before changing state."""
        if event.destination.protocol.state == 'sending_features':
            event.destination.protocol.state = 'waiting_features_reply'
    # pylint: enable=no-self-use

    @staticmethod
    @listen_to('kytos/of_core.v0x[0-9a-f]{2}.messages.in.hello_failed',
               'kytos/of_core.v0x0[14].messages.out.hello_failed')
    def handle_openflow_in_hello_failed(event):
        """Close the connection upon hello failure."""
        event.destination.close()
        log.debug("Connection %s: Connection closed.", event.destination.id)

    def shutdown(self):
        """End of the application."""
        log.debug('Shutting down...')

    def update_links(self, message, source):
        """Dispatch 'reachable.mac' event.

        LLDP and IPv6 frames are ignored.

        Args:
            message: python openflow (pyof) PacketIn object.
            source: kytos.core.switch.Connection instance.

        Dispatch:
            `kytos/of_core.reachable.mac`:
                {
                  switch: <source.switch>,
                  port: <interface looked up by the packet's in_port>,
                  reachable_mac: <source mac address of the frame>
                }

        """
        ethernet = Ethernet()
        ethernet.unpack(message.data.value)
        if ethernet.ether_type in (EtherType.LLDP, EtherType.IPV6):
            return

        # in_port may or may not expose ``.value``; fall back to using it
        # directly when it does not.
        try:
            port = source.switch.get_interface_by_port_no(
                message.in_port.value)
        except AttributeError:
            port = source.switch.get_interface_by_port_no(message.in_port)

        name = 'kytos/of_core.reachable.mac'
        content = {'switch': source.switch,
                   'port': port,
                   'reachable_mac': ethernet.source.value}
        event = KytosEvent(name, content)
        self.controller.buffers.app.put(event)

        msg = 'The MAC %s is reachable from switch/port %s/%s.'
        log.debug(msg, ethernet.source, source.switch.id,
                  message.in_port)

    def _send_specific_port_mod(self, port, interface, current_state):
        """Dispatch port link_up/link_down events.

        An event is emitted only when the link status derived from
        ``port.state`` differs from the one derived from ``current_state``.

        Args:
            port: port description from the PortStatus message.
            interface: interface placed in the event content.
            current_state: previous interface state, or None if unknown.
        """
        event_name = 'kytos/of_core.switch.interface.'
        event_content = {'interface': interface}

        # The lowest bit of the state is the "link down" flag.
        status = 'link_down' if port.state.value % 2 else 'link_up'

        # A previous state of 0 is a valid "link up" state, so only None
        # means "no previous state known".
        if current_state is None:
            current_status = None
        elif current_state % 2:
            current_status = 'link_down'
        else:
            current_status = 'link_up'

        if status != current_status:
            event = KytosEvent(name=event_name+status, content=event_content)
            self.controller.buffers.app.put(event)

    @staticmethod
    def _build_interface(port, switch):
        """Create an Interface from a PortStatus port description."""
        return Interface(name=port.name.value,
                         address=port.hw_addr.value,
                         port_number=port.port_no.value,
                         switch=switch,
                         state=port.state.value,
                         features=port.curr)

    def update_port_status(self, port_status, source):
        """Dispatch 'interface.*' events.

        Current events:

        created|deleted|link_up|link_down|modified

        Args:
            port_status: python openflow (pyof) PortStatus object.
            source: kytos.core.switch.Connection instance.

        Dispatch:
            `kytos/of_core.switch.interface.[created|modified|deleted]`:
                {
                  interface: <interface affected by the port status>
                }

        """
        reason = port_status.reason.enum_ref(port_status.reason.value).name
        port = port_status.desc
        port_no = port.port_no.value
        event_name = 'kytos/of_core.switch.interface.'

        if reason == 'OFPPR_ADD':
            status = 'created'
            interface = self._build_interface(port, source.switch)
            source.switch.update_interface(interface)

        elif reason == 'OFPPR_MODIFY':
            status = 'modified'
            interface = source.switch.get_interface_by_port_no(port_no)
            current_status = None
            if interface:
                log.info('Modified %s %s:%s', interface,
                         interface.switch.dpid, interface.port_number)
                current_status = interface.state
                interface.state = port.state.value
                interface.name = port.name.value
                interface.address = port.hw_addr.value
                interface.features = port.curr
            else:
                interface = self._build_interface(port, source.switch)
            source.switch.update_interface(interface)
            self._send_specific_port_mod(port, interface, current_status)

        elif reason == 'OFPPR_DELETE':
            status = 'deleted'
            interface = source.switch.get_interface_by_port_no(port_no)
            source.switch.remove_interface(interface)

        else:
            # Without this guard ``status`` and ``interface`` would be
            # unbound below for any reason other than the three above.
            log.warning('Ignoring port status with unknown reason %s '
                        'for port %s.', reason, port_no)
            return

        event_name += status
        content = {'interface': interface}

        event = KytosEvent(name=event_name, content=content)
        self.controller.buffers.app.put(event)

        msg = 'The port %s from switch %s was %s.'
        log.debug(msg, port_status.desc.port_no, source.switch.id, status)
531
532
533 1
def _get_version_from_bitmask(message_versions):
    """Return the highest version of the hello bitmap that we support.

    Only versions listed in ``settings.OPENFLOW_VERSIONS`` are considered.
    ``None`` is returned when there is no version in common.
    """
    return max((candidate for candidate in message_versions
                if candidate in settings.OPENFLOW_VERSIONS),
               default=None)
540
541
542 1
def _get_version_from_header(message_version):
    """Return the version to use given the hello message header version.

    The candidate is the lower of the peer's version and our highest
    supported version. ``None`` is returned when that candidate is not
    listed in ``settings.OPENFLOW_VERSIONS``.
    """
    candidate = min(message_version, max(settings.OPENFLOW_VERSIONS))
    if candidate in settings.OPENFLOW_VERSIONS:
        return candidate
    return None
546