Passed
Pull Request — master (#112)
by Carlos
03:35
created

build.main.Main.handle_errors()   B

Complexity

Conditions 8

Size

Total Lines 45
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 14.1606

Importance

Changes 0
Metric Value
cc 8
eloc 27
nop 2
dl 0
loc 45
ccs 13
cts 24
cp 0.5417
crap 14.1606
rs 7.3333
c 0
b 0
f 0
1
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
2 1
from collections import OrderedDict
3
4 1
from flask import jsonify, request
5 1
from pyof.v0x01.asynchronous.error_msg import BadActionCode
6 1
from pyof.v0x01.common.phy_port import PortConfig
7
8 1
from kytos.core import KytosEvent, KytosNApp, log, rest
9 1
from kytos.core.helpers import listen_to
10 1
from napps.kytos.flow_manager.storehouse import StoreHouse
11 1
from napps.kytos.of_core.flow import FlowFactory
12
13 1
from .exceptions import InvalidCommandError
14 1
from .settings import (CONSISTENCY_COOKIE_IGNORED_RANGE, CONSISTENCY_INTERVAL,
15
                       CONSISTENCY_TABLE_ID_IGNORED_RANGE, FLOWS_DICT_MAX_SIZE)
16
17
18 1
def _validate_range(values):
19
    """Check that the range of flows ignored by the consistency is valid."""
20
    instance = int
21
    if len(values) != 2:
22
        msg = f'The tuple must have 2 items, not {len(values)}'
23
        raise ValueError(msg)
24
    first, second = values
25
    if second < first:
26
        msg = (f'The first value is bigger than the second: {values}')
27
        raise ValueError(msg)
28
    if not isinstance(first, instance) or not isinstance(second, instance):
29
        msg = 'The elements of the range must be of the class {instance}'
30
        raise TypeError(msg)
31
32
33 1
def _valid_consistency_ignored(consistency_ignored_list):
34
    """Check the format of the list of ignored consistency flows.
35
36
    Check that the list of ignored flows in the consistency check
37
    is well formatted. Returns True, if the list is well
38
    formatted, otherwise return False.
39
    """
40 1
    msg = ('The list of ignored flows in the consistency check'
41
           'is not well formatted, it will be ignored: %s')
42 1
    for consistency_ignored in consistency_ignored_list:
43
        if isinstance(consistency_ignored, tuple):
44
            try:
45
                _validate_range(consistency_ignored)
46
            except (TypeError, ValueError) as error:
47
                log.warn(msg, error)
48
                return False
49
        elif not isinstance(consistency_ignored, (int, tuple)):
50
            error_msg = ('The elements must be of class int or tuple'
51
                         f' but they are: {type(consistency_ignored)}')
52
            log.warn(msg, error_msg)
53
            return False
54 1
    return True
55
56
57 1
class Main(KytosNApp):
58
    """Main class to be used by Kytos controller."""
59
60 1
    def setup(self):
61
        """Replace the 'init' method for the KytosApp subclass.
62
63
        The setup method is automatically called by the run method.
64
        Users shouldn't call this method directly.
65
        """
66 1
        log.debug("flow-manager starting")
67 1
        self._flow_mods_sent = OrderedDict()
68 1
        self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
69 1
        self.cookie_ignored_range = []
70 1
        self.tab_id_ignored_range = []
71 1
        if _valid_consistency_ignored(CONSISTENCY_COOKIE_IGNORED_RANGE):
72 1
            self.cookie_ignored_range = CONSISTENCY_COOKIE_IGNORED_RANGE
73 1
        if _valid_consistency_ignored(CONSISTENCY_TABLE_ID_IGNORED_RANGE):
74 1
            self.tab_id_ignored_range = CONSISTENCY_TABLE_ID_IGNORED_RANGE
75
76
        # Storehouse client to save and restore flow data:
77 1
        self.storehouse = StoreHouse(self.controller)
78
79
        # Format of stored flow data:
80
        # {'flow_persistence': {'dpid_str': {'flow_list': [
81
        #                                     {'command': '<add|delete>',
82
        #                                      'flow': {flow_dict}}]}}}
83 1
        self.stored_flows = {}
84 1
        self.resent_flows = set()
85 1
        if CONSISTENCY_INTERVAL > 0:
86 1
            self.execute_as_loop(CONSISTENCY_INTERVAL)
87
88 1
    def execute(self):
89
        """Run once on NApp 'start' or in a loop.
90
91
        The execute method is called by the run method of KytosNApp class.
92
        Users shouldn't call this method directly.
93
        """
94
        self._load_flows()
95
96
        if CONSISTENCY_INTERVAL > 0:
97
            self.consistency_check()
98
99 1
    def shutdown(self):
100
        """Shutdown routine of the NApp."""
101
        log.debug("flow-manager stopping")
102
103 1
    @listen_to('kytos/of_core.handshake.completed')
104
    def resend_stored_flows(self, event):
105
        """Resend stored Flows."""
106 1
        switch = event.content['switch']
107 1
        dpid = str(switch.dpid)
108
        # This can be a problem because this code is running a thread
109 1
        if dpid in self.resent_flows:
110
            log.debug(f'Flow already resent to the switch {dpid}')
111
            return
112 1
        if dpid in self.stored_flows:
113 1
            flow_list = self.stored_flows[dpid]['flow_list']
114 1
            for flow in flow_list:
115 1
                command = flow['command']
116 1
                flows_dict = {"flows": [flow['flow']]}
117 1
                self._install_flows(command, flows_dict, [switch])
118 1
            self.resent_flows.add(dpid)
119 1
            log.info(f'Flows resent to Switch {dpid}')
120
121 1
    @staticmethod
122
    def is_ignored(field, ignored_range):
123
        """Check that the flow field is in the range of ignored flows.
124
125
        Returns True, if the field is in the range of ignored flows,
126
        otherwise it returns False.
127
        """
128 1
        for i in ignored_range:
129 1
            if isinstance(i, tuple):
130 1
                start_range, end_range = i
131 1
                if start_range <= field <= end_range:
132 1
                    return True
133 1
            if isinstance(i, int):
134 1
                if field == i:
135 1
                    return True
136 1
        return False
137
138 1
    def consistency_ignored_check(self, flow):
139
        """Check if the flow is in the list of flows ignored by consistency.
140
141
        Check by `cookie` range and `table_id` range.
142
        Return True if the flow is in the ignored range, otherwise return
143
        False.
144
        """
145
        # Check by cookie
146 1
        if self.is_ignored(flow.cookie, self.cookie_ignored_range):
147 1
            return True
148
149
        # Check by `table_id`
150 1
        if self.is_ignored(flow.table_id, self.tab_id_ignored_range):
151 1
            return True
152 1
        return False
153
154 1
    def consistency_check(self):
155
        """Check the consistency of flows in each switch."""
156
        switches = self.controller.switches.values()
157
158
        for switch in switches:
159
            # Check if a dpid is a key in 'stored_flows' dictionary
160
            if switch.is_enabled():
161
                self.check_storehouse_consistency(switch)
162
163
                if switch.dpid in self.stored_flows:
164
                    self.check_switch_consistency(switch)
165
166 1
    def check_switch_consistency(self, switch):
167
        """Check consistency of installed flows for a specific switch."""
168 1
        dpid = switch.dpid
169
170
        # Flows stored in storehouse
171 1
        stored_flows = self.stored_flows[dpid]['flow_list']
172
173 1
        serializer = FlowFactory.get_class(switch)
174
175 1
        for stored_flow in stored_flows:
176 1
            command = stored_flow['command']
177 1
            stored_flow_obj = serializer.from_dict(stored_flow['flow'], switch)
178
179 1
            flow = {'flows': [stored_flow['flow']]}
180
181 1
            if stored_flow_obj not in switch.flows:
182 1
                if command == 'add':
183 1
                    log.info('A consistency problem was detected in '
184
                             f'switch {dpid}.')
185 1
                    self._install_flows(command, flow, [switch])
186 1
                    log.info(f'Flow forwarded to switch {dpid} to be '
187
                             'installed.')
188 1
            elif command == 'delete':
189 1
                log.info('A consistency problem was detected in '
190
                         f'switch {dpid}.')
191 1
                command = 'delete_strict'
192 1
                self._install_flows(command, flow, [switch])
193 1
                log.info(f'Flow forwarded to switch {dpid} to be deleted.')
194
195 1
    def check_storehouse_consistency(self, switch):
196
        """Check consistency of installed flows for a specific switch."""
197 1
        dpid = switch.dpid
198
199 1
        for installed_flow in switch.flows:
200
201
            # Check if the flow are in the ignored flow list
202 1
            if self.consistency_ignored_check(installed_flow):
203 1
                continue
204
205 1
            if dpid not in self.stored_flows:
206
                log.info('A consistency problem was detected in '
207
                         f'switch {dpid}.')
208
                flow = {'flows': [installed_flow.as_dict()]}
209
                command = 'delete_strict'
210
                self._install_flows(command, flow, [switch])
211
                log.info(f'Flow forwarded to switch {dpid} to be deleted.')
212
            else:
213 1
                serializer = FlowFactory.get_class(switch)
214 1
                stored_flows = self.stored_flows[dpid]['flow_list']
215 1
                stored_flows_list = [serializer.from_dict(stored_flow['flow'],
216
                                                          switch)
217
                                     for stored_flow in stored_flows]
218
219 1
                if installed_flow not in stored_flows_list:
220 1
                    log.info('A consistency problem was detected in '
221
                             f'switch {dpid}.')
222 1
                    flow = {'flows': [installed_flow.as_dict()]}
223 1
                    command = 'delete_strict'
224 1
                    self._install_flows(command, flow, [switch])
225 1
                    log.info(f'Flow forwarded to switch {dpid} to be deleted.')
226
227
    # pylint: disable=attribute-defined-outside-init
228 1
    def _load_flows(self):
229
        """Load stored flows."""
230 1
        try:
231 1
            data = self.storehouse.get_data()['flow_persistence']
232 1
            if 'id' in data:
233
                del data['id']
234 1
            self.stored_flows = data
235
        except (KeyError, FileNotFoundError) as error:
236
            log.debug(f'There are no flows to load: {error}')
237
        else:
238 1
            log.info('Flows loaded.')
239
240 1
    def _store_changed_flows(self, command, flow, switch):
241
        """Store changed flows.
242
243
        Args:
244
            command: Flow command to be installed
245
            flow: Flows to be stored
246
            switch: Switch target
247
        """
248 1
        stored_flows_box = self.stored_flows.copy()
249
        # if the flow has a destination dpid it can be stored.
250 1
        if not switch:
251
            log.info('The Flow cannot be stored, the destination switch '
252
                     f'have not been specified: {switch}')
253
            return
254 1
        installed_flow = {}
255 1
        flow_list = []
256 1
        installed_flow['command'] = command
257 1
        installed_flow['flow'] = flow
258
259 1
        serializer = FlowFactory.get_class(switch)
260 1
        installed_flow_obj = serializer.from_dict(flow, switch)
261
262 1
        if switch.id not in stored_flows_box:
263
            # Switch not stored, add to box.
264 1
            flow_list.append(installed_flow)
265 1
            stored_flows_box[switch.id] = {'flow_list': flow_list}
266
        else:
267 1
            stored_flows = stored_flows_box[switch.id].get('flow_list', [])
268
            # Check if flow already stored
269 1
            for stored_flow in stored_flows:
270 1
                stored_flow_obj = serializer.from_dict(stored_flow['flow'],
271
                                                       switch)
272 1
                if installed_flow_obj == stored_flow_obj:
273 1
                    if stored_flow['command'] == installed_flow['command']:
274
                        log.debug('Data already stored.')
275
                        return
276
                    # Flow with inconsistency in "command" fields : Remove the
277
                    # old instruction. This happens when there is a stored
278
                    # instruction to install the flow, but the new instruction
279
                    # is to remove it. In this case, the old instruction is
280
                    # removed and the new one is stored.
281 1
                    stored_flow['command'] = installed_flow.get('command')
282 1
                    stored_flows.remove(stored_flow)
283 1
                    break
284
285 1
            stored_flows.append(installed_flow)
286 1
            stored_flows_box[switch.id]['flow_list'] = stored_flows
287
288 1
        stored_flows_box['id'] = 'flow_persistence'
289 1
        self.storehouse.save_flow(stored_flows_box)
290 1
        del stored_flows_box['id']
291 1
        self.stored_flows = stored_flows_box.copy()
292
293 1
    @rest('v2/flows')
294 1
    @rest('v2/flows/<dpid>')
295 1
    def list(self, dpid=None):
296
        """Retrieve all flows from a switch identified by dpid.
297
298
        If no dpid is specified, return all flows from all switches.
299
        """
300 1
        if dpid is None:
301 1
            switches = self.controller.switches.values()
302
        else:
303 1
            switches = [self.controller.get_switch_by_dpid(dpid)]
304
305 1
        switch_flows = {}
306
307 1
        for switch in switches:
308 1
            flows_dict = [flow.as_dict() for flow in switch.flows]
309 1
            switch_flows[switch.dpid] = {'flows': flows_dict}
310
311 1
        return jsonify(switch_flows)
312
313 1
    @rest('v2/flows', methods=['POST'])
314 1
    @rest('v2/flows/<dpid>', methods=['POST'])
315 1
    def add(self, dpid=None):
316
        """Install new flows in the switch identified by dpid.
317
318
        If no dpid is specified, install flows in all switches.
319
        """
320 1
        return self._send_flow_mods_from_request(dpid, "add")
321
322 1
    @rest('v2/delete', methods=['POST'])
323 1
    @rest('v2/delete/<dpid>', methods=['POST'])
324 1
    @rest('v2/flows', methods=['DELETE'])
325 1
    @rest('v2/flows/<dpid>', methods=['DELETE'])
326 1
    def delete(self, dpid=None):
327
        """Delete existing flows in the switch identified by dpid.
328
329
        If no dpid is specified, delete flows from all switches.
330
        """
331 1
        return self._send_flow_mods_from_request(dpid, "delete")
332
333 1
    def _get_all_switches_enabled(self):
334
        """Get a list of all switches enabled."""
335 1
        switches = self.controller.switches.values()
336 1
        return [switch for switch in switches if switch.is_enabled()]
337
338 1
    def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
339
        """Install FlowsMods from request."""
340 1
        if flows_dict is None:
341 1
            flows_dict = request.get_json()
342 1
            if flows_dict is None:
343 1
                return jsonify({"response": 'flows dict is none.'}), 404
344
345 1
        if dpid:
346 1
            switch = self.controller.get_switch_by_dpid(dpid)
347 1
            if not switch:
348 1
                return jsonify({"response": 'dpid not found.'}), 404
349 1
            elif switch.is_enabled() is False:
350 1
                if command == "delete":
351 1
                    self._install_flows(command, flows_dict, [switch])
352
                else:
353 1
                    return jsonify({"response": 'switch is disabled.'}), 404
354
            else:
355 1
                self._install_flows(command, flows_dict, [switch])
356
        else:
357 1
            self._install_flows(command, flows_dict,
358
                                self._get_all_switches_enabled())
359
360 1
        return jsonify({"response": "FlowMod Messages Sent"})
361
362 1
    def _install_flows(self, command, flows_dict, switches=[]):
363
        """Execute all procedures to install flows in the switches.
364
365
        Args:
366
            command: Flow command to be installed
367
            flows_dict: Dictionary with flows to be installed in the switches.
368
            switches: A list of switches
369
        """
370 1
        for switch in switches:
371 1
            serializer = FlowFactory.get_class(switch)
372 1
            flows = flows_dict.get('flows', [])
373 1
            for flow_dict in flows:
374 1
                flow = serializer.from_dict(flow_dict, switch)
375 1
                if command == "delete":
376
                    flow_mod = flow.as_of_delete_flow_mod()
377 1
                elif command == "delete_strict":
378 1
                    flow_mod = flow.as_of_strict_delete_flow_mod()
379 1
                elif command == "add":
380 1
                    flow_mod = flow.as_of_add_flow_mod()
381
                else:
382
                    raise InvalidCommandError
383 1
                self._send_flow_mod(flow.switch, flow_mod)
384 1
                self._add_flow_mod_sent(flow_mod.header.xid, flow, command)
385
386 1
                self._send_napp_event(switch, flow, command)
387 1
                self._store_changed_flows(command, flow_dict, switch)
388
389 1
    def _add_flow_mod_sent(self, xid, flow, command):
390
        """Add the flow mod to the list of flow mods sent."""
391 1
        if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
392
            self._flow_mods_sent.popitem(last=False)
393 1
        self._flow_mods_sent[xid] = (flow, command)
394
395 1
    def _send_flow_mod(self, switch, flow_mod):
396 1
        event_name = 'kytos/flow_manager.messages.out.ofpt_flow_mod'
397
398 1
        content = {'destination': switch.connection,
399
                   'message': flow_mod}
400
401 1
        event = KytosEvent(name=event_name, content=content)
402 1
        self.controller.buffers.msg_out.put(event)
403
404 1
    def _send_napp_event(self, switch, flow, command, **kwargs):
405
        """Send an Event to other apps informing about a FlowMod."""
406 1
        if command == 'add':
407 1
            name = 'kytos/flow_manager.flow.added'
408 1
        elif command in ('delete', 'delete_strict'):
409 1
            name = 'kytos/flow_manager.flow.removed'
410 1
        elif command == 'error':
411 1
            name = 'kytos/flow_manager.flow.error'
412
        else:
413
            raise InvalidCommandError
414 1
        content = {'datapath': switch,
415
                   'flow': flow}
416 1
        content.update(kwargs)
417 1
        event_app = KytosEvent(name, content)
418 1
        self.controller.buffers.app.put(event_app)
419
420 1
    @listen_to('.*.of_core.*.ofpt_error')
421
    def handle_errors(self, event):
422
        """Receive OpenFlow error and send a event.
423
424
        The event is sent only if the error is related to a request made
425
        by flow_manager.
426
        """
427 1
        message = event.content["message"]
428
429 1
        connection = event.source
430 1
        switch = connection.switch
431
432 1
        xid = message.header.xid.value
433 1
        error_type = message.error_type
434 1
        error_code = message.code
435 1
        error_data = message.data.pack()
436
437
        # Get the packet responsible for the error
438 1
        error_packet = connection.protocol.unpack(error_data)
439
440 1
        if message.code == BadActionCode.OFPBAC_BAD_OUT_PORT:
441
            actions = []
442
            if hasattr(error_packet, 'actions'):
443
                # Get actions from the flow mod (OF 1.0)
444
                actions = error_packet.actions
445
            else:
446
                # Get actions from the list of flow mod instructions (OF 1.3)
447
                for instruction in error_packet.instructions:
448
                    actions.extend(instruction.actions)
449
450
            for action in actions:
451
                iface = switch.get_interface_by_port_no(action.port)
452
453
                # Set interface to drop packets forwarded to it
454
                if iface:
455
                    iface.config = PortConfig.OFPPC_NO_FWD
456
457 1
        try:
458 1
            flow, error_command = self._flow_mods_sent[xid]
459
        except KeyError:
460
            pass
461
        else:
462 1
            self._send_napp_event(flow.switch, flow, 'error',
463
                                  error_command=error_command,
464
                                  error_type=error_type, error_code=error_code)
465