Passed
Push — master ( 99d253...10f509 )
by Humberto
03:29
created

build.main.Main.add()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 4
cts 4
cp 1
cc 1
nop 2
crap 1
1
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
2 1
from collections import OrderedDict
3 1
from copy import deepcopy
4
5 1
from flask import jsonify, request
6 1
from pyof.foundation.base import UBIntBase
7 1
from pyof.v0x01.asynchronous.error_msg import BadActionCode
8 1
from pyof.v0x01.common.phy_port import PortConfig
9 1
from werkzeug.exceptions import BadRequest, NotFound, UnsupportedMediaType
10
11 1
from kytos.core import KytosEvent, KytosNApp, log, rest
12 1
from kytos.core.helpers import listen_to
13 1
from napps.kytos.flow_manager.match import match_flow
14 1
from napps.kytos.flow_manager.storehouse import StoreHouse
15 1
from napps.kytos.of_core.flow import FlowFactory
16
17 1
from .exceptions import InvalidCommandError
18 1
from .settings import (CONSISTENCY_COOKIE_IGNORED_RANGE, CONSISTENCY_INTERVAL,
19
                       CONSISTENCY_TABLE_ID_IGNORED_RANGE, FLOWS_DICT_MAX_SIZE)
20
21
22 1
def cast_fields(flow_dict):
    """Convert ``UBIntBase`` match values of a flow dict to native ``int``.

    The mapping stored under ``flow_dict['match']`` is updated in place and
    the very same ``flow_dict`` object is returned.
    """
    fields = flow_dict['match']
    # Iterate over a snapshot of the keys; only values are replaced.
    for name in tuple(fields):
        raw = fields[name]
        if isinstance(raw, UBIntBase):
            fields[name] = int(raw)
    flow_dict['match'] = fields
    return flow_dict
30
31
32 1
def _validate_range(values):
33
    """Check that the range of flows ignored by the consistency is valid."""
34
    if len(values) != 2:
35
        msg = f'The tuple must have 2 items, not {len(values)}'
36
        raise ValueError(msg)
37
    first, second = values
38
    if second < first:
39
        msg = f'The first value is bigger than the second: {values}'
40
        raise ValueError(msg)
41
    if not isinstance(first, int) or not isinstance(second, int):
42
        msg = f'Expected a tuple of integers, received {values}'
43
        raise TypeError(msg)
44
45
46 1
def _valid_consistency_ignored(consistency_ignored_list):
47
    """Check the format of the list of ignored consistency flows.
48
49
    Check that the list of ignored flows in the consistency check
50
    is well formatted. Returns True, if the list is well
51
    formatted, otherwise return False.
52
    """
53 1
    msg = ('The list of ignored flows in the consistency check'
54
           'is not well formatted, it will be ignored: %s')
55 1
    for consistency_ignored in consistency_ignored_list:
56
        if isinstance(consistency_ignored, tuple):
57
            try:
58
                _validate_range(consistency_ignored)
59
            except (TypeError, ValueError) as error:
60
                log.warn(msg, error)
61
                return False
62
        elif not isinstance(consistency_ignored, (int, tuple)):
63
            error_msg = ('The elements must be of class int or tuple'
64
                         f' but they are: {type(consistency_ignored)}')
65
            log.warn(msg, error_msg)
66
            return False
67 1
    return True
68
69
70 1
class Main(KytosNApp):
71
    """Main class to be used by Kytos controller."""
72
73 1
    def setup(self):
        """Initialize the NApp state (stands in for ``__init__``).

        The setup method is automatically called by the run method.
        Users shouldn't call this method directly.
        """
        log.debug("flow-manager starting")

        # Bounded record of the flow mods already sent, keyed by xid.
        self._flow_mods_sent = OrderedDict()
        self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE

        # Ignored ranges fall back to an empty list when the configured
        # value is rejected by the validator.
        self.cookie_ignored_range = (
            CONSISTENCY_COOKIE_IGNORED_RANGE
            if _valid_consistency_ignored(CONSISTENCY_COOKIE_IGNORED_RANGE)
            else []
        )
        self.tab_id_ignored_range = (
            CONSISTENCY_TABLE_ID_IGNORED_RANGE
            if _valid_consistency_ignored(CONSISTENCY_TABLE_ID_IGNORED_RANGE)
            else []
        )

        # Storehouse client to save and restore flow data:
        self.storehouse = StoreHouse(self.controller)

        # Format of stored flow data:
        # {'flow_persistence': {'dpid_str': {'flow_list': [
        #                                     {'command': '<add|delete>',
        #                                      'flow': {flow_dict}}]}}}
        self.stored_flows = {}
        self.resent_flows = set()

        # A positive interval turns `execute` into a periodic loop.
        if CONSISTENCY_INTERVAL > 0:
            self.execute_as_loop(CONSISTENCY_INTERVAL)
100
101 1
    def execute(self):
        """Load the stored flows and, when enabled, check consistency.

        The execute method is called by the run method of KytosNApp class,
        once on NApp 'start' or in a loop.
        Users shouldn't call this method directly.
        """
        self._load_flows()

        # The consistency check only runs for a positive interval.
        if CONSISTENCY_INTERVAL <= 0:
            return
        self.consistency_check()
111
112 1
    def shutdown(self):
        """Log that the NApp is stopping; nothing else is cleaned up here."""
        log.debug("flow-manager stopping")
115
116 1
    @listen_to('kytos/of_core.handshake.completed')
117
    def resend_stored_flows(self, event):
        """Resend the stored flows to a switch that completed its handshake.

        Nothing is done when the periodic consistency check is enabled
        (``CONSISTENCY_INTERVAL > 0``, the same condition used by ``setup``
        and ``execute``), because that check already reinstalls the flows.
        Each switch receives its stored flows at most once.

        Args:
            event: KytosEvent whose content has the ``'switch'`` key.
        """
        # if consistency check is enabled, it should take care of this.
        # An interval of 0 means the check is disabled, so the flows must
        # still be resent here.
        if CONSISTENCY_INTERVAL > 0:
            return
        switch = event.content['switch']
        dpid = str(switch.dpid)
        # This can be a problem because this code is running a thread
        if dpid in self.resent_flows:
            log.debug(f'Flow already resent to the switch {dpid}')
            return
        if dpid not in self.stored_flows:
            return
        for stored_flow in self.stored_flows[dpid]['flow_list']:
            flows_dict = {"flows": [stored_flow['flow']]}
            self._install_flows(stored_flow['command'], flows_dict, [switch])
        self.resent_flows.add(dpid)
        log.info(f'Flows resent to Switch {dpid}')
136
137 1
    @staticmethod
138
    def is_ignored(field, ignored_range):
139
        """Check that the flow field is in the range of ignored flows.
140
141
        Returns True, if the field is in the range of ignored flows,
142
        otherwise it returns False.
143
        """
144 1
        for i in ignored_range:
145 1
            if isinstance(i, tuple):
146 1
                start_range, end_range = i
147 1
                if start_range <= field <= end_range:
148 1
                    return True
149 1
            if isinstance(i, int):
150 1
                if field == i:
151 1
                    return True
152 1
        return False
153
154 1
    def consistency_ignored_check(self, flow):
        """Tell whether the consistency check must skip ``flow``.

        The flow is skipped when its `cookie` is in the ignored cookie
        range or, failing that, when its `table_id` is in the ignored
        table id range.

        Return True if the flow is ignored, otherwise return False.
        """
        # `or` keeps the original order: the table id is only looked at
        # when the cookie did not match.
        return bool(
            self.is_ignored(flow.cookie, self.cookie_ignored_range)
            or self.is_ignored(flow.table_id, self.tab_id_ignored_range)
        )
169
170 1
    def consistency_check(self):
        """Run both consistency checks on every enabled switch.

        The storehouse check always runs for an enabled switch; the switch
        check only runs when the switch dpid is a key of ``stored_flows``.
        """
        for switch in self.controller.switches.values():
            if not switch.is_enabled():
                continue

            self.check_storehouse_consistency(switch)

            # Looked up after the first check has run, as stored_flows may
            # have been replaced meanwhile.
            if switch.dpid in self.stored_flows:
                self.check_switch_consistency(switch)
181
182 1
    def check_switch_consistency(self, switch):
        """Replay stored instructions that the switch does not reflect.

        For each stored entry of the switch: an 'add' flow that is absent
        from ``switch.flows`` is sent again, and a 'delete' flow that is
        still present in ``switch.flows`` is removed with 'delete_strict'.
        """
        dpid = switch.dpid

        # Flows stored in storehouse
        entries = self.stored_flows[dpid]['flow_list']

        flow_class = FlowFactory.get_class(switch)

        for entry in entries:
            command = entry['command']
            expected = flow_class.from_dict(entry['flow'], switch)
            payload = {'flows': [entry['flow']]}
            installed = expected in switch.flows

            if not installed and command == 'add':
                log.info('A consistency problem was detected in '
                         f'switch {dpid}.')
                self._install_flows(command, payload, [switch])
                log.info(f'Flow forwarded to switch {dpid} to be '
                         'installed.')
            elif installed and command == 'delete':
                log.info('A consistency problem was detected in '
                         f'switch {dpid}.')
                self._install_flows('delete_strict', payload, [switch])
                log.info(f'Flow forwarded to switch {dpid} to be deleted.')
210
211 1
    def check_storehouse_consistency(self, switch):
        """Delete installed flows that are unknown to the stored flows.

        Every flow in ``switch.flows`` that is not ignored by the
        consistency settings is removed with 'delete_strict' when the
        switch has no stored entry at all, or when the flow is not among
        the stored flows of that switch.
        """
        dpid = switch.dpid

        for installed_flow in switch.flows:

            # Check if the flow are in the ignored flow list
            if self.consistency_ignored_check(installed_flow):
                continue

            if dpid in self.stored_flows:
                # Rebuilt for every flow on purpose: stored_flows may be
                # replaced by the deletions issued below.
                flow_class = FlowFactory.get_class(switch)
                entries = self.stored_flows[dpid]['flow_list']
                known_flows = [flow_class.from_dict(entry['flow'], switch)
                               for entry in entries]
                if installed_flow in known_flows:
                    continue

            log.info('A consistency problem was detected in '
                     f'switch {dpid}.')
            payload = {'flows': [installed_flow.as_dict()]}
            self._install_flows('delete_strict', payload, [switch])
            log.info(f'Flow forwarded to switch {dpid} to be deleted.')
242
243
    # pylint: disable=attribute-defined-outside-init
244 1
    def _load_flows(self):
        """Replace ``stored_flows`` with the data kept in the storehouse.

        The 'id' key of the loaded data, when present, is dropped. If the
        data cannot be read (KeyError or FileNotFoundError) the current
        ``stored_flows`` is left untouched and only a debug entry is logged.
        """
        try:
            persisted = self.storehouse.get_data()['flow_persistence']
            if 'id' in persisted:
                del persisted['id']
            self.stored_flows = persisted
        except (KeyError, FileNotFoundError) as error:
            log.debug(f'There are no flows to load: {error}')
            return
        log.info('Flows loaded.')
255
256 1
    def _store_changed_flows(self, command, flow, switch):
        """Store changed flows.

        Works on a deep copy of ``self.stored_flows``, saves the result
        through the storehouse client and then replaces
        ``self.stored_flows`` with a deep copy of what was saved.

        Args:
            command: Flow command to be installed
            flow: Flows to be stored
            switch: Switch target
        """
        # All changes are made on this copy; self.stored_flows is only
        # replaced at the very end.
        stored_flows_box = deepcopy(self.stored_flows)
        # if the flow has a destination dpid it can be stored.
        if not switch:
            log.info('The Flow cannot be stored, the destination switch '
                     f'have not been specified: {switch}')
            return
        # New entry, in the same format used by the stored flow lists.
        installed_flow = {}
        flow_list = []
        installed_flow['command'] = command
        installed_flow['flow'] = flow
        # Stored entries to be dropped once the new entry has been appended.
        deleted_flows = []

        serializer = FlowFactory.get_class(switch)
        installed_flow_obj = serializer.from_dict(flow, switch)

        if switch.id not in stored_flows_box:
            # Switch not stored, add to box.
            flow_list.append(installed_flow)
            stored_flows_box[switch.id] = {'flow_list': flow_list}
        else:
            stored_flows = stored_flows_box[switch.id].get('flow_list', [])
            # Check if flow already stored
            for stored_flow in stored_flows:
                stored_flow_obj = serializer.from_dict(stored_flow['flow'],
                                                       switch)

                version = switch.connection.protocol.version

                if installed_flow['command'] == 'delete':
                    # No strict match: every stored flow matched by the
                    # deleted one is scheduled for removal, and the loop
                    # keeps going through the remaining entries.
                    if match_flow(flow, version, stored_flow['flow']):
                        deleted_flows.append(stored_flow)

                elif installed_flow_obj == stored_flow_obj:
                    if stored_flow['command'] == installed_flow['command']:
                        # Same flow and same command: nothing is saved.
                        log.debug('Data already stored.')
                        return
                    # Flow with inconsistency in "command" fields : Remove the
                    # old instruction. This happens when there is a stored
                    # instruction to install the flow, but the new instruction
                    # is to remove it. In this case, the old instruction is
                    # removed and the new one is stored.
                    stored_flow['command'] = installed_flow.get('command')
                    deleted_flows.append(stored_flow)
                    break

            # The new entry is always appended here, including 'delete'
            # commands; the entries collected above are removed afterwards.
            stored_flows.append(installed_flow)
            for i in deleted_flows:
                # list.remove drops the first element that compares equal.
                stored_flows.remove(i)
            stored_flows_box[switch.id]['flow_list'] = stored_flows

        # The 'id' key is only present while saving; it is removed again
        # before the box becomes the in-memory state.
        stored_flows_box['id'] = 'flow_persistence'
        self.storehouse.save_flow(stored_flows_box)
        del stored_flows_box['id']
        self.stored_flows = deepcopy(stored_flows_box)
320
321 1
    @rest('v2/flows')
322 1
    @rest('v2/flows/<dpid>')
323 1
    def list(self, dpid=None):
        """Return, as JSON, the flows of one switch or of every switch.

        With a dpid only that switch is listed and NotFound is raised when
        the controller does not know it. If no dpid is specified, return
        all flows from all switches.
        """
        if dpid is not None:
            found = self.controller.get_switch_by_dpid(dpid)
            if not found:
                raise NotFound("Switch not found")
            targets = [found]
        else:
            targets = self.controller.switches.values()

        switch_flows = {}
        for switch in targets:
            serialized = []
            for flow in switch.flows:
                serialized.append(cast_fields(flow.as_dict()))
            switch_flows[switch.dpid] = {'flows': serialized}

        return jsonify(switch_flows)
344
345 1
    @listen_to('kytos.flow_manager.flows.(install|delete)')
346
    def event_flows_install_delete(self, event):
        """Install or delete flows in the switches through events.

        Install or delete Flow of switches identified by dpid.

        Args:
            event: KytosEvent whose content must have the keys ``'dpid'``
                and ``'flow_dict'``. The event name selects the command:
                ``...flows.install`` maps to 'add' and ``...flows.delete``
                maps to 'delete'.

        Raises:
            ValueError: If the event name is neither install nor delete.
        """
        try:
            dpid = event.content['dpid']
            flow_dict = event.content['flow_dict']
        except KeyError as error:
            log.error("Error getting fields to install or remove "
                      f"Flows: {error}")
            return

        if event.name == 'kytos.flow_manager.flows.install':
            command = 'add'
        elif event.name == 'kytos.flow_manager.flows.delete':
            command = 'delete'
        else:
            msg = f'Invalid event "{event.name}", should be install|delete'
            raise ValueError(msg)

        switch = self.controller.get_switch_by_dpid(dpid)
        # An unknown dpid yields a falsy switch (the REST handler checks for
        # the same thing); forwarding it would fail while building the flow
        # instead of reporting the real problem.
        if not switch:
            log.error("Error installing or deleting Flow through"
                      f" Kytos Event: switch {dpid} not found")
            return
        try:
            self._install_flows(command, flow_dict, [switch])
        except InvalidCommandError as error:
            log.error("Error installing or deleting Flow through"
                      f" Kytos Event: {error}")
373
374 1
    @rest('v2/flows', methods=['POST'])
375 1
    @rest('v2/flows/<dpid>', methods=['POST'])
376 1
    def add(self, dpid=None):
        """Handle the request that installs flows, using the 'add' command.

        The flows go to the switch identified by dpid.
        If no dpid is specified, install flows in all switches.
        """
        response = self._send_flow_mods_from_request(dpid, "add")
        return response
382
383 1
    @rest('v2/delete', methods=['POST'])
384 1
    @rest('v2/delete/<dpid>', methods=['POST'])
385 1
    @rest('v2/flows', methods=['DELETE'])
386 1
    @rest('v2/flows/<dpid>', methods=['DELETE'])
387 1
    def delete(self, dpid=None):
        """Handle the request that removes flows, using the 'delete' command.

        The flows are removed from the switch identified by dpid.
        If no dpid is specified, delete flows from all switches.
        """
        response = self._send_flow_mods_from_request(dpid, "delete")
        return response
393
394 1
    def _get_all_switches_enabled(self):
395
        """Get a list of all switches enabled."""
396 1
        switches = self.controller.switches.values()
397 1
        return [switch for switch in switches if switch.is_enabled()]
398
399 1
    def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
        """Validate the request and send the FlowMods it describes.

        When ``flows_dict`` is not given it is read from the current
        request, which must be non-empty JSON with a non-empty 'flows'
        entry. With a dpid the flows go to that switch only (a disabled
        switch only accepts the "delete" command); without it they go to
        every enabled switch.
        """
        if flows_dict is None:
            flows_dict = request.get_json() or {}
            content_type = request.content_type
            # Get flow to check if the request is well-formed
            flows = flows_dict.get('flows', [])

            if content_type is None:
                raise BadRequest('The request body is empty')

            if content_type != 'application/json':
                result = ('The content type must be application/json '
                          f'(received {content_type}).')
                raise UnsupportedMediaType(result)

            if not (any(flows_dict) and any(flows)):
                raise BadRequest('The request body is not well-formed.')

        if dpid:
            switch = self.controller.get_switch_by_dpid(dpid)
            if not switch:
                return jsonify({"response": 'dpid not found.'}), 404
            # Flows may still be deleted from a disabled switch.
            if switch.is_enabled() is False and command != "delete":
                return jsonify({"response": 'switch is disabled.'}), 404
            targets = [switch]
        else:
            targets = self._get_all_switches_enabled()

        self._install_flows(command, flows_dict, targets)
        return jsonify({"response": "FlowMod Messages Sent"})
436
437 1
    def _install_flows(self, command, flows_dict, switches=[]):
438
        """Execute all procedures to install flows in the switches.
439
440
        Args:
441
            command: Flow command to be installed
442
            flows_dict: Dictionary with flows to be installed in the switches.
443
            switches: A list of switches
444
        """
445 1
        for switch in switches:
446 1
            serializer = FlowFactory.get_class(switch)
447 1
            flows = flows_dict.get('flows', [])
448 1
            for flow_dict in flows:
449 1
                flow = serializer.from_dict(flow_dict, switch)
450 1
                if command == "delete":
451
                    flow_mod = flow.as_of_delete_flow_mod()
452 1
                elif command == "delete_strict":
453 1
                    flow_mod = flow.as_of_strict_delete_flow_mod()
454 1
                elif command == "add":
455 1
                    flow_mod = flow.as_of_add_flow_mod()
456
                else:
457
                    raise InvalidCommandError
458 1
                self._send_flow_mod(flow.switch, flow_mod)
459 1
                self._add_flow_mod_sent(flow_mod.header.xid, flow, command)
460
461 1
                self._send_napp_event(switch, flow, command)
462 1
                self._store_changed_flows(command, flow_dict, switch)
463
464 1
    def _add_flow_mod_sent(self, xid, flow, command):
465
        """Add the flow mod to the list of flow mods sent."""
466 1
        if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
467
            self._flow_mods_sent.popitem(last=False)
468 1
        self._flow_mods_sent[xid] = (flow, command)
469
470 1
    def _send_flow_mod(self, switch, flow_mod):
        """Put a flow mod addressed to ``switch`` in the msg_out buffer.

        The message is wrapped in a KytosEvent whose destination is the
        connection of the switch.
        """
        event = KytosEvent(
            name='kytos/flow_manager.messages.out.ofpt_flow_mod',
            content={'destination': switch.connection,
                     'message': flow_mod},
        )
        self.controller.buffers.msg_out.put(event)
478
479 1
    def _send_napp_event(self, switch, flow, command, **kwargs):
        """Notify the other NApps about a FlowMod through the app buffer.

        The event name depends on the command: 'add' gives ``flow.added``,
        'delete' and 'delete_strict' give ``flow.removed`` and 'error'
        gives ``flow.error``. Any other command raises InvalidCommandError.
        Extra keyword arguments are merged into the event content.
        """
        if command not in ('add', 'delete', 'delete_strict', 'error'):
            raise InvalidCommandError

        if command == 'add':
            name = 'kytos/flow_manager.flow.added'
        elif command == 'error':
            name = 'kytos/flow_manager.flow.error'
        else:
            name = 'kytos/flow_manager.flow.removed'

        content = {'datapath': switch, 'flow': flow}
        content.update(kwargs)
        self.controller.buffers.app.put(KytosEvent(name, content))
494
495 1
    @listen_to('.*.of_core.*.ofpt_error')
496
    def handle_errors(self, event):
        """Receive OpenFlow error and send a event.

        The event is sent only if the error is related to a request made
        by flow_manager, i.e. when the xid of the error message is found
        among the flow mods recorded as sent.

        Args:
            event: KytosEvent whose content has the ``'message'`` key and
                whose source is the connection that received the error.
        """
        message = event.content["message"]

        connection = event.source
        switch = connection.switch

        xid = message.header.xid.value
        error_type = message.error_type
        error_code = message.code
        error_data = message.data.pack()

        # Get the packet responsible for the error
        error_packet = connection.protocol.unpack(error_data)

        # NOTE(review): only the error code is compared here, not the error
        # type — confirm that no other error type shares this code value.
        if message.code == BadActionCode.OFPBAC_BAD_OUT_PORT:
            actions = []
            if hasattr(error_packet, 'actions'):
                # Get actions from the flow mod (OF 1.0)
                actions = error_packet.actions
            else:
                # Get actions from the list of flow mod instructions (OF 1.3)
                # NOTE(review): assumes every instruction has `actions` —
                # TODO confirm.
                for instruction in error_packet.instructions:
                    actions.extend(instruction.actions)

            for action in actions:
                # NOTE(review): assumes every action has a `port`
                # attribute — TODO confirm for non-output actions.
                iface = switch.get_interface_by_port_no(action.port)

                # Set interface to drop packets forwarded to it
                if iface:
                    iface.config = PortConfig.OFPPC_NO_FWD

        try:
            flow, error_command = self._flow_mods_sent[xid]
        except KeyError:
            # The xid is not among the recorded flow mods: nothing to report.
            pass
        else:
            self._send_napp_event(flow.switch, flow, 'error',
                                  error_command=error_command,
                                  error_type=error_type, error_code=error_code)
540