Test Failed
Pull Request — master (#96)
by Jose
03:33
created

build.main.Main._send_flow_mods_from_request()   B

Complexity

Conditions 6

Size

Total Lines 20
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
cc 6
eloc 15
nop 4
dl 0
loc 20
ccs 0
cts 0
cp 0
crap 42
rs 8.6666
c 0
b 0
f 0
1
"""kytos/flow_manager NApp installs, lists and deletes switch flows."""
2 1
from collections import OrderedDict
3
4 1
from flask import jsonify, request
5
6 1
from kytos.core import KytosEvent, KytosNApp, log, rest
7 1
from kytos.core.helpers import listen_to
8 1
from napps.kytos.flow_manager.storehouse import StoreHouse
9
from napps.kytos.of_core.flow import FlowFactory
10 1
11 1
from .exceptions import InvalidCommandError
12
from .settings import FLOWS_DICT_MAX_SIZE
13
14 1
15
class Main(KytosNApp):
16
    """Main class to be used by Kytos controller."""
17 1
18
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The ``run`` method calls this automatically, so users are not
        expected to invoke it themselves.
        """
        log.debug("flow-manager starting")

        # Bounded, insertion-ordered record of the FlowMods already sent,
        # keyed by xid (filled by _add_flow_mod_sent).
        self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
        self._flow_mods_sent = OrderedDict()

        # Storehouse client used to load and save the persisted flows.
        # Layout of the persisted data:
        # {"flow_persistence": {
        #     "dpid_str": {
        #         "flow_list": [
        #             {"command": "...",
        #              "match_fields": {},
        #              "data": {"flows": []}}
        #         ]
        #     }
        # }}
        self.storehouse = StoreHouse(self.controller)
        self.resent_flows = set()
        self.stored_flows = {}
41
42
    def execute(self):
        """Load the persisted flows when the NApp runs.

        Runs once on NApp 'start' or in a loop; it is invoked by the ``run``
        method of the KytosNApp class and should not be called directly.
        """
        self._load_flows()
49
50 1
    def shutdown(self):
        """Log that the NApp is stopping; no other cleanup is performed."""
        log.debug("flow-manager stopping")
53 1
54 1
    def consistency_check(self):
55
        """Check the consistency of flows in each switch."""
56 1
        switches = self.controller.switches.values()
57
58 1
        for switch in switches:
59 1
            if switch.dpid in self.stored_flows:
60 1
                self.consistency_in_switch(switch)
61
            else:
62
                continue
63
64
    def consistency_in_switch(self, switch):
65 1
        """Check consistency for a specific switch."""
66
        switch_flows = {}
67 1
68 1
        flows_dict = [flow.as_dict() for flow in switch.flows]
69 1
        switch_flows[switch.dpid] = {'flows': flows_dict}
70 1
71 1
        dpid = switch.dpid
72
73
        flow_list = self.stored_flows[dpid]['flow_list']
74
75
        for stored_flow in flow_list:
76 1
            stored_flow_data = stored_flow['data']
77
            for installed_flow in flows_dict:
78 1
                if self._is_equal_flows(stored_flow_data, installed_flow):
79
                    log.warning("A problem with consistency were dectected.")
80 1
                    command = stored_flow['command']
81 1
                    self._install_flows(command, stored_flow_data, [switch])
82
83 1
    def _is_equal_flows(self, flow_1, flow_2):
84
        """Compare if two flows are equal."""
85 1
        if flow_1['priority'] == flow_2['priority']:
86
            if flow_1['match'] == flow_2['match']:
87 1
                return True
88 1
89
        return False
90 1
91 1
    # TODO: compute the diff
92 1
    # @listen_to('kytos/topology.port.created')
93 1
    # def resend_stored_flows(self, event):
94 1
    #     """Resend stored Flows."""
95 1
    #     dpid = str(event.content['switch'])
96
    #     switch = self.controller.get_switch_by_dpid(dpid)
97 1
    #     # This can be a problem because this code is running a thread
98
    #     if dpid in self.resent_flows:
99 1
    #         log.info(f'Flow already resended to Switch {dpid}')
100
    #         return None
101
    #     if dpid in self.stored_flows:
102 1
    #         flow_list = self.stored_flows[dpid]['flow_list']
103
    #         for flow in flow_list:
104 1
    #             command = flow.get('command')
105
    #             flows_dict = flow.get('data')
106
    #             self._install_flows(command, flows_dict, [switch])
107
    #         self.resent_flows.add(dpid)
108
    #         log.info(f'Flows resent to Switch {dpid}')
109
    #     return None
110
111
    # pylint: disable=attribute-defined-outside-init
112 1
    def _load_flows(self):
        """Read the persisted flows from the storehouse.

        The 'flow_persistence' entry of the storehouse data, without its
        'id' key, becomes ``self.stored_flows``. A KeyError raised while
        doing so is only logged and ``self.stored_flows`` is left untouched.
        """
        try:
            persisted = self.storehouse.get_data()['flow_persistence']
            if 'id' in persisted:
                del persisted['id']
            self.stored_flows = persisted
        except KeyError as error:
            log.debug(f'There are no flows to load: {error}')
            return

        log.info('Flows loaded.')
124 1
125
    @staticmethod
126 1
    def _generate_match_fields(flows):
127
        """Generate flow match fields."""
128 1
        match_fields = {}
129
        for fields in flows.get('flows', {}):
130 1
            if 'priority' in fields:
131
                match_fields['priority'] = fields['priority']
132 1
            if 'cookie' in fields:
133
                match_fields['cookie'] = fields['cookie']
134 1
            if 'match' in fields:
135 1
                for field, value in fields['match'].items():
136
                    match_fields[field] = value
137 1
        return match_fields
138
139
    def _store_changed_flows(self, command, flows, switches):
140 1
        """Store changed flows."""
141 1
        store_box_updated = self.stored_flows.copy()
142
        # if the flow has a destination dpid it can be stored.
143 1
        if not switches:
144
            log.info('The Flow cannot be stored, the destination Switches '
145 1
                     f'have not been specified: {switches}')
146 1
            return None
147 1
        for switch in switches:
148 1
            new_flow = {}
149 1
            flow_list = []
150 1
            new_flow['command'] = command
151
            # The fields to check if the flow is already stored.
152
            new_flow['match_fields'] = self._generate_match_fields(flows)
153 1
            new_flow['data'] = flows
154
155 1
            if switch.id not in store_box_updated:
156 1
                # Switch not stored, add to box.
157 1
                flow_list.append(new_flow)
158
                store_box_updated[switch.id] = {"flow_list": flow_list}
159 1
                continue
160
161
            stored_flows = store_box_updated[switch.id].get('flow_list', [])
162
163
            # Check if flow already stored
164
            for stored_flow in stored_flows:
165
166 1
                new_flow_match_fields = new_flow.get('match_fields')
167 1
                stored_flow_match_fields = stored_flow.get('match_fields')
168 1
169 1
                if new_flow_match_fields == stored_flow_match_fields:
170 1
171
                    new_flow_command = new_flow.get('command')
172
                    stored_flow_command = stored_flow.get('command')
173
174 1
                    if new_flow_command == stored_flow_command:
175
                        log.debug('Data already stored.')
176
                        return None
177
178
                    # Command conflict. Remove the old flow.
179
                    # Example: Instruction to add new flow but exist
180
                    # a stored instruction to remove this flow.
181
                    # Remove old, and save the new instruction.
182
                    stored_flow['command'] = new_flow.get('command')
183
                    stored_flows.remove(stored_flow)
184
                    break
185
186
            stored_flows.append(new_flow)
187
            store_box_updated[switch.id]['flow_list'] = stored_flows
188
189
        store_box_updated['id'] = 'flow_persistence'
190
        self.storehouse.save_flow(store_box_updated)
191
        del store_box_updated['id']
192
        self.stored_flows = store_box_updated.copy()
193
        return None
194
195
    @rest('v2/flows')
    @rest('v2/flows/<dpid>')
    def list(self, dpid=None):
        """Retrieve all flows from a switch identified by dpid.

        If no dpid is specified, return all flows from all switches.

        Args:
            dpid: Identifier of the switch, or None for every switch.

        Returns:
            A JSON response mapping each switch dpid to ``{'flows': [...]}``,
            or a ``(response, 404)`` pair when the dpid is not found.
        """
        if dpid is None:
            switches = self.controller.switches.values()
        else:
            switch = self.controller.get_switch_by_dpid(dpid)
            # Same answer given by _send_flow_mods_from_request for an
            # unknown dpid, instead of failing on a missing switch.
            if not switch:
                return jsonify({"response": 'dpid not found.'}), 404
            switches = [switch]

        switch_flows = {
            switch.dpid: {'flows': [flow.as_dict() for flow in switch.flows]}
            for switch in switches
        }

        return jsonify(switch_flows)
214
215
    @rest('v2/flows', methods=['POST'])
    @rest('v2/flows/<dpid>', methods=['POST'])
    def add(self, dpid=None):
        """Handle the POST requests that install flows.

        The flows are installed in the switch identified by dpid or, when no
        dpid is given, in all switches. The work is delegated to
        ``_send_flow_mods_from_request`` with the "add" command.
        """
        response = self._send_flow_mods_from_request(dpid, "add")
        return response
223
224
    @rest('v2/delete', methods=['POST'])
    @rest('v2/delete/<dpid>', methods=['POST'])
    @rest('v2/flows', methods=['DELETE'])
    @rest('v2/flows/<dpid>', methods=['DELETE'])
    def delete(self, dpid=None):
        """Handle the requests that remove flows.

        The flows are deleted from the switch identified by dpid or, when no
        dpid is given, from all switches. The work is delegated to
        ``_send_flow_mods_from_request`` with the "delete" command.
        """
        response = self._send_flow_mods_from_request(dpid, "delete")
        return response
234
235
    def _get_all_switches_enabled(self):
236
        """Get a list of all switches enabled."""
237
        switches = self.controller.switches.values()
238
        return [switch for switch in switches if switch.is_enabled()]
239
240
    def _send_flow_mods_from_request(self, dpid, command, flows_dict=None):
        """Install the flows of a request and build the response.

        Args:
            dpid: Target switch; when falsy, all enabled switches are used.
            command: Flow command handed to ``_install_flows``.
            flows_dict: Flows to install; when None, the JSON body of the
                current request is used.

        Returns:
            A JSON response, or a ``(response, 404)`` pair when there are no
            flows, the dpid is not found or the switch is disabled.
        """
        if flows_dict is None:
            flows_dict = request.get_json()
        if flows_dict is None:
            return jsonify({"response": 'flows dict is none.'}), 404

        if not dpid:
            targets = self._get_all_switches_enabled()
        else:
            switch = self.controller.get_switch_by_dpid(dpid)
            if not switch:
                return jsonify({"response": 'dpid not found.'}), 404
            if switch.is_enabled() is False:
                return jsonify({"response": 'switch is disabled.'}), 404
            targets = [switch]

        self._install_flows(command, flows_dict, targets)
        return jsonify({"response": "FlowMod Messages Sent"})
260
261
    def _install_flows(self, command, flows_dict, switches=None):
        """Execute all procedures to install flows in the switches.

        For every switch and every flow, the FlowMod is built and sent, it is
        recorded in the list of sent FlowMods and an event is sent to the
        other NApps. At the end the flows are handed to
        ``_store_changed_flows``.

        Args:
            command: Flow command to be installed ("add" or "delete").
            flows_dict: Dictionary with flows to be installed in the switches.
            switches: A list of switches; None means no switch.

        Raises:
            InvalidCommandError: If there is a flow to send and the command
                is neither "add" nor "delete".
        """
        # A fresh list per call instead of a shared mutable default argument.
        if switches is None:
            switches = []

        # The flows are the same for every switch.
        flows = flows_dict.get('flows', [])

        for switch in switches:
            serializer = FlowFactory.get_class(switch)
            for flow_dict in flows:
                flow = serializer.from_dict(flow_dict, switch)
                if command == "delete":
                    flow_mod = flow.as_of_delete_flow_mod()
                elif command == "add":
                    flow_mod = flow.as_of_add_flow_mod()
                else:
                    raise InvalidCommandError
                self._send_flow_mod(flow.switch, flow_mod)
                # Keep the FlowMod by xid so handle_errors can relate an
                # OpenFlow error to it.
                self._add_flow_mod_sent(flow_mod.header.xid, flow, command)

                self._send_napp_event(switch, flow, command)
        self._store_changed_flows(command, flows_dict, switches)
285
286
    def _add_flow_mod_sent(self, xid, flow, command):
287
        """Add the flow mod to the list of flow mods sent."""
288
        if len(self._flow_mods_sent) >= self._flow_mods_sent_max_size:
289
            self._flow_mods_sent.popitem(last=False)
290
        self._flow_mods_sent[xid] = (flow, command)
291
292
    def _send_flow_mod(self, switch, flow_mod):
        """Put a FlowMod for the switch in the controller's msg_out buffer.

        Args:
            switch: Switch whose connection is the destination of the message.
            flow_mod: FlowMod message to be sent.
        """
        event = KytosEvent(
            name='kytos/flow_manager.messages.out.ofpt_flow_mod',
            content={'destination': switch.connection,
                     'message': flow_mod})
        self.controller.buffers.msg_out.put(event)
300
301
    def _send_napp_event(self, switch, flow, command, **kwargs):
        """Notify the other NApps about a FlowMod through the app buffer.

        Args:
            switch: Switch put in the event content as 'datapath'.
            flow: Flow put in the event content as 'flow'.
            command: 'add', 'delete' or 'error'; selects the event name.
            **kwargs: Extra items added to the event content.

        Raises:
            InvalidCommandError: If the command is not one of the above.
        """
        event_names = {
            'add': 'kytos/flow_manager.flow.added',
            'delete': 'kytos/flow_manager.flow.removed',
            'error': 'kytos/flow_manager.flow.error',
        }
        name = event_names.get(command)
        if name is None:
            raise InvalidCommandError

        content = {'datapath': switch, 'flow': flow, **kwargs}
        self.controller.buffers.app.put(KytosEvent(name, content))
316
317
    @listen_to('.*.of_core.*.ofpt_error')
318
    def handle_errors(self, event):
319
        """Receive OpenFlow error and send a event.
320
321
        The event is sent only if the error is related to a request made
322
        by flow_manager.
323
        """
324
        xid = event.content["message"].header.xid.value
325
        error_type = event.content["message"].error_type
326
        error_code = event.content["message"].code
327
        try:
328
            flow, error_command = self._flow_mods_sent[xid]
329
        except KeyError:
330
            pass
331
        else:
332
            self._send_napp_event(flow.switch, flow, 'error',
333
                                  error_command=error_command,
334
                                  error_type=error_type, error_code=error_code)
335