build.main.Main.handle_features_reply()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 2
dl 0
loc 26
ccs 13
cts 13
cp 1
crap 4
rs 9.5
c 0
b 0
f 0
1
"""NApp responsible for the main OpenFlow basic operations."""
2
3 1
from pyof.foundation.exceptions import UnpackException
4 1
from pyof.foundation.network_types import Ethernet, EtherType
5 1
from pyof.utils import PYOF_VERSION_LIBS, unpack
6 1
from pyof.v0x01.common.header import Type
7 1
from pyof.v0x01.controller2switch.common import StatsType
8 1
from pyof.v0x04.controller2switch.common import MultipartType
9
10 1
from kytos.core import KytosEvent, KytosNApp, log
11 1
from kytos.core.connection import ConnectionState
12 1
from kytos.core.helpers import listen_to
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.of_core import settings
15 1
from napps.kytos.of_core.utils import (GenericHello, NegotiationException,
16
                                       emit_message_in, emit_message_out,
17
                                       of_slicer)
18 1
from napps.kytos.of_core.v0x01 import utils as of_core_v0x01_utils
19 1
from napps.kytos.of_core.v0x01.flow import Flow as Flow01
20 1
from napps.kytos.of_core.v0x04 import utils as of_core_v0x04_utils
21 1
from napps.kytos.of_core.v0x04.flow import Flow as Flow04
22
23
24 1
class Main(KytosNApp):
25
    """Main class of the NApp responsible for OpenFlow basic operations."""
26
27
    # Keep track of multiple multipart replies from our own request only.
28
    # Assume that all replies are received before setting a new xid.
29 1
    _multipart_replies_xids = {}
30 1
    _multipart_replies_flows = {}
31 1
    _multipart_replies_ports = {}
32
33 1
    def setup(self):
34
        """App initialization (used instead of ``__init__``).
35
36
        The setup method is automatically called by the run method.
37
        Users shouldn't call this method directly.
38
        """
39 1
        self.of_core_version_utils = {0x01: of_core_v0x01_utils,
40
                                      0x04: of_core_v0x04_utils}
41 1
        self.execute_as_loop(settings.STATS_INTERVAL)
42
43 1
    def execute(self):
44
        """Run once on app 'start' or in a loop.
45
46
        The execute method is called by the run method of KytosNApp class.
47
        Users shouldn't call this method directly.
48
        """
49 1
        for switch in self.controller.switches.values():
50 1
            if switch.is_connected():
51 1
                self._request_flow_list(switch)
52 1
                if settings.SEND_ECHO_REQUESTS:
53 1
                    version_utils = \
54
                        self.of_core_version_utils[switch.
55
                                                   connection.protocol.version]
56 1
                    version_utils.send_echo(self.controller, switch)
57
58 1
    def _request_flow_list(self, switch):
59
        """Send flow stats request to a connected switch."""
60 1
        of_version = switch.connection.protocol.version
61 1
        if of_version == 0x01:
62 1
            of_core_v0x01_utils.update_flow_list(self.controller, switch)
63 1
            of_core_v0x01_utils.request_port_stats(self.controller, switch)
64 1
        elif of_version == 0x04:
65 1
            xid_flows = of_core_v0x04_utils.update_flow_list(self.controller,
66
                                                             switch)
67 1
            xid_ports = of_core_v0x04_utils.request_port_stats(self.controller,
68
                                                               switch)
69 1
            self._multipart_replies_xids[switch.id] = {'flows': xid_flows,
70
                                                       'ports': xid_ports}
71
72 1
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
73
    def handle_stats_reply(self, event):
74
        """Handle stats replies for v0x01 switches.
75
76
        Args:
77
            event (:class:`~kytos.core.events.KytosEvent):
78
                Event with ofpt_stats_reply in message.
79
        """
80 1
        switch = event.source.switch
81 1
        msg = event.content['message']
82 1
        if msg.body_type == StatsType.OFPST_FLOW:
83 1
            switch.flows = [Flow01.from_of_flow_stats(f, switch)
84
                            for f in msg.body]
85 1
            event_raw = KytosEvent(
86
                name='kytos/of_core.flow_stats.received',
87
                content={'switch': switch})
88 1
            self.controller.buffers.app.put(event_raw)
89 1
        elif msg.body_type == StatsType.OFPST_PORT:
90
            port_stats = [of_port_stats for of_port_stats in msg.body]
91
            port_stats_event = KytosEvent(
92
                name=f"kytos/of_core.port_stats",
93
                content={
94
                    'switch': switch,
95
                    'port_stats': port_stats
96
                    })
97
            self.controller.buffers.app.put(port_stats_event)
98 1
        elif msg.body_type == StatsType.OFPST_DESC:
99 1
            switch.update_description(msg.body)
100
101 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_features_reply')
102
    def handle_features_reply(self, event):
103
        """Handle kytos/of_core.messages.in.ofpt_features_reply event.
104
105
        This is the end of the Handshake workflow of the OpenFlow Protocol.
106
107
        Args:
108
            event (KytosEvent): Event with features reply message.
109
        """
110 1
        connection = event.source
111 1
        version_utils = self.of_core_version_utils[connection.protocol.version]
112 1
        switch = version_utils.handle_features_reply(self.controller, event)
113
114 1
        if (connection.is_during_setup() and
115
                connection.protocol.state == 'waiting_features_reply'):
116 1
            connection.protocol.state = 'handshake_complete'
117 1
            connection.set_established_state()
118 1
            version_utils.send_desc_request(self.controller, switch)
119 1
            if settings.SEND_SET_CONFIG:
120 1
                version_utils.send_set_config(self.controller, switch)
121 1
            log.info('Connection %s, Switch %s: OPENFLOW HANDSHAKE COMPLETE',
122
                     connection.id, switch.dpid)
123 1
            event_raw = KytosEvent(
124
                name='kytos/of_core.handshake.completed',
125
                content={'switch': switch})
126 1
            self.controller.buffers.app.put(event_raw)
127
128 1
    @listen_to('kytos/of_core.handshake.completed')
129
    def on_handshake_completed_request_flow_list(self, event):
130
        """Request an flow list right after the handshake is completed.
131
132
        Args:
133
            event (KytosEvent): Event with the switch' handshake completed
134
        """
135 1
        switch = event.content['switch']
136 1
        if switch.is_enabled():
137 1
            self._request_flow_list(switch)
138
139 1
    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
140
    def handle_multipart_reply(self, event):
141
        """Handle multipart replies for v0x04 switches.
142
143
        Args:
144
            event (:class:`~kytos.core.events.KytosEvent):
145
                Event with ofpt_multipart_reply in message.
146
        """
147 1
        reply = event.content['message']
148 1
        switch = event.source.switch
149
150 1
        if reply.multipart_type == MultipartType.OFPMP_FLOW:
151 1
            self._handle_multipart_flow_stats(reply, switch)
152 1
        elif reply.multipart_type == MultipartType.OFPMP_PORT_STATS:
153
            self._handle_multipart_port_stats(reply, switch)
154 1
        elif reply.multipart_type == MultipartType.OFPMP_PORT_DESC:
155 1
            of_core_v0x04_utils.handle_port_desc(self.controller, switch,
156
                                                 reply.body)
157 1
        elif reply.multipart_type == MultipartType.OFPMP_DESC:
158 1
            switch.update_description(reply.body)
159
160 1
    def _handle_multipart_flow_stats(self, reply, switch):
161
        """Update switch flows after all replies are received."""
162 1
        if self._is_multipart_reply_ours(reply, switch, 'flows'):
163
            # Get all flows from the reply
164 1
            flows = [Flow04.from_of_flow_stats(of_flow_stats, switch)
165
                     for of_flow_stats in reply.body]
166
            # Get existent flows from the same xid (or create an empty list)
167 1
            all_flows = self._multipart_replies_flows.setdefault(switch.id, [])
168 1
            all_flows.extend(flows)
169 1
            if reply.flags.value % 2 == 0:  # Last bit means more replies
170 1
                self._update_switch_flows(switch)
171 1
                event_raw = KytosEvent(
172
                    name='kytos/of_core.flow_stats.received',
173
                    content={'switch': switch})
174 1
                self.controller.buffers.app.put(event_raw)
175
176 1
    def _handle_multipart_port_stats(self, reply, switch):
177
        """Emit an event about new port stats."""
178 1
        if self._is_multipart_reply_ours(reply, switch, 'ports'):
179 1
            port_stats = [of_port_stats for of_port_stats in reply.body]
180 1
            all_port_stats = self._multipart_replies_ports.setdefault(
181
                switch.id, []
182
            )
183 1
            all_port_stats.extend(port_stats)
184 1
            if reply.flags.value % 2 == 0:
185 1
                self._new_port_stats(switch)
186
187 1
    def _update_switch_flows(self, switch):
188
        """Update controllers' switch flow list and clean resources."""
189 1
        switch.flows = self._multipart_replies_flows[switch.id]
190 1
        del self._multipart_replies_flows[switch.id]
191 1
        del self._multipart_replies_xids[switch.id]['flows']
192
193 1
    def _new_port_stats(self, switch):
194
        """Send an event with the new port stats and clean resources."""
195
        all_port_stats = self._multipart_replies_ports[switch.id]
196
        del self._multipart_replies_ports[switch.id]
197
        del self._multipart_replies_xids[switch.id]['ports']
198
        port_stats_event = KytosEvent(
199
            name=f"kytos/of_core.port_stats",
200
            content={
201
                'switch': switch,
202
                'port_stats': all_port_stats
203
                })
204
        self.controller.buffers.app.put(port_stats_event)
205
206 1
    def _is_multipart_reply_ours(self, reply, switch, stat):
207
        """Return whether we are expecting the reply."""
208 1
        if switch.id in self._multipart_replies_xids:
209 1
            sent_xid = self._multipart_replies_xids[switch.id].get(stat)
210 1
            if sent_xid == reply.header.xid:
211 1
                return True
212 1
        return False
213
214 1
    @listen_to('kytos/core.openflow.raw.in')
215
    def handle_raw_in(self, event):
216
        """Handle a RawEvent and generate a kytos/core.messages.in.* event.
217
218
        Args:
219
            event (KytosEvent): RawEvent with openflow message to be unpacked
220
        """
221
        # If the switch is already known to the controller, update the
222
        # 'lastseen' attribute
223 1
        switch = event.source.switch
224 1
        if switch:
225 1
            switch.update_lastseen()
226
227 1
        connection = event.source
228
229 1
        data = connection.remaining_data + event.content['new_data']
230 1
        packets, connection.remaining_data = of_slicer(data)
231 1
        if not packets:
232
            return
233
234 1
        unprocessed_packets = []
235
236 1
        for packet in packets:
237 1
            if not connection.is_alive():
238
                return
239
240 1
            if connection.is_new():
241 1
                try:
242 1
                    message = GenericHello(packet=packet)
243 1
                    self._negotiate(connection, message)
244 1
                except (UnpackException, NegotiationException) as err:
245 1
                    if isinstance(err, UnpackException):
246
                        log.error('Connection %s: Invalid hello message',
247
                                  connection.id)
248
                    else:
249 1
                        log.error('Connection %s: Negotiation Failed',
250
                                  connection.id)
251 1
                    connection.protocol.state = 'hello_failed'
252 1
                    connection.close()
253 1
                    connection.state = ConnectionState.FAILED
254 1
                    return
255 1
                connection.set_setup_state()
256 1
                continue
257
258 1
            try:
259 1
                message = connection.protocol.unpack(packet)
260 1
                if message.header.message_type == Type.OFPT_ERROR:
261
                    log.error("OFPT_ERROR: %s error code received from"
262
                              " switch %s with xid %s", message.code,
263
                              switch.dpid, message.header.xid)
264 1
            except (UnpackException, AttributeError) as err:
265 1
                log.error(err)
266 1
                if isinstance(err, AttributeError):
267 1
                    error_msg = 'connection closed before version negotiation'
268 1
                    log.error('Connection %s: %s', connection.id, error_msg)
269 1
                connection.close()
270 1
                return
271
272 1
            log.debug('Connection %s: IN OFP, version: %s, type: %s, xid: %s',
273
                      connection.id,
274
                      message.header.version,
275
                      message.header.message_type,
276
                      message.header.xid)
277
278 1
            waiting_features_reply = (
279
                str(message.header.message_type) == 'Type.OFPT_FEATURES_REPLY'
280
                and connection.protocol.state == 'waiting_features_reply')
281
282 1
            if connection.is_during_setup() and not waiting_features_reply:
283
                unprocessed_packets.append(packet)
284
                continue
285
286 1
            self.emit_message_in(connection, message)
287
288 1
        connection.remaining_data = b''.join(unprocessed_packets) + \
289
                                    connection.remaining_data
290
291 1
    def emit_message_in(self, connection, message):
292
        """Emit a KytosEvent for each incoming message.
293
294
        Also update links and port status.
295
        """
296 1
        if not connection.is_alive():
297
            return
298 1
        emit_message_in(self.controller, connection, message)
299 1
        msg_type = message.header.message_type.name.lower()
300 1
        if msg_type == 'ofpt_port_status':
301 1
            self.update_port_status(message, connection)
302 1
        elif msg_type == 'ofpt_packet_in':
303 1
            self.update_links(message, connection)
304
305 1
    def emit_message_out(self, connection, message):
306
        """Emit a KytosEvent for each outgoing message."""
307 1
        if connection.is_alive():
308 1
            emit_message_out(self.controller, connection, message)
309
310 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_echo_request')
311
    def handle_echo_request(self, event):
312
        """Handle Echo Request Messages.
313
314
        This method will get a echo request sent by client and generate a
315
        echo reply as answer.
316
317
        Args:
318
            event (:class:`~kytos.core.events.KytosEvent`):
319
                Event with echo request in message.
320
321
        """
322 1
        pyof_lib = PYOF_VERSION_LIBS[event.source.protocol.version]
323 1
        echo_request = event.message
324 1
        echo_reply = pyof_lib.symmetric.echo_reply.EchoReply(
325
            xid=echo_request.header.xid,
326
            data=echo_request.data)
327 1
        self.emit_message_out(event.source, echo_reply)
328
329 1
    def _negotiate(self, connection, message):
330
        """Handle hello messages.
331
332
        This method will handle the incoming hello message by client
333
        and deal with negotiation.
334
335
        Parameters:
336
            event (KytosMessageInHello): KytosMessageInHelloEvent
337
338
        """
339 1
        if message.versions:
340 1
            version = _get_version_from_bitmask(message.versions)
341
        else:
342 1
            version = _get_version_from_header(message.header.version)
343
344 1
        log.debug('connection %s: negotiated version - %s',
345
                  connection.id, str(version))
346
347 1
        if version is None:
348 1
            self.fail_negotiation(connection, message)
349 1
            raise NegotiationException()
350
351 1
        version_utils = self.of_core_version_utils[version]
352 1
        version_utils.say_hello(self.controller, connection)
353
354 1
        connection.protocol.name = 'openflow'
355 1
        connection.protocol.version = version
356 1
        connection.protocol.unpack = unpack
357 1
        connection.protocol.state = 'sending_features'
358 1
        self.send_features_request(connection)
359 1
        log.debug('Connection %s: Hello complete', connection.id)
360
361 1
    def fail_negotiation(self, connection, hello_message):
362
        """Send Error message and emit event upon negotiation failure."""
363 1
        log.warning('connection %s: version negotiation failed',
364
                    connection.id)
365 1
        connection.protocol.state = 'hello_failed'
366 1
        event_raw = KytosEvent(
367
            name='kytos/of_core.hello_failed',
368
            content={'source': connection})
369 1
        self.controller.buffers.app.put(event_raw)
370
371 1
        version = max(settings.OPENFLOW_VERSIONS)
372 1
        pyof_lib = PYOF_VERSION_LIBS[version]
373
374 1
        error_message = pyof_lib.asynchronous.error_msg.ErrorMsg(
375
            xid=hello_message.header.xid,
376
            error_type=pyof_lib.asynchronous.error_msg.
377
            ErrorType.OFPET_HELLO_FAILED,
378
            code=pyof_lib.asynchronous.error_msg.HelloFailedCode.
379
            OFPHFC_INCOMPATIBLE)
380 1
        self.emit_message_out(connection, error_message)
381
382
    # May be removed
383 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_echo_reply')
384
    def handle_queued_openflow_echo_reply(self, event):
385
        """Handle queued OpenFlow echo reply messages.
386
387
        Send a feature request message if SEND_FEATURES_REQUEST_ON_ECHO
388
        is True (default is False).
389
        """
390 1
        if settings.SEND_FEATURES_REQUEST_ON_ECHO:
391 1
            self.send_features_request(event.destination)
392
393 1
    def send_features_request(self, destination):
394
        """Send a feature request to the switch."""
395 1
        version = destination.protocol.version
396 1
        pyof_lib = PYOF_VERSION_LIBS[version]
397 1
        features_request = pyof_lib.controller2switch.\
398
            features_request.FeaturesRequest()
399 1
        self.emit_message_out(destination, features_request)
400
401
    # pylint: disable=no-self-use
402 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_features_request')
403
    def handle_features_request_sent(self, event):
404
        """Ensure request has actually been sent before changing state."""
405 1
        if event.destination.protocol.state == 'sending_features':
406 1
            event.destination.protocol.state = 'waiting_features_reply'
407
    # pylint: enable=no-self-use
408
409 1
    @staticmethod
410 1
    @listen_to('kytos/of_core.v0x[0-9a-f]{2}.messages.in.hello_failed',
411
               'kytos/of_core.v0x0[14].messages.out.hello_failed')
412
    def handle_openflow_in_hello_failed(event):
413
        """Close the connection upon hello failure."""
414 1
        event.destination.close()
415 1
        log.debug("Connection %s: Connection closed.", event.destination.id)
416
417 1
    def shutdown(self):
418
        """End of the application."""
419 1
        log.debug('Shutting down...')
420
421 1
    def update_links(self, message, source):
422
        """Dispatch 'reacheable.mac' event.
423
424
        Args:
425
            message: python openflow (pyof) PacketIn object.
426
            source: kytos.core.switch.Connection instance.
427
428
        Dispatch:
429
            `reachable.mac`:
430
                {
431
                  switch : <switch.id>,
432
                  port: <port.port_no>
433
                  reachable_mac: <mac_address>
434
                }
435
436
        """
437 1
        ethernet = Ethernet()
438 1
        ethernet.unpack(message.data.value)
439 1
        if ethernet.ether_type in (EtherType.LLDP, EtherType.IPV6):
440
            return
441
442 1
        try:
443 1
            port = source.switch.get_interface_by_port_no(
444
                message.in_port.value)
445 1
        except AttributeError:
446 1
            port = source.switch.get_interface_by_port_no(message.in_port)
447
448 1
        name = 'kytos/of_core.reachable.mac'
449 1
        content = {'switch': source.switch,
450
                   'port': port,
451
                   'reachable_mac': ethernet.source.value}
452 1
        event = KytosEvent(name, content)
453 1
        self.controller.buffers.app.put(event)
454
455 1
        msg = 'The MAC %s is reachable from switch/port %s/%s.'
456 1
        log.debug(msg, ethernet.source, source.switch.id,
457
                  message.in_port)
458
459 1
    def _send_specific_port_mod(self, port, interface, current_state):
460
        """Dispatch port link_up/link_down events."""
461 1
        event_name = 'kytos/of_core.switch.interface.'
462 1
        event_content = {'interface': interface}
463
464 1
        if port.state.value % 2:
465 1
            status = 'link_down'
466
        else:
467 1
            status = 'link_up'
468
469 1
        if current_state:
470 1
            if current_state % 2:
471 1
                current_status = 'link_down'
472
            else:
473 1
                current_status = 'link_up'
474
        else:
475 1
            current_status = None
476
477 1
        if status != current_status:
478 1
            event = KytosEvent(name=event_name+status, content=event_content)
479 1
            self.controller.buffers.app.put(event)
480
481 1
    def update_port_status(self, port_status, source):
482
        """Dispatch 'port.*' events.
483
484
        Current events:
485
486
        created|deleted|link_up|link_down|modified
487
488
        Args:
489
            port_status: python openflow (pyof) PortStatus object.
490
            source: kytos.core.switch.Connection instance.
491
492
        Dispatch:
493
            `kytos/of_core.switch.port.[created|modified|deleted]`:
494
                {
495
                  switch : <switch.id>,
496
                  port: <port.port_no>
497
                  port_description: {<description of the port>}
498
                }
499
500
        """
501 1
        reason = port_status.reason.enum_ref(port_status.reason.value).name
502 1
        port = port_status.desc
503 1
        port_no = port.port_no.value
504 1
        event_name = 'kytos/of_core.switch.interface.'
505
506 1
        if reason == 'OFPPR_ADD':
507 1
            status = 'created'
508 1
            interface = Interface(name=port.name.value,
509
                                  address=port.hw_addr.value,
510
                                  port_number=port_no,
511
                                  switch=source.switch,
512
                                  state=port.state.value,
513
                                  features=port.curr)
514 1
            source.switch.update_interface(interface)
515
516 1
        elif reason == 'OFPPR_MODIFY':
517 1
            status = 'modified'
518 1
            interface = source.switch.get_interface_by_port_no(port_no)
519 1
            current_status = None
520 1
            if interface:
521 1
                log.info('Modified %s %s:%s' %
522
                         (interface, interface.switch.dpid,
523
                          interface.port_number))
524 1
                current_status = interface.state
525 1
                interface.state = port.state.value
526 1
                interface.name = port.name.value
527 1
                interface.address = port.hw_addr.value
528 1
                interface.features = port.curr
529
            else:
530 1
                interface = Interface(name=port.name.value,
531
                                      address=port.hw_addr.value,
532
                                      port_number=port_no,
533
                                      switch=source.switch,
534
                                      state=port.state.value,
535
                                      features=port.curr)
536 1
            source.switch.update_interface(interface)
537 1
            self._send_specific_port_mod(port, interface, current_status)
538
539 1
        elif reason == 'OFPPR_DELETE':
540 1
            status = 'deleted'
541 1
            interface = source.switch.get_interface_by_port_no(port_no)
542 1
            source.switch.remove_interface(interface)
543
544 1
        event_name += status
0 ignored issues
show
introduced by
The variable status does not seem to be defined for all execution paths.
Loading history...
545 1
        content = {'interface': interface}
0 ignored issues
show
introduced by
The variable interface does not seem to be defined for all execution paths.
Loading history...
546
547 1
        event = KytosEvent(name=event_name, content=content)
548 1
        self.controller.buffers.app.put(event)
549
550 1
        msg = 'The port %s from switch %s was %s.'
551 1
        log.debug(msg, port_status.desc.port_no, source.switch.id, status)
552
553
554 1
def _get_version_from_bitmask(message_versions):
555
    """Get common version from hello message version bitmap."""
556
    try:
557
        return max([version for version in message_versions
558
                    if version in settings.OPENFLOW_VERSIONS])
559
    except ValueError:
560
        return None
561
562
563 1
def _get_version_from_header(message_version):
564
    """Get common version from hello message header version."""
565
    version = min(message_version, max(settings.OPENFLOW_VERSIONS))
566
    return version if version in settings.OPENFLOW_VERSIONS else None
567