Passed
Push — master ( 75d007...4d9626 )
by Humberto
01:10 queued 12s
created

build.main.Main._negotiate()   A

Complexity

Conditions 3

Size

Total Lines 31
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 17
nop 3
dl 0
loc 31
rs 9.55
c 0
b 0
f 0
ccs 16
cts 16
cp 1
crap 3
1
"""NApp responsible for the main OpenFlow basic operations."""
2
3 1
from pyof.foundation.exceptions import UnpackException
4 1
from pyof.foundation.network_types import Ethernet, EtherType
5 1
from pyof.utils import PYOF_VERSION_LIBS, unpack
6 1
from pyof.v0x01.common.header import Type
7 1
from pyof.v0x01.controller2switch.common import StatsType
8 1
from pyof.v0x04.controller2switch.common import MultipartType
9
10 1
from kytos.core import KytosEvent, KytosNApp, log
11 1
from kytos.core.connection import ConnectionState
12 1
from kytos.core.helpers import listen_to
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.of_core import settings
15 1
from napps.kytos.of_core.utils import (GenericHello, NegotiationException,
16
                                       emit_message_in, emit_message_out,
17
                                       of_slicer)
18 1
from napps.kytos.of_core.v0x01 import utils as of_core_v0x01_utils
19 1
from napps.kytos.of_core.v0x01.flow import Flow as Flow01
20 1
from napps.kytos.of_core.v0x04 import utils as of_core_v0x04_utils
21 1
from napps.kytos.of_core.v0x04.flow import Flow as Flow04
22
23
24 1
class Main(KytosNApp):
25
    """Main class of the NApp responsible for OpenFlow basic operations."""
26
27
    # Keep track of multiple multipart replies from our own request only.
28
    # Assume that all replies are received before setting a new xid.
29 1
    _multipart_replies_xids = {}
30 1
    _multipart_replies_flows = {}
31
32 1
    def setup(self):
33
        """App initialization (used instead of ``__init__``).
34
35
        The setup method is automatically called by the run method.
36
        Users shouldn't call this method directly.
37
        """
38 1
        self.of_core_version_utils = {0x01: of_core_v0x01_utils,
39
                                      0x04: of_core_v0x04_utils}
40 1
        self.execute_as_loop(settings.STATS_INTERVAL)
41
42 1
    def execute(self):
43
        """Run once on app 'start' or in a loop.
44
45
        The execute method is called by the run method of KytosNApp class.
46
        Users shouldn't call this method directly.
47
        """
48 1
        for switch in self.controller.switches.values():
49 1
            if switch.is_connected():
50 1
                self._request_flow_list(switch)
51 1
                if settings.SEND_ECHO_REQUESTS:
52 1
                    version_utils = \
53
                        self.of_core_version_utils[switch.
54
                                                   connection.protocol.version]
55 1
                    version_utils.send_echo(self.controller, switch)
56
57 1
    def _request_flow_list(self, switch):
58
        """Send flow stats request to a connected switch."""
59 1
        of_version = switch.connection.protocol.version
60 1
        if of_version == 0x01:
61 1
            of_core_v0x01_utils.update_flow_list(self.controller, switch)
62 1
        elif of_version == 0x04:
63 1
            xid = of_core_v0x04_utils.update_flow_list(self.controller, switch)
64 1
            self._multipart_replies_xids[switch.id] = xid
65
66 1
    @staticmethod
67 1
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
68
    def handle_stats_reply(event):
69
        """Handle stats replies for v0x01 switches.
70
71
        Args:
72
            event (:class:`~kytos.core.events.KytosEvent):
73
                Event with ofpt_stats_reply in message.
74
        """
75 1
        switch = event.source.switch
76 1
        msg = event.content['message']
77 1
        if msg.body_type == StatsType.OFPST_FLOW:
78 1
            switch.flows = [Flow01.from_of_flow_stats(f, switch)
79
                            for f in msg.body]
80 1
        elif msg.body_type == StatsType.OFPST_DESC:
81 1
            switch.update_description(msg.body)
82
83 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_features_reply')
84
    def handle_features_reply(self, event):
85
        """Handle kytos/of_core.messages.in.ofpt_features_reply event.
86
87
        This is the end of the Handshake workflow of the OpenFlow Protocol.
88
89
        Args:
90
            event (KytosEvent): Event with features reply message.
91
        """
92 1
        connection = event.source
93 1
        version_utils = self.of_core_version_utils[connection.protocol.version]
94 1
        switch = version_utils.handle_features_reply(self.controller, event)
95
96 1
        if (connection.is_during_setup() and
97
                connection.protocol.state == 'waiting_features_reply'):
98 1
            connection.protocol.state = 'handshake_complete'
99 1
            connection.set_established_state()
100 1
            version_utils.send_desc_request(self.controller, switch)
101 1
            if settings.SEND_SET_CONFIG:
102 1
                version_utils.send_set_config(self.controller, switch)
103 1
            log.info('Connection %s, Switch %s: OPENFLOW HANDSHAKE COMPLETE',
104
                     connection.id, switch.dpid)
105 1
            event_raw = KytosEvent(
106
                name='kytos/of_core.handshake.completed',
107
                content={'switch': switch})
108 1
            self.controller.buffers.app.put(event_raw)
109
110 1
    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
111
    def handle_multipart_reply(self, event):
112
        """Handle multipart replies for v0x04 switches.
113
114
        Args:
115
            event (:class:`~kytos.core.events.KytosEvent):
116
                Event with ofpt_multipart_reply in message.
117
        """
118 1
        reply = event.content['message']
119 1
        switch = event.source.switch
120
121 1
        if reply.multipart_type == MultipartType.OFPMP_FLOW:
122 1
            self._handle_multipart_flow_stats(reply, switch)
123 1
        elif reply.multipart_type == MultipartType.OFPMP_PORT_DESC:
124 1
            of_core_v0x04_utils.handle_port_desc(self.controller, switch,
125
                                                 reply.body)
126 1
        elif reply.multipart_type == MultipartType.OFPMP_DESC:
127 1
            switch.update_description(reply.body)
128
129 1
    def _handle_multipart_flow_stats(self, reply, switch):
130
        """Update switch flows after all replies are received."""
131 1
        if self._is_multipart_reply_ours(reply, switch):
132
            # Get all flows from the reply
133 1
            flows = [Flow04.from_of_flow_stats(of_flow_stats, switch)
134
                     for of_flow_stats in reply.body]
135
            # Get existent flows from the same xid (or create an empty list)
136 1
            all_flows = self._multipart_replies_flows.setdefault(switch.id, [])
137 1
            all_flows.extend(flows)
138 1
            if reply.flags.value % 2 == 0:  # Last bit means more replies
139 1
                self._update_switch_flows(switch)
140
141 1
    def _update_switch_flows(self, switch):
142
        """Update controllers' switch flow list and clean resources."""
143 1
        switch.flows = self._multipart_replies_flows[switch.id]
144 1
        del self._multipart_replies_flows[switch.id]
145 1
        del self._multipart_replies_xids[switch.id]
146
147 1
    def _is_multipart_reply_ours(self, reply, switch):
148
        """Return whether we are expecting the reply."""
149 1
        if switch.id in self._multipart_replies_xids:
150 1
            sent_xid = self._multipart_replies_xids[switch.id]
151 1
            if sent_xid == reply.header.xid:
152 1
                return True
153
        return False
154
155 1
    @listen_to('kytos/core.openflow.raw.in')
156
    def handle_raw_in(self, event):
157
        """Handle a RawEvent and generate a kytos/core.messages.in.* event.
158
159
        Args:
160
            event (KytosEvent): RawEvent with openflow message to be unpacked
161
        """
162
        # If the switch is already known to the controller, update the
163
        # 'lastseen' attribute
164 1
        switch = event.source.switch
165 1
        if switch:
166 1
            switch.update_lastseen()
167
168 1
        connection = event.source
169
170 1
        data = connection.remaining_data + event.content['new_data']
171 1
        packets, connection.remaining_data = of_slicer(data)
172 1
        if not packets:
173
            return
174
175 1
        unprocessed_packets = []
176
177 1
        for packet in packets:
178 1
            if not connection.is_alive():
179
                return
180 1
            log.debug('Connection %s: New Raw Openflow packet - %s',
181
                      connection.id, packet.hex())
182
183 1
            if connection.is_new():
184 1
                try:
185 1
                    message = GenericHello(packet=packet)
186 1
                    self._negotiate(connection, message)
187 1
                except (UnpackException, NegotiationException) as err:
188 1
                    if isinstance(err, UnpackException):
189
                        log.error('Connection %s: Invalid hello message',
190
                                  connection.id)
191
                    else:
192 1
                        log.error('Connection %s: Negotiation Failed',
193
                                  connection.id)
194 1
                    connection.protocol.state = 'hello_failed'
195 1
                    connection.close()
196 1
                    connection.state = ConnectionState.FAILED
197 1
                    return
198 1
                connection.set_setup_state()
199 1
                continue
200
201 1
            try:
202 1
                message = connection.protocol.unpack(packet)
203 1
                if message.header.message_type == Type.OFPT_ERROR:
204
                    log.error(f"OFPT_ERROR: {str(message.code)}")
205
            except (UnpackException, AttributeError) as err:
206
                log.error(err)
207
                if isinstance(err, AttributeError):
208
                    error_msg = 'connection closed before version negotiation'
209
                    log.error('Connection %s: %s', connection.id, error_msg)
210
                connection.close()
211
                return
212
213 1
            log.debug('Connection %s: IN OFP, version: %s, type: %s, xid: %s',
214
                      connection.id,
215
                      message.header.version,
216
                      message.header.message_type,
217
                      message.header.xid)
218
219 1
            waiting_features_reply = (
220
                str(message.header.message_type) == 'Type.OFPT_FEATURES_REPLY'
221
                and connection.protocol.state == 'waiting_features_reply')
222
223 1
            if connection.is_during_setup() and not waiting_features_reply:
224
                unprocessed_packets.append(packet)
225
                continue
226
227 1
            self.emit_message_in(connection, message)
228
229 1
        connection.remaining_data = b''.join(unprocessed_packets) + \
230
                                    connection.remaining_data
231
232 1
    def emit_message_in(self, connection, message):
233
        """Emit a KytosEvent for each incoming message.
234
235
        Also update links and port status.
236
        """
237 1
        if not connection.is_alive():
238
            return
239 1
        emit_message_in(self.controller, connection, message)
240 1
        msg_type = message.header.message_type.name.lower()
241 1
        if msg_type == 'ofpt_port_status':
242 1
            self.update_port_status(message, connection)
243 1
        elif msg_type == 'ofpt_packet_in':
244 1
            self.update_links(message, connection)
245
246 1
    def emit_message_out(self, connection, message):
247
        """Emit a KytosEvent for each outgoing message."""
248 1
        if connection.is_alive():
249 1
            emit_message_out(self.controller, connection, message)
250
251 1
    @listen_to('kytos/of_core.v0x0[14].messages.in.ofpt_echo_request')
252
    def handle_echo_request(self, event):
253
        """Handle Echo Request Messages.
254
255
        This method will get a echo request sent by client and generate a
256
        echo reply as answer.
257
258
        Args:
259
            event (:class:`~kytos.core.events.KytosEvent`):
260
                Event with echo request in message.
261
262
        """
263 1
        pyof_lib = PYOF_VERSION_LIBS[event.source.protocol.version]
264 1
        echo_request = event.message
265 1
        echo_reply = pyof_lib.symmetric.echo_reply.EchoReply(
266
            xid=echo_request.header.xid,
267
            data=echo_request.data)
268 1
        self.emit_message_out(event.source, echo_reply)
269
270 1
    def _negotiate(self, connection, message):
271
        """Handle hello messages.
272
273
        This method will handle the incoming hello message by client
274
        and deal with negotiation.
275
276
        Parameters:
277
            event (KytosMessageInHello): KytosMessageInHelloEvent
278
279
        """
280 1
        if message.versions:
281 1
            version = _get_version_from_bitmask(message.versions)
282
        else:
283 1
            version = _get_version_from_header(message.header.version)
284
285 1
        log.debug('connection %s: negotiated version - %s',
286
                  connection.id, str(version))
287
288 1
        if version is None:
289 1
            self.fail_negotiation(connection, message)
290 1
            raise NegotiationException()
291
292 1
        version_utils = self.of_core_version_utils[version]
293 1
        version_utils.say_hello(self.controller, connection)
294
295 1
        connection.protocol.name = 'openflow'
296 1
        connection.protocol.version = version
297 1
        connection.protocol.unpack = unpack
298 1
        connection.protocol.state = 'sending_features'
299 1
        self.send_features_request(connection)
300 1
        log.debug('Connection %s: Hello complete', connection.id)
301
302 1
    def fail_negotiation(self, connection, hello_message):
303
        """Send Error message and emit event upon negotiation failure."""
304 1
        log.warning('connection %s: version negotiation failed',
305
                    connection.id)
306 1
        connection.protocol.state = 'hello_failed'
307 1
        event_raw = KytosEvent(
308
            name='kytos/of_core.hello_failed',
309
            content={'source': connection})
310 1
        self.controller.buffers.app.put(event_raw)
311
312 1
        version = max(settings.OPENFLOW_VERSIONS)
313 1
        pyof_lib = PYOF_VERSION_LIBS[version]
314
315 1
        error_message = pyof_lib.asynchronous.error_msg.ErrorMsg(
316
            xid=hello_message.header.xid,
317
            error_type=pyof_lib.asynchronous.error_msg.
318
            ErrorType.OFPET_HELLO_FAILED,
319
            code=pyof_lib.asynchronous.error_msg.HelloFailedCode.
320
            OFPHFC_INCOMPATIBLE)
321 1
        self.emit_message_out(connection, error_message)
322
323
    # May be removed
324 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_echo_reply')
325
    def handle_queued_openflow_echo_reply(self, event):
326
        """Handle queued OpenFlow echo reply messages.
327
328
        Send a feature request message if SEND_FEATURES_REQUEST_ON_ECHO
329
        is True (default is False).
330
        """
331 1
        if settings.SEND_FEATURES_REQUEST_ON_ECHO:
332 1
            self.send_features_request(event.destination)
333
334 1
    def send_features_request(self, destination):
335
        """Send a feature request to the switch."""
336 1
        version = destination.protocol.version
337 1
        pyof_lib = PYOF_VERSION_LIBS[version]
338 1
        features_request = pyof_lib.controller2switch.\
339
            features_request.FeaturesRequest()
340 1
        self.emit_message_out(destination, features_request)
341
342
    # pylint: disable=no-self-use
343 1
    @listen_to('kytos/of_core.v0x0[14].messages.out.ofpt_features_request')
344
    def handle_features_request_sent(self, event):
345
        """Ensure request has actually been sent before changing state."""
346 1
        if event.destination.protocol.state == 'sending_features':
347 1
            event.destination.protocol.state = 'waiting_features_reply'
348
    # pylint: enable=no-self-use
349
350 1
    @staticmethod
351 1
    @listen_to('kytos/of_core.v0x[0-9a-f]{2}.messages.in.hello_failed',
352
               'kytos/of_core.v0x0[14].messages.out.hello_failed')
353
    def handle_openflow_in_hello_failed(event):
354
        """Close the connection upon hello failure."""
355
        event.destination.close()
356
        log.debug("Connection %s: Connection closed.", event.destination.id)
357
358 1
    def shutdown(self):
359
        """End of the application."""
360
        log.debug('Shutting down...')
361
362 1
    def update_links(self, message, source):
363
        """Dispatch 'reacheable.mac' event.
364
365
        Args:
366
            message: python openflow (pyof) PacketIn object.
367
            source: kytos.core.switch.Connection instance.
368
369
        Dispatch:
370
            `reachable.mac`:
371
                {
372
                  switch : <switch.id>,
373
                  port: <port.port_no>
374
                  reachable_mac: <mac_address>
375
                }
376
377
        """
378 1
        ethernet = Ethernet()
379 1
        ethernet.unpack(message.data.value)
380 1
        if ethernet.ether_type in (EtherType.LLDP, EtherType.IPV6):
381
            return
382
383 1
        try:
384 1
            port = source.switch.get_interface_by_port_no(
385
                message.in_port.value)
386 1
        except AttributeError:
387 1
            port = source.switch.get_interface_by_port_no(message.in_port)
388
389 1
        name = 'kytos/of_core.reachable.mac'
390 1
        content = {'switch': source.switch,
391
                   'port': port,
392
                   'reachable_mac': ethernet.source.value}
393 1
        event = KytosEvent(name, content)
394 1
        self.controller.buffers.app.put(event)
395
396 1
        msg = 'The MAC %s is reachable from switch/port %s/%s.'
397 1
        log.debug(msg, ethernet.source, source.switch.id,
398
                  message.in_port)
399
400 1
    def _send_specific_port_mod(self, port, interface, current_state):
401
        """Dispatch port link_up/link_down events."""
402 1
        event_name = 'kytos/of_core.switch.interface.'
403 1
        event_content = {'interface': interface}
404
405 1
        if port.state.value % 2:
406 1
            status = 'link_down'
407
        else:
408 1
            status = 'link_up'
409
410 1
        if current_state:
411 1
            if current_state % 2:
412
                current_status = 'link_down'
413
            else:
414 1
                current_status = 'link_up'
415
        else:
416 1
            current_status = None
417
418 1
        if status != current_status:
419 1
            event = KytosEvent(name=event_name+status, content=event_content)
420 1
            self.controller.buffers.app.put(event)
421
422 1
    def update_port_status(self, port_status, source):
423
        """Dispatch 'port.*' events.
424
425
        Current events:
426
427
        created|deleted|link_up|link_down|modified
428
429
        Args:
430
            port_status: python openflow (pyof) PortStatus object.
431
            source: kytos.core.switch.Connection instance.
432
433
        Dispatch:
434
            `kytos/of_core.switch.port.[created|modified|deleted]`:
435
                {
436
                  switch : <switch.id>,
437
                  port: <port.port_no>
438
                  port_description: {<description of the port>}
439
                }
440
441
        """
442 1
        reason = port_status.reason.enum_ref(port_status.reason.value).name
443 1
        port = port_status.desc
444 1
        port_no = port.port_no.value
445 1
        event_name = 'kytos/of_core.switch.interface.'
446
447 1
        if reason == 'OFPPR_ADD':
448 1
            status = 'created'
449 1
            interface = Interface(name=port.name.value,
450
                                  address=port.hw_addr.value,
451
                                  port_number=port_no,
452
                                  switch=source.switch,
453
                                  state=port.state.value,
454
                                  features=port.curr)
455 1
            source.switch.update_interface(interface)
456
457 1
        elif reason == 'OFPPR_MODIFY':
458 1
            status = 'modified'
459 1
            interface = source.switch.get_interface_by_port_no(port_no)
460 1
            current_status = None
461 1
            if interface:
462 1
                log.info('Modified %s %s:%s' %
463
                         (interface, interface.switch.dpid,
464
                          interface.port_number))
465 1
                current_status = interface.state
466 1
                interface.state = port.state.value
467 1
                interface.name = port.name.value
468 1
                interface.address = port.hw_addr.value
469 1
                interface.features = port.curr
470
            else:
471 1
                interface = Interface(name=port.name.value,
472
                                      address=port.hw_addr.value,
473
                                      port_number=port_no,
474
                                      switch=source.switch,
475
                                      state=port.state.value,
476
                                      features=port.curr)
477 1
            source.switch.update_interface(interface)
478 1
            self._send_specific_port_mod(port, interface, current_status)
479
480
        elif reason == 'OFPPR_DELETE':
481
            status = 'deleted'
482
            interface = source.switch.get_interface_by_port_no(port_no)
483
            source.switch.remove_interface(interface)
484
485 1
        event_name += status
0 ignored issues
show
introduced by
The variable status does not seem to be defined for all execution paths.
Loading history...
486 1
        content = {'interface': interface}
0 ignored issues
show
introduced by
The variable interface does not seem to be defined for all execution paths.
Loading history...
487
488 1
        event = KytosEvent(name=event_name, content=content)
489 1
        self.controller.buffers.app.put(event)
490
491 1
        msg = 'The port %s from switch %s was %s.'
492 1
        log.debug(msg, port_status.desc.port_no, source.switch.id, status)
493
494
495 1
def _get_version_from_bitmask(message_versions):
496
    """Get common version from hello message version bitmap."""
497 1
    try:
498 1
        return max([version for version in message_versions
499
                    if version in settings.OPENFLOW_VERSIONS])
500
    except ValueError:
501
        return None
502
503
504 1
def _get_version_from_header(message_version):
505
    """Get common version from hello message header version."""
506 1
    version = min(message_version, max(settings.OPENFLOW_VERSIONS))
507
    return version if version in settings.OPENFLOW_VERSIONS else None
508