build.main.Main.execute()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 7
ccs 1
cts 2
cp 0.5
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1.125
1
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
2 1
from collections import OrderedDict
3 1
from copy import deepcopy
4
5 1
from flask import jsonify, request
6 1
from pyof.foundation.base import UBIntBase
7 1
from pyof.v0x01.asynchronous.error_msg import BadActionCode
8 1
from pyof.v0x01.common.phy_port import PortConfig
9 1
from werkzeug.exceptions import BadRequest, NotFound, UnsupportedMediaType
10
11 1
from kytos.core import KytosEvent, KytosNApp, log, rest
12 1
from kytos.core.helpers import listen_to
13 1
from napps.kytos.flow_manager.match import match_flow
14 1
from napps.kytos.flow_manager.storehouse import StoreHouse
15 1
from napps.kytos.of_core.flow import FlowFactory
16
17 1
from .exceptions import InvalidCommandError
18 1
from .settings import (CONSISTENCY_COOKIE_IGNORED_RANGE,
19
                       CONSISTENCY_TABLE_ID_IGNORED_RANGE,
20
                       ENABLE_CONSISTENCY_CHECK, FLOWS_DICT_MAX_SIZE)
21
22
23 1
def cast_fields(flow_dict):
24
    """Make casting the match fields from UBInt() to native int ."""
25 1
    match = flow_dict['match']
26 1
    for field, value in match.items():
27 1
        if isinstance(value, UBIntBase):
28
            match[field] = int(value)
29 1
    flow_dict['match'] = match
30 1
    return flow_dict
31
32
33 1
def _validate_range(values):
34
    """Check that the range of flows ignored by the consistency is valid."""
35
    if len(values) != 2:
36
        msg = f'The tuple must have 2 items, not {len(values)}'
37
        raise ValueError(msg)
38
    first, second = values
39
    if second < first:
40
        msg = f'The first value is bigger than the second: {values}'
41
        raise ValueError(msg)
42
    if not isinstance(first, int) or not isinstance(second, int):
43
        msg = f'Expected a tuple of integers, received {values}'
44
        raise TypeError(msg)
45
46
47 1
def _valid_consistency_ignored(consistency_ignored_list):
48
    """Check the format of the list of ignored consistency flows.
49
50
    Check that the list of ignored flows in the consistency check
51
    is well formatted. Returns True, if the list is well
52
    formatted, otherwise return False.
53
    """
54 1
    msg = ('The list of ignored flows in the consistency check'
55
           'is not well formatted, it will be ignored: %s')
56 1
    for consistency_ignored in consistency_ignored_list:
57
        if isinstance(consistency_ignored, tuple):
58
            try:
59
                _validate_range(consistency_ignored)
60
            except (TypeError, ValueError) as error:
61
                log.warn(msg, error)
62
                return False
63
        elif not isinstance(consistency_ignored, (int, tuple)):
64
            error_msg = ('The elements must be of class int or tuple'
65
                         f' but they are: {type(consistency_ignored)}')
66
            log.warn(msg, error_msg)
67
            return False
68 1
    return True
69
70
71 1
class Main(KytosNApp):
72
    """Main class to be used by Kytos controller."""
73
74 1
    def setup(self):
75
        """Replace the 'init' method for the KytosApp subclass.
76
77
        The setup method is automatically called by the run method.
78
        Users shouldn't call this method directly.
79
        """
80 1
        log.debug("flow-manager starting")
81 1
        self._flow_mods_sent = OrderedDict()
82 1
        self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
83 1
        self.cookie_ignored_range = []
84 1
        self.tab_id_ignored_range = []
85 1
        if _valid_consistency_ignored(CONSISTENCY_COOKIE_IGNORED_RANGE):
86 1
            self.cookie_ignored_range = CONSISTENCY_COOKIE_IGNORED_RANGE
87 1
        if _valid_consistency_ignored(CONSISTENCY_TABLE_ID_IGNORED_RANGE):
88 1
            self.tab_id_ignored_range = CONSISTENCY_TABLE_ID_IGNORED_RANGE
89
90
        # Storehouse client to save and restore flow data:
91 1
        self.storehouse = StoreHouse(self.controller)
92
93
        # Format of stored flow data:
94
        # {'flow_persistence': {'dpid_str': {'flow_list': [
95
        #                                     {'command': '<add|delete>',
96
        #                                      'flow': {flow_dict}}]}}}
97 1
        self.stored_flows = {}
98 1
        self.resent_flows = set()
99
100 1
    def execute(self):
101
        """Run once on NApp 'start' or in a loop.
102
103
        The execute method is called by the run method of KytosNApp class.
104
        Users shouldn't call this method directly.
105
        """
106
        self._load_flows()
107
108 1
    def shutdown(self):
109
        """Shutdown routine of the NApp."""
110
        log.debug("flow-manager stopping")
111
112 1
    @listen_to('kytos/of_core.handshake.completed')
113
    def resend_stored_flows(self, event):
114
        """Resend stored Flows."""
115
        # if consistency check is enabled, it should take care of this
116 1
        if ENABLE_CONSISTENCY_CHECK:
117
            return
118 1
        switch = event.content['switch']
119 1
        dpid = str(switch.dpid)
120
        # This can be a problem because this code is running a thread
121 1
        if dpid in self.resent_flows:
122
            log.debug(f'Flow already resent to the switch {dpid}')
123
            return
124 1
        if dpid in self.stored_flows:
125 1
            flow_list = self.stored_flows[dpid]['flow_list']
126 1
            for flow in flow_list:
127 1
                command = flow['command']
128 1
                flows_dict = {"flows": [flow['flow']]}
129 1
                self._install_flows(command, flows_dict, [switch])
130 1
            self.resent_flows.add(dpid)
131 1
            log.info(f'Flows resent to Switch {dpid}')
132
133 1
    @staticmethod
134
    def is_ignored(field, ignored_range):
135
        """Check that the flow field is in the range of ignored flows.
136
137
        Returns True, if the field is in the range of ignored flows,
138
        otherwise it returns False.
139
        """
140 1
        for i in ignored_range:
141 1
            if isinstance(i, tuple):
142 1
                start_range, end_range = i
143 1
                if start_range <= field <= end_range:
144 1
                    return True
145 1
            if isinstance(i, int):
146 1
                if field == i:
147 1
                    return True
148 1
        return False
149
150 1
    def consistency_ignored_check(self, flow):
151
        """Check if the flow is in the list of flows ignored by consistency.
152
153
        Check by `cookie` range and `table_id` range.
154
        Return True if the flow is in the ignored range, otherwise return
155
        False.
156
        """
157
        # Check by cookie
158 1
        if self.is_ignored(flow.cookie, self.cookie_ignored_range):
159 1
            return True
160
161
        # Check by `table_id`
162 1
        if self.is_ignored(flow.table_id, self.tab_id_ignored_range):
163 1
            return True
164 1
        return False
165
166 1
    @listen_to('kytos/of_core.flow_stats.received')
167
    def on_flow_stats_check_consistency(self, event):
168
        """Check the consistency of a switch upon receiving flow stats."""
169
        if not ENABLE_CONSISTENCY_CHECK:
170
            return
171
        switch = event.content['switch']
172
        if switch.is_enabled():
173
            self.check_storehouse_consistency(switch)
174
            if switch.dpid in self.stored_flows:
175
                self.check_switch_consistency(switch)
176
177 1
    def check_switch_consistency(self, switch):
178
        """Check consistency of installed flows for a specific switch."""
179 1
        dpid = switch.dpid
180
181
        # Flows stored in storehouse
182 1
        stored_flows = self.stored_flows[dpid]['flow_list']
183
184 1
        serializer = FlowFactory.get_class(switch)
185
186 1
        for stored_flow in stored_flows:
187 1
            command = stored_flow['command']
188 1
            stored_flow_obj = serializer.from_dict(stored_flow['flow'], switch)
189
190 1
            flow = {'flows': [stored_flow['flow']]}
191
192 1
            if stored_flow_obj not in switch.flows:
193 1
                if command == 'add':
194 1
                    log.info('A consistency problem was detected in '
195
                             f'switch {dpid}.')
196 1
                    self._install_flows(command, flow, [switch])
197 1
                    log.info(f'Flow forwarded to switch {dpid} to be '
198
                             'installed.')
199 1
            elif command == 'delete':
200 1
                log.info('A consistency problem was detected in '
201
                         f'switch {dpid}.')
202 1
                command = 'delete_strict'
203 1
                self._install_flows(command, flow, [switch])
204 1
                log.info(f'Flow forwarded to switch {dpid} to be deleted.')
205
206 1
    def check_storehouse_consistency(self, switch):
207
        """Check consistency of installed flows for a specific switch."""
208 1
        dpid = switch.dpid
209
210 1
        for installed_flow in switch.flows:
211
212
            # Check if the flow is in the ignored flow list
213 1
            if self.consistency_ignored_check(installed_flow):
214 1
                continue
215
216 1
            if dpid not in self.stored_flows:
217
                log.info('A consistency problem was detected in '
218
                         f'switch {dpid}.')
219
                flow = {'flows': [installed_flow.as_dict()]}
220
                command = 'delete_strict'
221
                self._install_flows(command, flow, [switch])
222
                log.info(f'Flow forwarded to switch {dpid} to be deleted.')
223
            else:
224 1
                serializer = FlowFactory.get_class(switch)
225 1
                stored_flows = self.stored_flows[dpid]['flow_list']
226 1
                stored_flows_list = [serializer.from_dict(stored_flow['flow'],
227
                                                          switch)
228
                                     for stored_flow in stored_flows]
229
230 1
                if installed_flow not in stored_flows_list:
231 1
                    log.info('A consistency problem was detected in '
232
                             f'switch {dpid}.')
233 1
                    flow = {'flows': [installed_flow.as_dict()]}
234 1
                    command = 'delete_strict'
235 1
                    self._install_flows(command, flow, [switch])
236 1
                    log.info(f'Flow forwarded to switch {dpid} to be deleted.')
237
238
    # pylint: disable=attribute-defined-outside-init
239 1
    def _load_flows(self):
240
        """Load stored flows."""
241 1
        try:
242 1
            data = self.storehouse.get_data()['flow_persistence']
243 1
            if 'id' in data:
244
                del data['id']
245 1
            self.stored_flows = data
246
        except (KeyError, FileNotFoundError) as error:
247
            log.debug(f'There are no flows to load: {error}')
248
        else:
249 1
            log.info('Flows loaded.')
250
251 1
    def _store_changed_flows(self, command, flow, switch):
252
        """Store changed flows.
253
254
        Args:
255
            command: Flow command to be installed
256
            flow: Flows to be stored
257
            switch: Switch target
258
        """
259 1
        stored_flows_box = deepcopy(self.stored_flows)
260
        # if the flow has a destination dpid it can be stored.
261 1
        if not switch:
262
            log.info('The Flow cannot be stored, the destination switch '
263
                     f'have not been specified: {switch}')
264
            return
265 1
        installed_flow = {}
266 1
        flow_list = []
267 1
        installed_flow['command'] = command
268 1
        installed_flow['flow'] = flow
269 1
        deleted_flows = []
270
271 1
        serializer = FlowFactory.get_class(switch)
272 1
        installed_flow_obj = serializer.from_dict(flow, switch)
273
274 1
        if switch.id not in stored_flows_box:
275
            # Switch not stored, add to box.
276 1
            flow_list.append(installed_flow)
277 1
            stored_flows_box[switch.id] = {'flow_list': flow_list}
278
        else:
279 1
            stored_flows = stored_flows_box[switch.id].get('flow_list', [])
280
            # Check if flow already stored
281 1
            for stored_flow in stored_flows:
282 1
                stored_flow_obj = serializer.from_dict(stored_flow['flow'],
283
                                                       switch)
284
285 1
                version = switch.connection.protocol.version
286
287 1
                if installed_flow['command'] == 'delete':
288
                    # No strict match
289 1
                    if match_flow(flow, version, stored_flow['flow']):
290 1
                        deleted_flows.append(stored_flow)
291
292 1
                elif installed_flow_obj == stored_flow_obj:
293 1
                    if stored_flow['command'] == installed_flow['command']:
294
                        log.debug('Data already stored.')
295
                        return
296
                    # Flow with inconsistency in "command" fields : Remove the
297
                    # old instruction. This happens when there is a stored
298
                    # instruction to install the flow, but the new instruction
299
                    # is to remove it. In this case, the old instruction is
300
                    # removed and the new one is stored.
301 1
                    stored_flow['command'] = installed_flow.get('command')
302 1
                    deleted_flows.append(stored_flow)
303 1
                    break
304
305
            # if installed_flow['command'] != 'delete':
306 1
            stored_flows.append(installed_flow)
307 1
            for i in deleted_flows:
308 1
                stored_flows.remove(i)
309 1
            stored_flows_box[switch.id]['flow_list'] = stored_flows
310
311 1
        stored_flows_box['id'] = 'flow_persistence'
312 1
        self.storehouse.save_flow(stored_flows_box)
313 1
        del stored_flows_box['id']
314 1
        self.stored_flows = deepcopy(stored_flows_box)
315
316 1
    @rest('v2/flows')
317 1
    @rest('v2/flows/<dpid>')
318 1
    def list(self, dpid=None):
319
        """Retrieve all flows from a switch identified by dpid.
320
321
        If no dpid is specified, return all flows from all switches.
322
        """
323 1
        if dpid is None:
324 1
            switches = self.controller.switches.values()
325
        else:
326 1
            switches = [self.controller.get_switch_by_dpid(dpid)]
327
328 1
            if not any(switches):
329 1
                raise NotFound("Switch not found")
330
331 1
        switch_flows = {}
332
333 1
        for switch in switches:
334 1
            flows_dict = [cast_fields(flow.as_dict())
335
                          for flow in switch.flows]
336 1
            switch_flows[switch.dpid] = {'flows': flows_dict}
337
338 1
        return jsonify(switch_flows)
339
340 1
    @listen_to('kytos.flow_manager.flows.(install|delete)')
341
    def event_flows_install_delete(self, event):
342
        """Install or delete flows in the switches through events.
343
344
        Install or delete Flow of switches identified by dpid.
345
        """
346 1
        try:
347 1
            dpid = event.content['dpid']
348 1
            flow_dict = event.content['flow_dict']
349
        except KeyError as error:
350
            log.error("Error getting fields to install or remove "
351
                      f"Flows: {error}")
352
            return
353
354 1
        if event.name == 'kytos.flow_manager.flows.install':
355 1
            command = 'add'
356 1
        elif event.name == 'kytos.flow_manager.flows.delete':
357 1
            command = 'delete'
358
        else:
359
            msg = f'Invalid event "{event.name}", should be install|delete'
360
            raise ValueError(msg)
361
362 1
        switch = self.controller.get_switch_by_dpid(dpid)
363 1
        try:
364 1
            self._install_flows(command, flow_dict, [switch])
365
        except InvalidCommandError as error:
366
            log.error("Error installing or deleting Flow through"
367
                      f" Kytos Event: {error}")
368
369 1
    @rest('v2/flows', methods=['POST'])
370 1
    @rest('v2/flows/<dpid>', methods=['POST'])
371 1
    def add(self, dpid=None):
372
        """Install new flows in the switch identified by dpid.
373
374
        If no dpid is specified, install flows in all switches.
375
        """
376 1
        return self._send_flow_mods_from_request(dpid, "add")
377
378 1
    @rest('v2/delete', methods=['POST'])
379 1
    @rest('v2/delete/<dpid>', methods=['POST'])
380 1
    @rest('v2/flows', methods=['DELETE'])
381 1
    @rest('v2/flows/<dpid>', methods=['DELETE'])
382 1
    def delete(self, dpid=None):
383
        """Delete existing flows in the switch identified by dpid.
384
385
        If no dpid is specified, delete flows from all switches.
386
        """
387 1
        return self._send_flow_mods_from_request(dpid, "delete")
388
389 1
    def _get_all_switches_enabled(self):
390
        """Get a list of all switches enabled."""
391 1
        switches = self.controller.switches.values()
392 1
        return [switch for switch in switches if switch.is_enabled()]
393
394 1
    def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
395
        """Install FlowsMods from request."""
396 1
        if flows_dict is None:
397 1
            flows_dict = request.get_json() or {}
398 1
            content_type = request.content_type
399
            # Get flow to check if the request is well-formed
400 1
            flows = flows_dict.get('flows', [])
401
402 1
            if content_type is None:
403 1
                result = 'The request body is empty'
404 1
                raise BadRequest(result)
405
406 1
            if content_type != 'application/json':
407 1
                result = ('The content type must be application/json '
408
                          f'(received {content_type}).')
409 1
                raise UnsupportedMediaType(result)
410
411 1
            if not any(flows_dict) or not any(flows):
412 1
                result = 'The request body is not well-formed.'
413 1
                raise BadRequest(result)
414
415 1
        if dpid:
416 1
            switch = self.controller.get_switch_by_dpid(dpid)
417 1
            if not switch:
418 1
                return jsonify({"response": 'dpid not found.'}), 404
419 1
            elif switch.is_enabled() is False:
420 1
                if command == "delete":
421 1
                    self._install_flows(command, flows_dict, [switch])
422
                else:
423 1
                    return jsonify({"response": 'switch is disabled.'}), 404
424
            else:
425 1
                self._install_flows(command, flows_dict, [switch])
426
        else:
427 1
            self._install_flows(command, flows_dict,
428
                                self._get_all_switches_enabled())
429
430 1
        return jsonify({"response": "FlowMod Messages Sent"})
431
432 1
    def _install_flows(self, command, flows_dict, switches=[]):
433
        """Execute all procedures to install flows in the switches.
434
435
        Args:
436
            command: Flow command to be installed
437
            flows_dict: Dictionary with flows to be installed in the switches.
438
            switches: A list of switches
439
        """
440 1
        for switch in switches:
441 1
            serializer = FlowFactory.get_class(switch)
442 1
            flows = flows_dict.get('flows', [])
443 1
            for flow_dict in flows:
444 1
                flow = serializer.from_dict(flow_dict, switch)
445 1
                if command == "delete":
446
                    flow_mod = flow.as_of_delete_flow_mod()
447 1
                elif command == "delete_strict":
448 1
                    flow_mod = flow.as_of_strict_delete_flow_mod()
449 1
                elif command == "add":
450 1
                    flow_mod = flow.as_of_add_flow_mod()
451
                else:
452
                    raise InvalidCommandError
453 1
                self._send_flow_mod(flow.switch, flow_mod)
454 1
                self._add_flow_mod_sent(flow_mod.header.xid, flow, command)
455
456 1
                self._send_napp_event(switch, flow, command)
457 1
                self._store_changed_flows(command, flow_dict, switch)
458
459 1
    def _add_flow_mod_sent(self, xid, flow, command):
460
        """Add the flow mod to the list of flow mods sent."""
461 1
        if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
462
            self._flow_mods_sent.popitem(last=False)
463 1
        self._flow_mods_sent[xid] = (flow, command)
464
465 1
    def _send_flow_mod(self, switch, flow_mod):
466 1
        event_name = 'kytos/flow_manager.messages.out.ofpt_flow_mod'
467
468 1
        content = {'destination': switch.connection,
469
                   'message': flow_mod}
470
471 1
        event = KytosEvent(name=event_name, content=content)
472 1
        self.controller.buffers.msg_out.put(event)
473
474 1
    def _send_napp_event(self, switch, flow, command, **kwargs):
475
        """Send an Event to other apps informing about a FlowMod."""
476 1
        if command == 'add':
477 1
            name = 'kytos/flow_manager.flow.added'
478 1
        elif command in ('delete', 'delete_strict'):
479 1
            name = 'kytos/flow_manager.flow.removed'
480 1
        elif command == 'error':
481 1
            name = 'kytos/flow_manager.flow.error'
482
        else:
483
            raise InvalidCommandError
484 1
        content = {'datapath': switch,
485
                   'flow': flow}
486 1
        content.update(kwargs)
487 1
        event_app = KytosEvent(name, content)
488 1
        self.controller.buffers.app.put(event_app)
489
490 1
    @listen_to('.*.of_core.*.ofpt_error')
491
    def handle_errors(self, event):
492
        """Receive OpenFlow error and send a event.
493
494
        The event is sent only if the error is related to a request made
495
        by flow_manager.
496
        """
497 1
        message = event.content["message"]
498
499 1
        connection = event.source
500 1
        switch = connection.switch
501
502 1
        xid = message.header.xid.value
503 1
        error_type = message.error_type
504 1
        error_code = message.code
505 1
        error_data = message.data.pack()
506
507
        # Get the packet responsible for the error
508 1
        error_packet = connection.protocol.unpack(error_data)
509
510 1
        if message.code == BadActionCode.OFPBAC_BAD_OUT_PORT:
511
            actions = []
512
            if hasattr(error_packet, 'actions'):
513
                # Get actions from the flow mod (OF 1.0)
514
                actions = error_packet.actions
515
            else:
516
                # Get actions from the list of flow mod instructions (OF 1.3)
517
                for instruction in error_packet.instructions:
518
                    actions.extend(instruction.actions)
519
520
            for action in actions:
521
                iface = switch.get_interface_by_port_no(action.port)
522
523
                # Set interface to drop packets forwarded to it
524
                if iface:
525
                    iface.config = PortConfig.OFPPC_NO_FWD
526
527 1
        try:
528 1
            flow, error_command = self._flow_mods_sent[xid]
529
        except KeyError:
530
            pass
531
        else:
532 1
            self._send_napp_event(flow.switch, flow, 'error',
533
                                  error_command=error_command,
534
                                  error_type=error_type, error_code=error_code)
535